# API Reference

Complete reference for the Health Universe A2A SDK for Python.

## Core Classes

### Agent (AsyncAgent)

The primary agent class for Health Universe. Use this for all Health Universe agents.

```python
from health_universe_a2a import Agent, AgentContext

class MyAgent(Agent):
    def get_agent_name(self) -> str:
        return "My Agent"

    def get_agent_description(self) -> str:
        return "Processes medical data"

    async def process_message(self, message: str, context: AgentContext) -> str:
        return f"Processed: {message}"
```

#### Required Methods

* **`get_agent_name() -> str`**
  * Returns the agent's display name
  * Example: `"Clinical Data Analyzer"`
* **`get_agent_description() -> str`**
  * Returns what the agent does
  * Example: `"Analyzes clinical datasets and generates insights"`
* **`async process_message(message: str, context: AgentContext) -> str`**
  * Processes messages and returns results
  * Runs in the background once validation passes
  * Progress updates and artifacts are sent to the backend via POST

#### Optional Methods

* **`async validate_message(message: str, metadata: dict) -> ValidationResult`**
  * Validates incoming messages before processing
  * Returns `ValidationAccepted` or `ValidationRejected`
  * Default: accepts all messages
* **`get_max_duration_seconds() -> int`**
  * Override max task duration (default: 1 hour)
  * Returns maximum duration in seconds
* **`async on_task_start(message: str, context: AgentContext) -> None`**
  * Called before `process_message` starts in background
  * Use for logging, metrics, setup
* **`async on_task_complete(message: str, result: str, context: AgentContext) -> None`**
  * Called after `process_message` completes successfully
  * Use for logging, metrics, cleanup
* **`async on_task_error(message: str, error: Exception, context: AgentContext) -> str | None`**
  * Called when `process_message` raises an exception
  * Return custom error message or `None` for default

#### Server Methods

* **`serve(host: str = None, port: int = None, reload: bool = None, log_level: str = "info") -> None`**
  * Start HTTP server for this agent
  * Environment variables: `HOST`, `PORT`/`AGENT_PORT`, `RELOAD`

#### Inter-Agent Communication

* **`async call_agent(agent_name_or_url: str, message: str | dict, context: AgentContext = None, timeout: float = 30.0) -> Any`**
  * Unified method to call another A2A agent
  * Supports agent names, URLs, or local paths
  * Automatically propagates JWT tokens

### AgentContext (BackgroundContext)

Context for Agent background job execution. Provides document operations, progress updates, and inter-agent communication.

```python
import json

async def process_message(self, message: str, context: AgentContext) -> str:
    # List documents in the current thread
    docs = await context.documents.list_documents()

    # Report progress (0.5 means halfway)
    await context.update_progress("Processing...", 0.5)

    # Write results as a new document
    data = {"documents": len(docs)}
    await context.documents.write("Results", json.dumps(data))

    return "Complete!"
```

#### Properties

* **`documents: DocumentClient`** - Document operations in this thread
* **`user_id: str | None`** - User ID from request (optional)
* **`thread_id: str | None`** - Thread/conversation ID (optional)
* **`job_id: str`** - Background job ID
* **`metadata: dict[str, Any]`** - Raw metadata from A2A request

#### Methods

* **`async update_progress(message: str, progress: float = None, status: str = "working", importance: UpdateImportance = NOTICE) -> None`**
  * Send progress update (POSTed to backend)
  * Updates stored in database and displayed to users
  * `progress`: 0.0 to 1.0, `importance`: controls UI visibility
* **`async add_artifact(name: str, content: str, data_type: str = "text/plain", description: str = "", metadata: dict = None) -> None`**
  * Add artifact (POSTed to backend)
  * Artifacts appear in UI and can be downloaded
  * For persistent storage, use `context.documents.write()` instead
* **`create_inter_agent_client(agent_identifier: str, timeout: float = 30.0) -> InterAgentClient`**
  * Create InterAgentClient with automatic JWT propagation
  * Recommended way to call other agents
* **`is_cancelled() -> bool`**
  * Check if task was cancelled (currently always returns `False`)
  * Cancellation feature not fully implemented

#### Sync Methods (ThreadPoolExecutor)

* **`update_progress_sync(message: str, progress: float = None, status: str = "working", importance: UpdateImportance = NOTICE) -> None`**
* **`add_artifact_sync(name: str, content: str, data_type: str = "text/plain", description: str = "", metadata: dict = None) -> None`**

### SubAgent

Agent for internal/inter-agent use. Returns results directly (no background job).

```python
import json

from health_universe_a2a import SubAgent, SubAgentContext

class DocumentReader(SubAgent):
    def get_agent_name(self) -> str:
        return "Document Reader"

    def get_agent_description(self) -> str:
        return "Extracts data from documents"

    async def process_message(self, message: str, context: SubAgentContext) -> str:
        data = json.loads(message)
        return json.dumps({"result": "processed"})
```

#### Required Methods

* Same as Agent: `get_agent_name()`, `get_agent_description()`, `process_message()`

#### Key Differences from Agent

* No background jobs or SSE streaming
* Returns results directly in HTTP response
* Uses `SubAgentContext` (lightweight, no progress updates)
* Processing should complete in < 30 seconds
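
One way to stay inside the 30-second guideline is to wrap slow steps in `asyncio.wait_for`. This is a minimal sketch using only the standard library; `run_with_budget` and `slow_lookup` are hypothetical helpers, and whether the SDK itself enforces the limit is not stated here.

```python
import asyncio
from typing import Awaitable, Optional, TypeVar

T = TypeVar("T")

# Guideline from the list above, not an SDK constant.
SUBAGENT_BUDGET_SECONDS = 30.0


async def run_with_budget(work: Awaitable[T], budget: float = SUBAGENT_BUDGET_SECONDS) -> Optional[T]:
    """Await `work`, returning None if it exceeds the time budget."""
    try:
        return await asyncio.wait_for(work, timeout=budget)
    except asyncio.TimeoutError:
        return None


async def slow_lookup(delay: float) -> str:
    """Placeholder for a slow step such as a download or an inter-agent call."""
    await asyncio.sleep(delay)
    return "done"
```

Inside `process_message`, a `None` result can be turned into a short error payload so the caller gets a response before its own timeout fires.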

### SubAgentContext

Lightweight context for SubAgent. Provides document operations and inter-agent communication without progress updates.

#### Properties

* **`documents: DocumentClient`** - Document operations in this thread
* **`user_id: str | None`** - User ID from request
* **`thread_id: str | None`** - Thread/conversation ID
* **`auth_token: str | None`** - JWT token for inter-agent calls

#### Methods

* **`create_inter_agent_client(agent_identifier: str, timeout: float = 30.0) -> InterAgentClient`**
* **`is_cancelled() -> bool`**

## Document Operations

### DocumentClient

Client for document operations in a Health Universe thread. Provides listing, reading, writing, and searching documents.

```python
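import json

# Accessed via context.documents (inside an async method such as process_message)
docs = await context.documents.list_documents()
doc_id = docs[0].id  # assumes the thread contains at least one document
content = await context.documents.download_text(doc_id)
await context.documents.write("Results", json.dumps({"length": len(content)}))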
```

#### Document Listing

* **`async list_documents(include_hidden: bool = False, role: Literal["source", "artifact", "all"] = None, flatten_attachments: bool = True) -> list[Document]`**
  * List documents in thread
  * `role`: Filter by document type (`"source"` for uploads, `"artifact"` for outputs)
  * `flatten_attachments`: Replace wrapper docs with child attachments
* **`async filter_by_name(query: str) -> list[Document]`**
  * Filter documents by name/filename (case-insensitive substring match)
* **`async get_document(document_id: str) -> Document`**
  * Get document metadata by ID

#### Document Reading

* **`async download(document_id: str) -> bytes`**
  * Download document content as bytes
* **`async download_text(document_id: str, encoding: str = "utf-8") -> str`**
  * Download raw document content as text
  * Only use for text-based files (CSV, JSON, TXT, MD)
* **`async download_extracted(document_id: str) -> str`**
  * Download platform-extracted markdown text
  * For PDFs, DOCX converted to markdown by platform

#### Document Writing

* **`async write(name: str, content: str | bytes, filename: str = None, document_type: str = "agent_output", user_visible: bool = True, comment: str = None) -> Document`**
  * Write content to new document
  * Handles 3-step upload process automatically
  * `filename`: Auto-generated if not provided
* **`async update(document_id: str, content: str | bytes, comment: str = None, base_version_id: str = None) -> Document`**
  * Update existing document with new content (creates new version)

#### Document Search

* **`async search(query: str, limit: int = 3, patient_id: str = None) -> list[SearchResult]`**
  * Full-text search across documents
  * Case-insensitive text search over extracted chunks
  * Use for keyword-based lookups
* **`async semantic_search(query: str, document_id: str = None, max_results: int = 5, similarity_threshold: float = 0.4, patient_id: str = None) -> list[SemanticSearchResult]`**
  * Semantic search using vector embeddings
  * Finds semantically similar content
  * Powered by pgvector embeddings

#### Processing Status

* **`async get_processing_status(document_id: str) -> DocumentProcessingStatus`**
  * Get extraction/processing status of document
* **`async wait_for_ready(document_ids: list[str] = None, poll_interval: float = 2.0, timeout: float = 300.0) -> list[DocumentProcessingStatus]`**
  * Poll until documents are ready (extraction complete)
  * If `document_ids` is None, waits for all source documents
* **`async close() -> None`**
  * Close HTTP client
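
The `poll_interval` and `timeout` parameters of `wait_for_ready` describe a standard polling loop. The following sketch shows that pattern in isolation; `poll_until_ready` and `ReadyAfter` are hypothetical names, and the SDK's own loop may handle edge cases differently.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until_ready(
    check: Callable[[], Awaitable[bool]],
    poll_interval: float = 2.0,
    timeout: float = 300.0,
) -> None:
    """Call `check` every `poll_interval` seconds until it returns True.

    Raises TimeoutError when another wait would overrun `timeout`.
    """
    deadline = time.monotonic() + timeout
    while not await check():
        if time.monotonic() + poll_interval > deadline:
            raise TimeoutError(f"not ready after {timeout} seconds")
        await asyncio.sleep(poll_interval)


class ReadyAfter:
    """Fake status check that reports ready on the n-th call."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.calls = 0

    async def __call__(self) -> bool:
        self.calls += 1
        return self.calls >= self.n
```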

### Document Data Models

#### Document

```python
@dataclass
class Document:
    id: str                          # Document UUID
    name: str                        # Display name
    filename: str                    # Storage filename
    document_type: str              # "user_upload" or "agent_output"
    storage_path: str | None        # S3 storage path
    latest_version: int | None      # Version number
    latest_version_id: str | None   # Version UUID
    user_visible: bool              # Visible to users
```

#### SearchResult

```python
@dataclass
class SearchResult:
    document_name: str              # Document name
    chunk_index: int               # Position in document
    content: str                   # Matching text content
    metadata: dict | None          # Optional chunk metadata
```

#### SemanticSearchResult

```python
@dataclass
class SemanticSearchResult:
    chunk_id: str                  # Chunk UUID
    document_id: str              # Document UUID
    document_name: str            # Document name
    content: str                  # Matching text content
    similarity: float             # Cosine similarity (0-1)
    chunk_index: int             # Position in document
    metadata: dict | None        # Optional chunk metadata
```

#### DocumentProcessingStatus

```python
@dataclass
class DocumentProcessingStatus:
    document_id: str                    # Document UUID
    document_name: str                  # Document filename
    status: str                         # Processing state
    chunks_count: int | None           # Number of chunks
    error_message: str | None          # Error details if failed
    enhanced_ocr_status: str | None    # Enhanced OCR status
    
    @property
    def is_ready(self) -> bool: ...     # Whether extraction is complete

    @property
    def is_terminal(self) -> bool: ...  # Whether processing finished
```

## Inter-Agent Communication

### InterAgentClient

Client for calling other A2A-compliant agents with JWT propagation and retry logic.

```python
# Via context (recommended)
client = context.create_inter_agent_client("/processor")
response = await client.call("Process this data")

# Direct instantiation
client = InterAgentClient("/processor", auth_token="jwt-token")
response = await client.call_with_data({"query": "test"})
```

#### Constructor

* **`__init__(agent_identifier: str, auth_token: str = None, local_base_url: str = None, timeout: float = 30.0, max_retries: int = 3)`**
  * `agent_identifier`: Target agent (`/path`, `http://url`, or name)
  * `auth_token`: JWT token to propagate
  * `local_base_url`: Base URL for local agents (default: from env)
  * `timeout`: Request timeout in seconds
  * `max_retries`: Max retry attempts for transient errors
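
The three accepted forms of `agent_identifier` can be told apart with a few string checks. This is a sketch of one plausible resolution scheme, not the client's actual logic; `classify_identifier` and `resolve` are hypothetical helpers, and the registry is passed in as a plain dict.

```python
from urllib.parse import urlsplit


def classify_identifier(agent_identifier: str) -> str:
    """Return "url", "path", or "name" for an agent identifier."""
    if urlsplit(agent_identifier).scheme in ("http", "https"):
        return "url"
    if agent_identifier.startswith("/"):
        return "path"
    return "name"


def resolve(agent_identifier: str, local_base_url: str, registry: dict[str, str]) -> str:
    """Turn an identifier into a full URL (assumed rules, for illustration)."""
    kind = classify_identifier(agent_identifier)
    if kind == "url":
        return agent_identifier
    if kind == "path":
        # Paths are joined onto the local base URL.
        return local_base_url.rstrip("/") + agent_identifier
    try:
        return registry[agent_identifier]
    except KeyError:
        raise ValueError(f"unresolved agent: {agent_identifier!r}") from None
```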

#### Factory Methods

* **`@classmethod from_registry(agent_name: str, auth_token: str = None, timeout: float = 30.0) -> InterAgentClient`**
  * Create client from registry lookup
  * Resolves agent name using global registry

#### Communication Methods

* **`async call(message: str, timeout: float = None) -> AgentResponse`**
  * Call agent with text message
  * Returns parsed response
* **`async call_with_data(data: Any, timeout: float = None) -> AgentResponse`**
  * Call agent with structured data (dict, list, etc.)
* **`async close() -> None`**
  * Close client (cleanup)

### AgentResponse

Response from inter-agent call with convenient parsed data access.

```python
response = await client.call("Hello")
print(response.text)        # Extract text content
print(response.data)        # Extract structured data
print(response.raw_response) # Full A2A response
```

#### Properties

* **`text: str`** - Concatenated text from all text parts
* **`data: Any`** - First data part from response (structured data)
* **`parts: list[dict]`** - All parts from response
* **`raw_response: dict`** - Full A2A response

#### Methods

* **`__str__() -> str`** - Returns text content
* **`__repr__() -> str`** - Detailed representation
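
The `text` and `data` properties can be read as simple projections over `parts`. The sketch below assumes each part is a dict tagged with a `kind` key (`"text"` or `"data"`) and that text parts are joined without a separator; both are assumptions about the wire format, and `text_of` and `data_of` are hypothetical helpers.

```python
from typing import Any


def text_of(parts: list[dict]) -> str:
    """Concatenate the text of every text part, in order."""
    return "".join(p.get("text", "") for p in parts if p.get("kind") == "text")


def data_of(parts: list[dict]) -> Any:
    """Return the payload of the first data part, or None if there is none."""
    return next((p.get("data") for p in parts if p.get("kind") == "data"), None)
```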

## Validation

### ValidationResult Types

```python
from health_universe_a2a import ValidationAccepted, ValidationRejected

async def validate_message(self, message: str, metadata: dict) -> ValidationResult:
    if len(message) < 10:
        return ValidationRejected(reason="Message too short")
    return ValidationAccepted(estimated_duration_seconds=60)
```

#### ValidationAccepted

```python
class ValidationAccepted:
    status: Literal["accepted"] = "accepted"
    estimated_duration_seconds: int | None = None
```

#### ValidationRejected

```python
class ValidationRejected:
    status: Literal["rejected"] = "rejected"
    reason: str  # Human-readable reason
```

## Extensions and Types

### UpdateImportance

Controls how updates are propagated to Navigator UI.

```python
from health_universe_a2a import UpdateImportance

await context.update_progress(
    "Processing...", 
    importance=UpdateImportance.NOTICE  # Shown in Navigator
)

await context.update_progress(
    "Debug info...", 
    importance=UpdateImportance.INFO    # Not shown in Navigator
)
```

#### Values

* **`ERROR`** - Something went wrong (pushed to Navigator)
* **`NOTICE`** - Standard progress update (default, pushed to Navigator)
* **`INFO`** - Verbose logging (stored but not pushed)
* **`DEBUG`** - Diagnostic information (stored but not pushed)
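
The push behavior in the list above reduces to a membership test. This sketch uses a stand-in enum so it runs without the SDK; `Importance`, its string values, and `is_pushed` are hypothetical and only mirror the four levels described.

```python
from enum import Enum


class Importance(Enum):
    """Stand-in for UpdateImportance; the string values are placeholders."""
    ERROR = "error"
    NOTICE = "notice"
    INFO = "info"
    DEBUG = "debug"


# Levels the list above describes as pushed to Navigator.
PUSHED_TO_NAVIGATOR = {Importance.ERROR, Importance.NOTICE}


def is_pushed(importance: Importance) -> bool:
    """True if an update at this level would be shown in Navigator."""
    return importance in PUSHED_TO_NAVIGATOR
```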

### NavigatorTaskStatus

Task status values for Navigator UI.

```python
class NavigatorTaskStatus(str, Enum):
    WORKING = "working"
    COMPLETED = "completed" 
    FAILED = "failed"
```

## Local Development

### LocalDocumentClient

Filesystem-backed document client for local testing.

```python
from health_universe_a2a.local import LocalDocumentClient

doc_client = LocalDocumentClient(
    data_dir="./test_data",
    output_dir="./output"
)
docs = await doc_client.list_documents()
```

#### Constructor

* **`__init__(data_dir: str, output_dir: str = None)`**
  * `data_dir`: Root directory with `source/` and `artifact/` subdirectories
  * `output_dir`: Output directory (defaults to `data_dir/artifact/`)

#### Methods

* Same interface as `DocumentClient`
* `search()` and `semantic_search()` raise `NotImplementedError`
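
Before pointing `LocalDocumentClient` at a directory, the `source/` and `artifact/` layout has to exist. The helper below is a sketch that builds it with `pathlib`; `make_local_data_dir` is a hypothetical name and the sample file is invented test data.

```python
import tempfile
from pathlib import Path


def make_local_data_dir(root: Path, sources: dict[str, str]) -> Path:
    """Create root/source and root/artifact, writing each source file as UTF-8 text."""
    source_dir = root / "source"
    source_dir.mkdir(parents=True, exist_ok=True)
    (root / "artifact").mkdir(parents=True, exist_ok=True)
    for filename, text in sources.items():
        (source_dir / filename).write_text(text, encoding="utf-8")
    return root


# Example: build a throwaway layout in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    data_dir = make_local_data_dir(Path(tmp) / "test_data", {"labs.csv": "a,b\n1,2\n"})
    print(sorted(p.name for p in data_dir.iterdir()))
```

The resulting path can then be passed as `data_dir` to the constructor.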

### create\_local\_context()

Factory function for local testing context.

```python
from health_universe_a2a.local import create_local_context

context = create_local_context("./test_data", "./output")
result = await agent.process_message("test", context)
```

#### Parameters

* **`data_dir: str`** - Root directory for test data
* **`output_dir: str = None`** - Output directory (defaults to `data_dir/artifact/`)

#### Returns

* **`AgentContext`** - Configured for local filesystem operations

## Server Utilities

### create\_app()

Create Starlette ASGI application for an A2A agent.

```python
import uvicorn

from health_universe_a2a import create_app

app = create_app(agent)  # agent: an instance of your agent class
uvicorn.run(app, host="0.0.0.0", port=8000)
```

#### Parameters

* **`agent: A2AAgentBase`** - Agent instance to serve
* **`task_store: Any = None`** - Optional task store (defaults to InMemoryTaskStore)

#### Returns

* **`Any`** - Starlette application with A2A endpoints

### serve()

Start HTTP server for an agent (convenience wrapper).

```python
agent.serve()  # Uses environment variables
agent.serve(host="localhost", port=8080, reload=True)
```

#### Parameters

* **`agent: A2AAgentBase`** - Agent to serve
* **`host: str = None`** - Server host (default: from `HOST` env var or "0.0.0.0")
* **`port: int = None`** - Server port (default: from `PORT`/`AGENT_PORT` env var or 8000)
* **`reload: bool = None`** - Auto-reload (default: from `RELOAD` env var or False)
* **`log_level: str = "info"`** - Uvicorn log level
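
The defaulting rules for `host`, `port`, and `reload` can be sketched as a pure function over an environment mapping. `resolve_server_settings` is a hypothetical helper; the precedence of `PORT` over `AGENT_PORT` when both are set is an assumption, since the parameter list does not say which wins.

```python
import os
from typing import Mapping, Optional


def resolve_server_settings(
    host: Optional[str] = None,
    port: Optional[int] = None,
    reload: Optional[bool] = None,
    env: Mapping[str, str] = os.environ,
) -> tuple[str, int, bool]:
    """Explicit arguments win; otherwise fall back to env vars, then defaults."""
    resolved_host = host if host is not None else env.get("HOST", "0.0.0.0")
    if port is None:
        # Assumption: PORT takes precedence over AGENT_PORT.
        port = int(env.get("PORT") or env.get("AGENT_PORT") or 8000)
    if reload is None:
        reload = env.get("RELOAD", "false").strip().lower() == "true"
    return resolved_host, port, reload
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.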

### Multi-Agent Server

Functions for hosting multiple agents at different paths.

#### create\_multi\_agent\_app()

```python
from health_universe_a2a import create_multi_agent_app

app = create_multi_agent_app({
    "/orchestrator": orchestrator_agent,
    "/processor": processor_agent,
})
```

#### serve\_multi\_agents()

```python
from health_universe_a2a import serve_multi_agents

serve_multi_agents({
    "/orchestrator": orchestrator_agent,
    "/processor": processor_agent,
}, port=8501)
```

## Environment Variables

### Server Configuration

* **`HOST`** - Server host (default: "0.0.0.0")
* **`PORT`** or **`AGENT_PORT`** - Server port (default: 8000, multi-agent: 8501)
* **`RELOAD`** - Enable auto-reload ("true"/"false", default: "false")

### Service URLs

* **`HU_NESTJS_URL`** - NestJS API base URL (default: `https://apps.healthuniverse.com/api/v1`)
* **`LOCAL_AGENT_BASE_URL`** - Base URL for local agents (default: `http://localhost:8501`)

### Agent Registry

* **`AGENT_REGISTRY`** - JSON string mapping agent names to URLs
* **`AGENT_REGISTRY_PATH`** - Path to agent registry JSON file
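
Both registry variables carry the same name-to-URL mapping, one inline and one on disk. The loader below is a sketch of how such a registry could be read; `load_agent_registry` is a hypothetical helper, and preferring `AGENT_REGISTRY` over `AGENT_REGISTRY_PATH` when both are set is an assumption.

```python
import json
import os
from pathlib import Path
from typing import Mapping


def load_agent_registry(env: Mapping[str, str] = os.environ) -> dict[str, str]:
    """Read the registry from AGENT_REGISTRY, else AGENT_REGISTRY_PATH, else empty."""
    raw = env.get("AGENT_REGISTRY")
    if raw:
        registry = json.loads(raw)
    elif env.get("AGENT_REGISTRY_PATH"):
        registry = json.loads(Path(env["AGENT_REGISTRY_PATH"]).read_text(encoding="utf-8"))
    else:
        return {}
    if not isinstance(registry, dict):
        raise ValueError("agent registry must be a JSON object mapping names to URLs")
    return {str(name): str(url) for name, url in registry.items()}
```

For example, `AGENT_REGISTRY='{"document-reader": "http://localhost:8501/reader"}'` would make the name `document-reader` resolvable under this scheme.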

### Debugging

* **`DEBUG_HTTP_REQUESTS`** - Enable HTTP request logging ("true"/"false")

## Error Handling

### Common Exceptions

* **`ValueError`** - Invalid parameters, missing documents, unresolved agents
* **`httpx.HTTPError`** - Network errors, API failures
* **`TimeoutError`** - Task timeout, waiting for documents
* **`NotImplementedError`** - Local mode limitations (search functions)

### Best Practices

1. **Validation**: Use `validate_message()` to check inputs before processing
2. **Error Handling**: Implement try/except in `process_message()` and `on_task_error()`
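3. **Cancellation**: Check `context.is_cancelled()` in long-running loops; it currently always returns `False`, so the check only takes effect once cancellation is implemented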
4. **Timeouts**: Use appropriate timeouts for inter-agent calls
5. **Resource Cleanup**: Always call `client.close()` and `document_client.close()`
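
For the cleanup practice above, a `try`/`finally` block guarantees `close()` runs even when the call fails. The sketch uses a stand-in client so it runs without the SDK; `FakeClient` and `call_and_close` are hypothetical names.

```python
import asyncio


class FakeClient:
    """Stand-in exposing only call() and the async close() used below."""

    def __init__(self) -> None:
        self.closed = False

    async def call(self, message: str) -> str:
        if not message:
            raise ValueError("empty message")
        return f"echo: {message}"

    async def close(self) -> None:
        self.closed = True


async def call_and_close(client: FakeClient, message: str) -> str:
    """Make one call and always release the client afterwards."""
    try:
        return await client.call(message)
    finally:
        await client.close()
```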

## Examples

### Basic Agent

```python
from health_universe_a2a import Agent, AgentContext
import json

class DataAnalyzer(Agent):
    def get_agent_name(self) -> str:
        return "Clinical Data Analyzer"
    
    def get_agent_description(self) -> str:
        return "Analyzes clinical datasets and generates insights"
    
    async def process_message(self, message: str, context: AgentContext) -> str:
        # List documents
        docs = await context.documents.list_documents(role="source")
        
        # Process with progress updates
        await context.update_progress("Loading data...", 0.1)
        
        results = []
        for i, doc in enumerate(docs):
            content = await context.documents.download_text(doc.id)
            result = self.analyze_document(content)
            results.append(result)
            
            await context.update_progress(
                f"Processed {i+1}/{len(docs)} documents",
                (i+1) / len(docs)
            )
        
        # Write results
        await context.documents.write(
            "Analysis Results",
            json.dumps(results, indent=2),
            filename="analysis.json"
        )
        
        return f"Analysis complete! Processed {len(docs)} documents."
    
    def analyze_document(self, content: str) -> dict:
        return {"length": len(content), "lines": len(content.splitlines())}

if __name__ == "__main__":
    agent = DataAnalyzer()
    agent.serve()
```

### Inter-Agent Communication

```python
from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Orchestrator"
    
    def get_agent_description(self) -> str:
        return "Orchestrates document processing workflow"
    
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call document reader
        reader_response = await self.call_agent(
            "document-reader", 
            message, 
            context
        )
        
        # Call analyzer with reader results
        analyzer_response = await self.call_agent(
            "data-analyzer",
            reader_response,
            context
        )
        
        return f"Workflow complete: {analyzer_response}"
```

### Local Development

```python
from health_universe_a2a.local import create_local_context

async def test_agent_locally():
    # DataAnalyzer is the agent class from the Basic Agent example above
    agent = DataAnalyzer()
    context = create_local_context("./test_data")
    
    result = await agent.process_message("Analyze documents", context)
    print(f"Result: {result}")

if __name__ == "__main__":
    import asyncio
    asyncio.run(test_agent_locally())
```
