# API Reference

Complete reference for the Health Universe A2A SDK for Python.

## Core Classes

### Agent (AsyncAgent)

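The primary agent class for Health Universe. Use it for agents that run as background jobs and report progress to users; for internal agents that other agents call directly, use `SubAgent`.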

```python
from health_universe_a2a import Agent, AgentContext  # Agent is an alias for AsyncAgent

class MyAgent(Agent):
    def get_agent_name(self) -> str:
        return "My Agent"

    def get_agent_description(self) -> str:
        return "Processes medical data"

    async def process_message(self, message: str, context: AgentContext) -> str:
        return f"Processed: {message}"
```

#### Required Methods

* **`get_agent_name() -> str`**
  * Returns the agent's display name
  * Example: `"Clinical Data Analyzer"`
* **`get_agent_description() -> str`**
  * Returns what the agent does
  * Example: `"Analyzes clinical datasets and generates insights"`
* **`async process_message(message: str, context: AgentContext) -> str`**
  * Processes messages and returns results
  * Called in background after validation passes
  * All updates sent via POST to backend

#### Optional Methods

* **`async validate_message(message: str, metadata: dict) -> ValidationResult`**
  * Validates incoming messages before processing
  * Returns `ValidationAccepted` or `ValidationRejected`
  * Default: accepts all messages
* **`get_max_duration_seconds() -> int`**
  * Override max task duration (default: 1 hour)
  * Returns maximum duration in seconds
* **`async on_task_start(message: str, context: AgentContext) -> None`**
  * Called before `process_message` starts in background
  * Use for logging, metrics, setup
* **`async on_task_complete(message: str, result: str, context: AgentContext) -> None`**
  * Called after `process_message` completes successfully
  * Use for logging, metrics, cleanup
* **`async on_task_error(message: str, error: Exception, context: AgentContext) -> str | None`**
  * Called when `process_message` raises an exception
  * Return custom error message or `None` for default
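
The order in which these hooks run can be summarized with a small stand-in runner. This is a sketch that assumes the documented sequence (`on_task_start`, then `process_message`, then either `on_task_complete` or `on_task_error`) and treats `get_max_duration_seconds()` as a timeout around processing. `StubAgent`, `run_task`, and the fallback error text are hypothetical and do not reproduce the SDK's internal runner. Validation happens earlier, when the request arrives.

```python
from __future__ import annotations

import asyncio


class StubAgent:
    """Hypothetical stand-in exposing the documented hook names."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def get_max_duration_seconds(self) -> int:
        return 3600  # documented default: 1 hour

    async def on_task_start(self, message: str, context: object) -> None:
        self.calls.append("start")

    async def process_message(self, message: str, context: object) -> str:
        self.calls.append("process")
        if message == "fail":
            raise RuntimeError("bad input")
        return f"Processed: {message}"

    async def on_task_complete(self, message: str, result: str, context: object) -> None:
        self.calls.append("complete")

    async def on_task_error(self, message: str, error: Exception, context: object) -> str | None:
        self.calls.append("error")
        return None  # None means "use the default error message"


async def run_task(agent: StubAgent, message: str, context: object = None) -> str:
    """Hypothetical runner mirroring the documented hook order."""
    await agent.on_task_start(message, context)
    try:
        # Enforce the maximum task duration as a timeout around process_message.
        result = await asyncio.wait_for(
            agent.process_message(message, context),
            timeout=agent.get_max_duration_seconds(),
        )
    except Exception as exc:
        custom = await agent.on_task_error(message, exc, context)
        # Assumed fallback text; the SDK's real default message is not documented here.
        return custom if custom is not None else f"Task failed: {exc}"
    await agent.on_task_complete(message, result, context)
    return result
```

Returning a string from `on_task_error` replaces the fallback, which is how the "custom error message" option above fits into the flow.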

#### Server Methods

* **`serve(host: str = None, port: int = None, reload: bool = None, log_level: str = "info") -> None`**
  * Start HTTP server for this agent
  * Environment variables: `HOST`, `PORT`/`AGENT_PORT`, `RELOAD`

#### Inter-Agent Communication

* **`async call_agent(agent_name_or_url: str, message: str | dict, context: AgentContext = None, timeout: float = 30.0) -> Any`**
  * Unified method to call another A2A agent
  * Supports agent names, URLs, or local paths
  * Automatically propagates JWT tokens

### AgentContext (BackgroundContext)

Context for Agent background job execution. Provides document operations, progress updates, and inter-agent communication.

```python
import json

async def process_message(self, message: str, context: AgentContext) -> str:
    # List documents in the current thread
    docs = await context.document_client.list_documents()

    # Report progress (0.5 = 50% complete)
    await context.update_progress("Processing...", 0.5)

    # Write results as a new document
    data = {"document_count": len(docs)}
    await context.document_client.write("Results", json.dumps(data))

    return "Complete!"
```

#### Properties

* **`document_client: DocumentClientBase`** - Document operations in this thread
* **`user_id: str | None`** - User ID from request (optional)
* **`thread_id: str | None`** - Thread/conversation ID (optional)
* **`job_id: str`** - Background job ID
* **`metadata: dict[str, Any]`** - Raw metadata from A2A request

#### Methods

* **`async update_progress(message: str, progress: float = None, status: str = "working", importance: UpdateImportance = UpdateImportance.NOTICE) -> None`**
  * Send progress update (POSTed to backend)
  * Updates stored in database and displayed to users
  * `progress`: 0.0 to 1.0, `importance`: controls UI visibility
* **`async add_artifact(name: str, content: str, data_type: str = "text/plain", description: str = "", metadata: dict = None) -> None`**
  * Add artifact (POSTed to backend)
  * Artifacts appear in UI and can be downloaded
  * For persistent storage, use `context.document_client.write()` instead
* **`create_inter_agent_client(agent_identifier: str, timeout: float = 30.0) -> InterAgentClient`**
  * Create InterAgentClient with automatic JWT propagation
  * Recommended way to call other agents
* **`is_cancelled() -> bool`**
  * Check if task was cancelled (currently always returns `False`)
  * Cancellation feature not fully implemented

#### Sync Methods (ThreadPoolExecutor)

* **`update_progress_sync(message: str, progress: float = None, status: str = "working", importance: UpdateImportance = UpdateImportance.NOTICE) -> None`**
* **`add_artifact_sync(name: str, content: str, data_type: str = "text/plain", description: str = "", metadata: dict = None) -> None`**

### SubAgent

Agent for internal/inter-agent use. Returns results directly (no background job).

```python
import json

from health_universe_a2a import SubAgent, SubAgentContext

class DocumentReader(SubAgent):
    def get_agent_name(self) -> str:
        return "Document Reader"

    def get_agent_description(self) -> str:
        return "Extracts data from documents"

    async def process_message(self, message: str, context: SubAgentContext) -> str:
        data = json.loads(message)
        return json.dumps({"result": "processed"})
```

#### Required Methods

* Same as Agent: `get_agent_name()`, `get_agent_description()`, `process_message()`

#### Key Differences from Agent

* No background jobs or SSE streaming
* Returns results directly in HTTP response
* Uses `SubAgentContext` (lightweight, no progress updates)
* Processing should complete in < 30 seconds

### SubAgentContext

Lightweight context for SubAgent. Provides document operations and inter-agent communication without progress updates.

#### Properties

* **`document_client: DocumentClientBase`** - Document operations in this thread
* **`user_id: str | None`** - User ID from request
* **`thread_id: str | None`** - Thread/conversation ID
* **`auth_token: str | None`** - JWT token for inter-agent calls

#### Methods

* **`create_inter_agent_client(agent_identifier: str, timeout: float = 30.0) -> InterAgentClient`**
* **`is_cancelled() -> bool`**

## Document Operations

### DocumentClient

Client for document operations in a Health Universe thread. Provides listing, reading, writing, and searching documents.

```python
import json

# Accessed via context.document_client inside process_message
docs = await context.document_client.list_documents()
content = await context.document_client.download_text(docs[0].id)  # first document
await context.document_client.write("Results", json.dumps({"length": len(content)}))
```

#### Document Listing

* **`async list_documents(include_hidden: bool = False, role: Literal["source", "artifact", "all"] | None = None, flatten_attachments: bool = True) -> list[Document]`**
  * List documents in thread
  * `role`: Filter by document type (`"source"` for uploads, `"artifact"` for outputs)
  * `flatten_attachments`: Replace wrapper docs with child attachments
* **`async filter_by_name(query: str) -> list[Document]`**
  * Filter documents by name/filename (case-insensitive substring match)
* **`async get_document(document_id: str) -> Document`**
  * Get document metadata by ID
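
The matching rule behind `filter_by_name()` is a case-insensitive substring test against the display name or the filename. The sketch below reproduces that rule on plain objects so it can be tried without the SDK; `Doc` and `filter_by_name_local` are hypothetical stand-ins, not SDK types.

```python
from dataclasses import dataclass


@dataclass
class Doc:
    """Hypothetical stand-in carrying only the two fields the filter inspects."""
    name: str
    filename: str


def filter_by_name_local(docs: list[Doc], query: str) -> list[Doc]:
    """Keep documents whose name or filename contains `query`, ignoring case."""
    needle = query.casefold()
    return [
        d for d in docs
        if needle in d.name.casefold() or needle in d.filename.casefold()
    ]
```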

#### Document Reading

* **`async download(document_id: str) -> bytes`**
  * Download document content as bytes
* **`async download_text(document_id: str, encoding: str = "utf-8") -> str`**
  * Download raw document content as text
  * Only use for text-based files (CSV, JSON, TXT, MD)
* **`async download_extracted(document_id: str) -> str`**
  * Download platform-extracted markdown text
  * Use for PDFs and DOCX files, which the platform converts to markdown

#### Document Writing

* **`async write(name: str, content: str | bytes, filename: str = None, document_type: str = "agent_output", user_visible: bool = True, comment: str = None) -> Document`**
  * Write content to new document
  * Handles 3-step upload process automatically
  * `filename`: Auto-generated if not provided
* **`async update(document_id: str, content: str | bytes, comment: str = None, base_version_id: str = None) -> Document`**
  * Update existing document with new content (creates new version)

#### Document Search

* **`async search(query: str, limit: int = 3, patient_id: str = None) -> list[SearchResult]`**
  * Full-text search across documents
  * Case-insensitive text search over extracted chunks
  * Use for keyword-based lookups
* **`async semantic_search(query: str, document_id: str = None, max_results: int = 5, similarity_threshold: float = 0.4, patient_id: str = None) -> list[SemanticSearchResult]`**
  * Semantic search using vector embeddings
  * Finds semantically similar content
  * Powered by pgvector embeddings
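
To make the interaction between `similarity_threshold` and `max_results` concrete, the sketch below ranks toy embedding vectors by cosine similarity in memory. It is an illustration only: the platform computes similarity with pgvector on the server, and the vectors, chunk IDs, and `rank_chunks` helper are hypothetical. It also assumes the threshold is inclusive.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def rank_chunks(
    query_vec: list[float],
    chunks: dict[str, list[float]],
    max_results: int = 5,
    similarity_threshold: float = 0.4,
) -> list[tuple[str, float]]:
    """Drop chunks below the threshold, then keep the top `max_results` by similarity."""
    scored = [(chunk_id, cosine_similarity(query_vec, vec)) for chunk_id, vec in chunks.items()]
    # Assumption: a score equal to the threshold is kept.
    kept = [pair for pair in scored if pair[1] >= similarity_threshold]
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return kept[:max_results]
```

Raising `similarity_threshold` trims weak matches before `max_results` is applied, so a strict threshold can return fewer results than `max_results`.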

#### Processing Status

* **`async get_processing_status(document_id: str) -> DocumentProcessingStatus`**
  * Get extraction/processing status of document
* **`async wait_for_ready(document_ids: list[str] = None, poll_interval: float = 2.0, timeout: float = 300.0, patient_id: str = None, organization_id: str = None) -> list[DocumentProcessingStatus]`**
  * Poll until documents are ready (extraction complete)
  * If `document_ids` is None, waits for all source documents
* **`async close() -> None`**
  * Close HTTP client
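
`wait_for_ready()` follows a poll-until-terminal pattern governed by `poll_interval` and `timeout`. The asyncio sketch below shows that pattern against a hypothetical `fetch_status` coroutine and a stand-in `Status` class; the real method works with `DocumentProcessingStatus`, and the terminal state names used here (`"completed"`, `"failed"`) are assumptions.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class Status:
    """Hypothetical stand-in for DocumentProcessingStatus."""
    document_id: str
    status: str

    @property
    def is_terminal(self) -> bool:
        # Assumed terminal states for this sketch.
        return self.status in {"completed", "failed"}


async def poll_until_terminal(
    document_ids: list[str],
    fetch_status: Callable[[str], Awaitable[Status]],
    poll_interval: float = 2.0,
    timeout: float = 300.0,
) -> list[Status]:
    """Poll every document until all reach a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        statuses = [await fetch_status(doc_id) for doc_id in document_ids]
        if all(s.is_terminal for s in statuses):
            return statuses
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Documents not ready after {timeout} seconds")
        await asyncio.sleep(poll_interval)
```

This matches the `TimeoutError` listed under Error Handling for "waiting for documents".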

### Document Data Models

#### Document

```python
@dataclass
class Document:
    id: str                          # Document UUID
    name: str                        # Display name
    filename: str                    # Storage filename
    document_type: str              # "user_upload" or "agent_output"
    storage_path: str | None        # S3 storage path
    latest_version: int | None      # Version number
    latest_version_id: str | None   # Version UUID
    user_visible: bool              # Visible to users
```

#### SearchResult

```python
@dataclass
class SearchResult:
    document_name: str              # Document name
    chunk_index: int               # Position in document
    content: str                   # Matching text content
    metadata: dict | None          # Optional chunk metadata
```

#### SemanticSearchResult

```python
@dataclass
class SemanticSearchResult:
    chunk_id: str                  # Chunk UUID
    document_id: str              # Document UUID
    document_name: str            # Document name
    content: str                  # Matching text content
    similarity: float             # Cosine similarity (0-1)
    chunk_index: int             # Position in document
    metadata: dict | None        # Optional chunk metadata
```

#### DocumentProcessingStatus

```python
@dataclass
class DocumentProcessingStatus:
    document_id: str                    # Document UUID
    document_name: str                  # Document filename
    status: str                         # Processing state
    chunks_count: int | None           # Number of chunks
    error_message: str | None          # Error details if failed
    enhanced_ocr_status: str | None    # Enhanced OCR status
    
    @property
    def is_ready(self) -> bool:        # Whether extraction is complete
        ...

    @property
    def is_terminal(self) -> bool:     # Whether processing finished
        ...
```

## Inter-Agent Communication

### InterAgentClient

Client for calling other A2A-compliant agents with JWT propagation and retry logic.

```python
# Via context (recommended)
client = context.create_inter_agent_client("/processor")
response = await client.call("Process this data")

# Direct instantiation
client = InterAgentClient("/processor", auth_token="jwt-token")
response = await client.call_with_data({"query": "test"})
```

#### Constructor

* **`__init__(agent_identifier: str, auth_token: str = None, local_base_url: str = None, timeout: float = 30.0, max_retries: int = 3)`**
  * `agent_identifier`: Target agent (`/path`, `http://url`, or name)
  * `auth_token`: JWT token to propagate
  * `local_base_url`: Base URL for local agents (default: from env)
  * `timeout`: Request timeout in seconds
  * `max_retries`: Max retry attempts for transient errors
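
The `max_retries` parameter implies a retry loop around transient failures. This reference does not specify the backoff policy or which errors count as transient, so the sketch below assumes exponential backoff, treats `ConnectionError` and `TimeoutError` as transient, and reads `max_retries` as retries after the first attempt. `call_with_retries` is a hypothetical helper, not part of the SDK.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

# Assumption: these exception types stand in for "transient" failures.
TRANSIENT_ERRORS = (ConnectionError, TimeoutError)


async def call_with_retries(
    send: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Call `send`, retrying transient errors up to `max_retries` extra times."""
    attempt = 0
    while True:
        try:
            return await send()
        except TRANSIENT_ERRORS:
            if attempt >= max_retries:
                raise  # out of retries: surface the last error
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * (2 ** attempt))
            attempt += 1
```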

#### Factory Methods

* **`@classmethod from_registry(agent_name: str, auth_token: str = None, timeout: float = 30.0) -> InterAgentClient`**
  * Create client from registry lookup
  * Resolves agent name using global registry
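
The three identifier forms accepted by `agent_identifier` (a local `/path`, a full URL, or a registry name) can be resolved with logic along these lines. This is a sketch under assumptions: `resolve_agent_url` is hypothetical, it assumes `AGENT_REGISTRY` holds a flat JSON object of name-to-URL pairs, it ignores `AGENT_REGISTRY_PATH`, and it raises `ValueError` for unknown names, consistent with the exceptions listed under Error Handling.

```python
import json
import os
from typing import Mapping


def resolve_agent_url(identifier: str, env: Mapping[str, str] = os.environ) -> str:
    """Hypothetical resolver for path, URL, and registry-name identifiers."""
    if identifier.startswith(("http://", "https://")):
        return identifier  # already a full URL
    if identifier.startswith("/"):
        # Local agent mounted at a path on the multi-agent server.
        base = env.get("LOCAL_AGENT_BASE_URL", "http://localhost:8501")
        return base.rstrip("/") + identifier
    # Assumption: AGENT_REGISTRY is a JSON object such as {"document-reader": "https://..."}.
    registry = json.loads(env.get("AGENT_REGISTRY", "{}"))
    if identifier in registry:
        return registry[identifier]
    raise ValueError(f"Could not resolve agent: {identifier!r}")
```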

#### Communication Methods

* **`async call(message: str, timeout: float = None) -> AgentResponse`**
  * Call agent with text message
  * Returns parsed response
* **`async call_with_data(data: Any, timeout: float = None) -> AgentResponse`**
  * Call agent with structured data (dict, list, etc.)
* **`async close() -> None`**
  * Close client (cleanup)

### AgentResponse

Response from inter-agent call with convenient parsed data access.

```python
response = await client.call("Hello")
print(response.text)        # Extract text content
print(response.data)        # Extract structured data
print(response.raw_response) # Full A2A response
```

#### Properties

* **`text: str`** - Concatenated text from all text parts
* **`data: Any`** - First data part from response (structured data)
* **`parts: list[dict]`** - All parts from response
* **`raw_response: dict`** - Full A2A response
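
The `text` and `data` properties amount to a walk over the response parts. The sketch below assumes A2A-style part dictionaries with a `kind` field (`"text"` or `"data"`), joins text parts with no separator, and returns `None` when no data part exists; all three behaviors are assumptions, and `extract_text`/`extract_data` are hypothetical helpers rather than SDK functions.

```python
from typing import Any


def extract_text(parts: list[dict[str, Any]]) -> str:
    """Concatenate the `text` field of every text part (assumed: no separator)."""
    return "".join(p.get("text", "") for p in parts if p.get("kind") == "text")


def extract_data(parts: list[dict[str, Any]]) -> Any:
    """Return the payload of the first data part, or None if there is none (assumed)."""
    for p in parts:
        if p.get("kind") == "data":
            return p.get("data")
    return None
```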

#### Methods

* **`__str__() -> str`** - Returns text content
* **`__repr__() -> str`** - Detailed representation

## Validation

### ValidationResult Types

```python
from health_universe_a2a import ValidationAccepted, ValidationRejected

async def validate_message(self, message: str, metadata: dict) -> ValidationResult:
    if len(message) < 10:
        return ValidationRejected(reason="Message too short")
    return ValidationAccepted(estimated_duration_seconds=60)
```

#### ValidationAccepted

```python
class ValidationAccepted:
    status: Literal["accepted"] = "accepted"
    estimated_duration_seconds: int | None = None
```

#### ValidationRejected

```python
class ValidationRejected:
    status: Literal["rejected"] = "rejected"
    reason: str  # Human-readable reason
```
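
A common validation pattern is to reject input that cannot be parsed before any background job starts. The sketch below uses local dataclasses that mirror the two result shapes above so it runs without the SDK; in an agent you would return the real `ValidationAccepted` or `ValidationRejected` from `validate_message()`. The required `"query"` field and the 60-second estimate are hypothetical.

```python
from __future__ import annotations

import json
from dataclasses import dataclass
from typing import Literal, Union


@dataclass
class Accepted:
    """Stand-in mirroring ValidationAccepted."""
    estimated_duration_seconds: int | None = None
    status: Literal["accepted"] = "accepted"


@dataclass
class Rejected:
    """Stand-in mirroring ValidationRejected."""
    reason: str
    status: Literal["rejected"] = "rejected"


def validate_json_request(message: str) -> Union[Accepted, Rejected]:
    """Reject non-JSON input or input missing a (hypothetical) "query" field."""
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return Rejected(reason="Message must be valid JSON")
    if not isinstance(payload, dict) or "query" not in payload:
        return Rejected(reason='Expected a JSON object with a "query" field')
    return Accepted(estimated_duration_seconds=60)
```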

## Extensions and Types

### UpdateImportance

Controls how updates are propagated to the Navigator UI.

```python
from health_universe_a2a import UpdateImportance

await context.update_progress(
    "Processing...", 
    importance=UpdateImportance.NOTICE  # Shown in Navigator
)

await context.update_progress(
    "Debug info...", 
    importance=UpdateImportance.INFO    # Not shown in Navigator
)
```

#### Values

* **`ERROR`** - Something went wrong (pushed to Navigator)
* **`NOTICE`** - Standard progress update (default, pushed to Navigator)
* **`INFO`** - Verbose logging (stored but not pushed)
* **`DEBUG`** - Diagnostic information (stored but not pushed)
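
The storage-versus-push rule above can be written as a small predicate. The enum below is a local stand-in that mirrors the four documented level names (the real `UpdateImportance` values may be defined differently), and `is_pushed_to_navigator` is hypothetical.

```python
from enum import Enum


class Importance(str, Enum):
    """Local stand-in mirroring the documented UpdateImportance names."""
    ERROR = "error"
    NOTICE = "notice"
    INFO = "info"
    DEBUG = "debug"


# Per the list above: every level is stored, but only these are pushed to Navigator.
PUSHED_LEVELS = frozenset({Importance.ERROR, Importance.NOTICE})


def is_pushed_to_navigator(level: Importance) -> bool:
    return level in PUSHED_LEVELS
```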

### NavigatorTaskStatus

Task status values for the Navigator UI.

```python
from enum import Enum

class NavigatorTaskStatus(str, Enum):
    WORKING = "working"
    COMPLETED = "completed"
    FAILED = "failed"

## Local Development

### LocalDocumentClient

Filesystem-backed document client for local testing.

```python
from health_universe_a2a.local import LocalDocumentClient

doc_client = LocalDocumentClient(
    data_dir="./test_data",
    output_dir="./output"
)
docs = await doc_client.list_documents()
```

#### Constructor

* **`__init__(data_dir: str, output_dir: str = None)`**
  * `data_dir`: Root directory with `source/` and `artifact/` subdirectories
  * `output_dir`: Output directory (defaults to `data_dir/artifact/`)
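
Before using `LocalDocumentClient` or `create_local_context()`, the data directory needs the `source/` and `artifact/` layout described above. The sketch below builds that layout with `pathlib`; `make_test_data_dir` is a hypothetical helper, and the sample filename and contents are placeholders.

```python
from __future__ import annotations

from pathlib import Path


def make_test_data_dir(root: str | Path, files: dict[str, str]) -> Path:
    """Hypothetical helper: create root/source/ and root/artifact/, writing `files` into source/."""
    root = Path(root)
    source_dir = root / "source"
    source_dir.mkdir(parents=True, exist_ok=True)
    (root / "artifact").mkdir(parents=True, exist_ok=True)  # default output location
    for filename, content in files.items():
        (source_dir / filename).write_text(content, encoding="utf-8")
    return root
```

For example, `make_test_data_dir("./test_data", {"sample.csv": "id,value\n1,42\n"})` prepares a directory that `create_local_context("./test_data")` should then be able to read from.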

#### Methods

* Same interface as `DocumentClient`
* `search()` and `semantic_search()` raise `NotImplementedError`

### create\_local\_context()

Factory function for local testing context.

```python
from health_universe_a2a.local import create_local_context

context = create_local_context("./test_data", "./output")
result = await agent.process_message("test", context)
```

#### Parameters

* **`data_dir: str`** - Root directory for test data
* **`output_dir: str = None`** - Output directory (defaults to `data_dir/artifact/`)

#### Returns

* **`AgentContext`** - Configured for local filesystem operations

## Server Utilities

### create\_app()

Create Starlette ASGI application for an A2A agent.

```python
import uvicorn

from health_universe_a2a import create_app

app = create_app(agent)
uvicorn.run(app, host="0.0.0.0", port=8000)
```

#### Parameters

* **`agent: A2AAgentBase`** - Agent instance to serve
* **`task_store: Any = None`** - Optional task store (defaults to InMemoryTaskStore)

#### Returns

* **`Any`** - Starlette application with A2A endpoints

### serve()

Start HTTP server for an agent (convenience wrapper).

```python
agent.serve()  # Uses environment variables
agent.serve(host="localhost", port=8080, reload=True)
```

#### Parameters

* **`agent: A2AAgentBase`** - Agent to serve
* **`host: str = None`** - Server host (default: from `HOST` env var or "0.0.0.0")
* **`port: int = None`** - Server port (default: from `PORT`/`AGENT_PORT` env var or 8000)
* **`reload: bool = None`** - Auto-reload (default: from `RELOAD` env var or False)
* **`log_level: str = "info"`** - Uvicorn log level
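
The defaults listed above follow a simple precedence: explicit arguments, then environment variables, then built-in defaults. The sketch below spells that out; `resolve_server_config` is hypothetical, and it assumes `PORT` is checked before `AGENT_PORT` and that `RELOAD` is compared case-insensitively against `"true"`, neither of which this reference specifies.

```python
from __future__ import annotations

import os
from typing import Mapping


def resolve_server_config(
    host: str | None = None,
    port: int | None = None,
    reload: bool | None = None,
    env: Mapping[str, str] = os.environ,
) -> tuple[str, int, bool]:
    """Hypothetical helper: explicit arguments win, then env vars, then defaults."""
    if host is None:
        host = env.get("HOST", "0.0.0.0")
    if port is None:
        # Assumption: PORT is checked before AGENT_PORT.
        raw_port = env.get("PORT") or env.get("AGENT_PORT") or "8000"
        port = int(raw_port)
    if reload is None:
        # Assumption: only the string "true" (any case) enables reload.
        reload = env.get("RELOAD", "false").strip().lower() == "true"
    return host, port, reload
```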

### Multi-Agent Server

Functions for hosting multiple agents at different paths.

#### create\_multi\_agent\_app()

```python
from health_universe_a2a import create_multi_agent_app

app = create_multi_agent_app({
    "/orchestrator": orchestrator_agent,
    "/processor": processor_agent,
})
```

#### serve\_multi\_agents()

```python
from health_universe_a2a import serve_multi_agents

serve_multi_agents({
    "/orchestrator": orchestrator_agent,
    "/processor": processor_agent,
}, port=8501)
```

## Environment Variables

### Server Configuration

* **`HOST`** - Server host (default: "0.0.0.0")
* **`PORT`** or **`AGENT_PORT`** - Server port (default: 8000, multi-agent: 8501)
* **`RELOAD`** - Enable auto-reload ("true"/"false", default: "false")

### Service URLs

* **`HU_NESTJS_URL`** - NestJS API base URL (default: "https://apps.healthuniverse.com/api/v1")
* **`LOCAL_AGENT_BASE_URL`** - Base URL for local agents (default: "http://localhost:8501")

### Agent Registry

* **`AGENT_REGISTRY`** - JSON string mapping agent names to URLs
* **`AGENT_REGISTRY_PATH`** - Path to agent registry JSON file

### Debugging

* **`DEBUG_HTTP_REQUESTS`** - Enable HTTP request logging ("true"/"false")

## Error Handling

### Common Exceptions

* **`ValueError`** - Invalid parameters, missing documents, unresolved agents
* **`httpx.HTTPError`** - Network errors, API failures
* **`TimeoutError`** - Task timeout, waiting for documents
* **`NotImplementedError`** - Local mode limitations (search functions)

### Best Practices

1. **Validation**: Use `validate_message()` to check inputs before processing
2. **Error Handling**: Implement try/except in `process_message()` and `on_task_error()`
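3. **Cancellation**: Check `context.is_cancelled()` in long-running loops (it currently always returns `False`, so this prepares for cancellation support rather than enabling it today)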
4. **Timeouts**: Use appropriate timeouts for inter-agent calls
5. **Resource Cleanup**: Always call `client.close()` and `document_client.close()`

## Examples

### Basic Agent

```python
from health_universe_a2a import Agent, AgentContext
import json

class DataAnalyzer(Agent):
    def get_agent_name(self) -> str:
        return "Clinical Data Analyzer"
    
    def get_agent_description(self) -> str:
        return "Analyzes clinical datasets and generates insights"
    
    async def process_message(self, message: str, context: AgentContext) -> str:
        # List documents
        docs = await context.document_client.list_documents(role="source")
        
        # Process with progress updates
        await context.update_progress("Loading data...", 0.1)
        
        results = []
        for i, doc in enumerate(docs):
            content = await context.document_client.download_text(doc.id)
            result = self.analyze_document(content)
            results.append(result)
            
            await context.update_progress(
                f"Processed {i+1}/{len(docs)} documents",
                (i+1) / len(docs)
            )
        
        # Write results
        await context.document_client.write(
            "Analysis Results",
            json.dumps(results, indent=2),
            filename="analysis.json"
        )
        
        return f"Analysis complete! Processed {len(docs)} documents."
    
    def analyze_document(self, content: str) -> dict:
        return {"length": len(content), "lines": len(content.splitlines())}

if __name__ == "__main__":
    agent = DataAnalyzer()
    agent.serve()
```

### Inter-Agent Communication

```python
from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Orchestrator"
    
    def get_agent_description(self) -> str:
        return "Orchestrates document processing workflow"
    
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call document reader
        reader_response = await self.call_agent(
            "document-reader", 
            message, 
            context
        )
        
        # Call analyzer with reader results
        analyzer_response = await self.call_agent(
            "data-analyzer",
            reader_response,
            context
        )
        
        return f"Workflow complete: {analyzer_response}"
```

### Local Development

```python
from health_universe_a2a.local import create_local_context

# DataAnalyzer is the class defined in the Basic Agent example above

async def test_agent_locally():
    agent = DataAnalyzer()
    context = create_local_context("./test_data")
    
    result = await agent.process_message("Analyze documents", context)
    print(f"Result: {result}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(test_agent_locally())
```


