# Inter-Agent Communication

The Health Universe A2A SDK provides methods that let agents call one another, enabling multi-agent workflows and orchestration patterns. This guide covers the `call_agent()` and `call_other_agent()` methods.

## Overview

Inter-agent communication allows your A2A agents to:

* **Orchestrate workflows** by calling specialized agents for specific tasks
* **Chain processing steps** where one agent's output feeds into another
* **Distribute workload** across multiple specialized microservices
* **Leverage existing tools** without reimplementing functionality

The SDK provides two primary methods for agent communication:

* `call_agent()` - Unified method with automatic registry resolution
* `call_other_agent()` - Context-aware method with JWT propagation

## Basic Agent Communication

### Using `call_agent()`

The `call_agent()` method is the most flexible approach. It accepts a registry name, a local path, or a direct URL as the agent identifier:

```python
from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Orchestrator"

    def get_agent_description(self) -> str:
        return "Coordinates document processing workflow"

    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call with agent name (registry lookup)
        analysis = await self.call_agent("document-analyzer", message, context)
        
        # Call with local path (same pod)
        summary = await self.call_agent("/summarizer", analysis, context)
        
        # Call with direct URL
        validation = await self.call_agent(
            "https://external-service.com/validator", 
            summary, 
            context
        )
        
        return f"Workflow complete: {validation}"
```
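The three identifier formats in the example above can be told apart by their shape alone. The following is a minimal sketch of one way such a dispatch could work. `classify_agent_identifier` is a hypothetical helper written for illustration; it is not part of the SDK, and the SDK's actual resolution rules may differ.

```python
from urllib.parse import urlparse


def classify_agent_identifier(identifier: str) -> str:
    """Return "url", "local_path", or "registry_name" for an agent identifier.

    Hypothetical helper: it only illustrates the three formats shown above.
    """
    parsed = urlparse(identifier)
    # A direct URL has an http(s) scheme and a host part.
    if parsed.scheme in ("http", "https") and parsed.netloc:
        return "url"
    # A local path starts with a slash, e.g. "/summarizer".
    if identifier.startswith("/"):
        return "local_path"
    # Anything else is treated as a name to look up in the registry.
    return "registry_name"
```

A check like this can be handy in tests or logging to confirm which route a given identifier is expected to take.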

### Using `call_other_agent()`

Use `call_other_agent()` when you need explicit context propagation or access to the full A2A response:

```python
class ProcessingAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call another agent with full A2A response
        response = await self.call_other_agent(
            "/data-processor", 
            message, 
            context, 
            timeout=60.0
        )
        
        # Access different response formats
        text_result = response.text  # Simple text access
        structured_data = response.data  # Structured data access
        all_parts = response.parts  # All response parts
        raw_response = response.raw_response  # Full A2A response
        
        return f"Processed: {text_result}"
```

## Agent Registry Configuration

To resolve agent names to URLs, configure a registry either inline in an environment variable or in a JSON file referenced by one:

### Option 1: Environment Variable

```bash
export AGENT_REGISTRY='{"document-analyzer": "http://analyzer:8000", "summarizer": "http://summarizer:8001"}'
```
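Because the inline registry is JSON inside a shell string, quoting mistakes are easy to make. The sketch below parses and sanity-checks the value before an agent starts. `load_inline_registry` is a hypothetical helper, and the assumption that the value is a flat name-to-URL object is taken from the example above; how the SDK itself reads the variable is not shown in this guide.

```python
import json
import os


def load_inline_registry(env_var: str = "AGENT_REGISTRY") -> dict[str, str]:
    """Parse a JSON object of {agent name: base URL} from an environment variable.

    Hypothetical helper for pre-flight checks; not part of the SDK.
    """
    raw = os.environ.get(env_var, "")
    if not raw:
        # Variable unset or empty: treat as an empty registry.
        return {}
    registry = json.loads(raw)  # raises json.JSONDecodeError on malformed JSON
    if not isinstance(registry, dict):
        raise ValueError(f"{env_var} must contain a JSON object")
    return registry
```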

### Option 2: Configuration File

Create `agents.json`:

```json
{
  "agents": {
    "document-analyzer": {
      "url": "http://localhost:8010",
      "name": "Document Analyzer",
      "description": "Analyzes document structure and content"
    },
    "summarizer": {
      "url": "http://localhost:8002", 
      "name": "Text Summarizer"
    },
    "protocol-generator": {
      "url": "http://localhost:8003"
    }
  }
}
```

Set the registry path:

```bash
export AGENT_REGISTRY_PATH="./agents.json"
```
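To check a registry file by hand, a lookup along the following lines may help. This is a sketch that assumes the file layout shown in `agents.json` above (a top-level `agents` object whose entries each carry a `url`); `resolve_agent_url` is a hypothetical helper, not an SDK function.

```python
import json
from pathlib import Path


def resolve_agent_url(registry_path: str, agent_name: str) -> str:
    """Return the URL registered for `agent_name` in a registry file.

    Hypothetical helper: assumes the {"agents": {name: {"url": ...}}} layout.
    """
    data = json.loads(Path(registry_path).read_text(encoding="utf-8"))
    agents = data.get("agents", {})
    try:
        return agents[agent_name]["url"]
    except KeyError:
        # Covers both an unknown agent name and an entry without a "url" key.
        raise KeyError(
            f"No 'url' entry for agent {agent_name!r} in {registry_path}"
        ) from None
```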

## Multi-Agent Deployment

Deploy multiple agents in a single server for efficient local communication:

```python
from health_universe_a2a import serve_multi_agents

# Define your agents
orchestrator = OrchestratorAgent()
analyzer = DocumentAnalyzerAgent()
summarizer = SummarizerAgent()

# Serve all agents on one server
if __name__ == "__main__":
    serve_multi_agents({
        "/orchestrator": orchestrator,
        "/analyzer": analyzer,
        "/summarizer": summarizer,
    }, port=8501)
```

Now agents can call each other using relative paths:

```python
class OrchestratorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call local agents efficiently
        analysis = await self.call_agent("/analyzer", message, context)
        summary = await self.call_agent("/summarizer", analysis, context)
        return summary
```

## Advanced Communication Patterns

### Sequential Processing

Chain multiple agents for step-by-step processing:

```python
async def process_message(self, message: str, context: AgentContext) -> str:
    await context.update_progress("Starting document processing...", 0.1)
    
    # Step 1: Extract text
    extracted = await self.call_agent("text-extractor", message, context)
    await context.update_progress("Text extracted", 0.3)
    
    # Step 2: Analyze content
    analysis = await self.call_agent("content-analyzer", extracted, context)
    await context.update_progress("Content analyzed", 0.6)
    
    # Step 3: Generate summary
    summary = await self.call_agent("summarizer", analysis, context)
    await context.update_progress("Summary generated", 0.9)
    
    return summary
```
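When a chain grows beyond a few steps, the repeated call-then-report pattern can be factored into a loop. The sketch below is one possible shape, not an SDK feature: `run_pipeline`, `CallFn`, and `ProgressFn` are hypothetical names, and the two callables stand in for `self.call_agent` and `context.update_progress`.

```python
from typing import Awaitable, Callable, Sequence

# Hypothetical callable shapes standing in for the SDK methods.
CallFn = Callable[[str, str], Awaitable[str]]
ProgressFn = Callable[[str, float], Awaitable[None]]


async def run_pipeline(
    call: CallFn, report: ProgressFn, agent_ids: Sequence[str], message: str
) -> str:
    """Feed `message` through each agent in order, reporting progress after each step."""
    current = message
    total = len(agent_ids)
    for step, agent_id in enumerate(agent_ids, start=1):
        # Each agent receives the previous agent's output.
        current = await call(agent_id, current)
        await report(f"{agent_id} finished", step / total)
    return current
```

Inside an agent, `call` could be a small wrapper such as `lambda agent_id, msg: self.call_agent(agent_id, msg, context)`, with `report` set to `context.update_progress`.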

### Parallel Processing

Call multiple agents concurrently for improved performance:

```python
import asyncio
import json

async def process_message(self, message: str, context: AgentContext) -> str:
    # Launch multiple agents in parallel
    tasks = [
        self.call_agent("sentiment-analyzer", message, context),
        self.call_agent("keyword-extractor", message, context),
        self.call_agent("entity-recognizer", message, context)
    ]
    
    # Wait for all to complete
    sentiment, keywords, entities = await asyncio.gather(*tasks)
    
    # Combine results
    combined_analysis = {
        "sentiment": sentiment,
        "keywords": keywords,
        "entities": entities
    }
    
    return json.dumps(combined_analysis)
```
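By default, `asyncio.gather` propagates the first exception to the caller, so the results of the other calls are not returned. If partial results are acceptable, passing `return_exceptions=True` lets each outcome be inspected individually. The sketch below illustrates this with `fake_agent_call`, a stand-in for `self.call_agent` that is made up for this example.

```python
import asyncio


async def fake_agent_call(agent_id: str, message: str) -> str:
    """Stand-in for self.call_agent(); fails for one agent to show the pattern."""
    await asyncio.sleep(0)
    if agent_id == "keyword-extractor":
        raise RuntimeError("agent unavailable")
    return f"{agent_id} result for {message!r}"


async def gather_partial(agent_ids: list[str], message: str) -> dict[str, dict]:
    """Call every agent concurrently and record success or failure per agent."""
    outcomes = await asyncio.gather(
        *(fake_agent_call(agent_id, message) for agent_id in agent_ids),
        return_exceptions=True,  # exceptions are returned as values, not raised
    )
    report = {}
    for agent_id, outcome in zip(agent_ids, outcomes):
        if isinstance(outcome, BaseException):
            report[agent_id] = {"status": "failed", "error": str(outcome)}
        else:
            report[agent_id] = {"status": "completed", "result": outcome}
    return report
```

`asyncio.gather` returns results in the same order as its arguments, which is why zipping the outcomes with `agent_ids` is safe.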

### Error Handling and Fallbacks

Implement robust error handling for agent communication:

```python
import logging

logger = logging.getLogger(__name__)

async def process_message(self, message: str, context: AgentContext) -> str:
    try:
        # Try primary processing agent
        result = await self.call_agent(
            "primary-processor", 
            message, 
            context,
            timeout=30.0
        )
        return result
    
    except Exception as e:
        logger.warning(f"Primary processor failed: {e}")
        
        try:
            # Fallback to secondary agent
            result = await self.call_agent(
                "backup-processor", 
                message, 
                context,
                timeout=60.0
            )
            return f"Processed via backup: {result}"
        
        except Exception as fallback_error:
            logger.error(f"Backup processor also failed: {fallback_error}")
            return "Processing failed - manual review required"
```
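The nested `try`/`except` above becomes hard to read once there are more than two candidates. A loop over an ordered list of agents expresses the same idea. This is a sketch only: `call_with_fallbacks` and `CallFn` are hypothetical names, and `call` stands in for a wrapper around `self.call_agent`.

```python
from typing import Awaitable, Callable, Sequence

# Hypothetical callable shape standing in for a wrapper around self.call_agent.
CallFn = Callable[[str, str], Awaitable[str]]


async def call_with_fallbacks(
    call: CallFn, agent_ids: Sequence[str], message: str
) -> tuple[str, str]:
    """Try each agent in order; return (agent_id, result) from the first success."""
    errors = []
    for agent_id in agent_ids:
        try:
            return agent_id, await call(agent_id, message)
        except Exception as exc:  # narrow this if the SDK's error types are known
            errors.append(f"{agent_id}: {exc}")
    # Every candidate failed: surface all collected errors at once.
    raise RuntimeError("All agents failed: " + "; ".join(errors))
```

Returning the agent identifier alongside the result lets the caller note which processor actually handled the request, as the "Processed via backup" message does above.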

### Structured Data Communication

Pass complex data structures between agents:

```python
import json

class DataProcessorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Parse input
        request_data = json.loads(message)
        
        # Prepare structured data for next agent
        analysis_request = {
            "document_ids": request_data.get("documents", []),
            "analysis_type": "comprehensive",
            "parameters": {
                "include_sentiment": True,
                "extract_entities": True,
                "generate_summary": True
            }
        }
        
        # Call with structured data
        response = await self.call_other_agent_with_data(
            "/document-analyzer",
            analysis_request,
            context
        )
        
        # Parse structured response
        analysis_results = response.data
        return json.dumps(analysis_results)
```
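When two agents exchange JSON, it can help to pin the payload shape down in one place. The sketch below models the `analysis_request` dictionary from the example above as a dataclass with JSON round-tripping. `AnalysisRequest` and its methods are hypothetical and not part of the SDK.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class AnalysisRequest:
    """Hypothetical payload type mirroring the analysis_request dict above."""

    document_ids: list[str]
    analysis_type: str = "comprehensive"
    parameters: dict[str, bool] = field(default_factory=dict)

    def to_message(self) -> str:
        """Serialize to the JSON string sent to the other agent."""
        return json.dumps(asdict(self))

    @classmethod
    def from_message(cls, message: str) -> "AnalysisRequest":
        """Parse a JSON string; unexpected keys raise TypeError."""
        return cls(**json.loads(message))
```

The receiving agent can call `AnalysisRequest.from_message(message)` at the top of `process_message` so that malformed requests fail early with a clear error.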

## Best Practices

### 1. Design for Reliability

```python
import asyncio

async def reliable_agent_call(self, agent_id: str, message: str, context: AgentContext):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await self.call_agent(agent_id, message, context, timeout=30.0)
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of retries: surface the last error
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
```
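The `2 ** attempt` delay in the retry loop above doubles on every attempt, which grows quickly if `max_retries` is raised. The sketch below computes the sleep schedule and adds an upper bound. The `cap` parameter and the `backoff_delays` helper are assumptions added for illustration and are not in the original example.

```python
def backoff_delays(max_retries: int, base: float = 2.0, cap: float = 30.0) -> list[float]:
    """Seconds slept between attempts: base**0, base**1, ... each limited to `cap`.

    No sleep follows the final attempt, so the list has max_retries - 1 entries.
    """
    return [min(cap, base ** attempt) for attempt in range(max_retries - 1)]


print(backoff_delays(3))  # [1.0, 2.0]
print(backoff_delays(7))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```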

### 2. Use Appropriate Timeouts

```python
# Quick operations
result = await self.call_agent("validator", data, context, timeout=10.0)

# Heavy processing
analysis = await self.call_agent("ml-analyzer", data, context, timeout=300.0)
```
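Independently of the `timeout` argument, a caller can enforce its own deadline with the standard library's `asyncio.wait_for`. The sketch below shows that pattern with `slow_step`, a made-up stand-in for an agent call. Whether this is needed on top of the SDK's own timeout handling is an open question this guide does not answer.

```python
import asyncio


async def slow_step(delay: float) -> str:
    """Stand-in for an agent call that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "done"


async def run_with_deadline(delay: float, deadline: float) -> str:
    """Return the step's result, or "timed out" if it exceeds the deadline."""
    try:
        # wait_for cancels the inner coroutine when the deadline passes.
        return await asyncio.wait_for(slow_step(delay), timeout=deadline)
    except asyncio.TimeoutError:
        return "timed out"
```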

### 3. Propagate Context Information

```python
async def process_message(self, message: str, context: AgentContext) -> str:
    # Context is automatically propagated with JWT tokens
    # User ID, thread ID, and file access tokens are maintained
    result = await self.call_agent("processor", message, context)
    
    # The called agent will have access to the same documents and user context
    return result
```

### 4. Monitor and Log Communication

```python
import time

async def process_message(self, message: str, context: AgentContext) -> str:
    start_time = time.time()
    
    try:
        await context.update_progress("Calling analysis agent...", 0.2)
        result = await self.call_agent("analyzer", message, context)
        
        duration = time.time() - start_time
        self.logger.info(f"Agent call completed in {duration:.2f}s")
        
        return result
    
    except Exception as e:
        self.logger.error(f"Agent call failed after {time.time() - start_time:.2f}s: {e}")
        raise
```
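If several calls need timing, the start/stop bookkeeping can be moved into a context manager. The following sketch uses only the standard library; `timed` is a hypothetical helper, and it uses `time.monotonic()` so that system clock adjustments do not distort the measured duration.

```python
import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def timed(label: str, timings: dict[str, float]):
    """Record the elapsed time of the enclosed block under `label`."""
    start = time.monotonic()
    try:
        yield
    finally:
        # Runs on success and on failure, so failed calls are timed too.
        timings[label] = time.monotonic() - start
        logger.info("%s finished in %.2fs", label, timings[label])
```

Inside `process_message`, a call can then be wrapped as `with timed("analyzer", timings): result = await self.call_agent("analyzer", message, context)`.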

## Example: Complete Document Processing Workflow

Here's a comprehensive example showing a document processing workflow with multiple specialized agents:

```python
from health_universe_a2a import Agent, AgentContext
import json
import asyncio

class DocumentWorkflowOrchestrator(Agent):
    def get_agent_name(self) -> str:
        return "Document Workflow Orchestrator"

    def get_agent_description(self) -> str:
        return "Orchestrates complete document processing workflows"

    async def process_message(self, message: str, context: AgentContext) -> str:
        workflow_config = json.loads(message)
        document_ids = workflow_config.get("document_ids", [])
        
        await context.update_progress("Starting document workflow", 0.1)
        
        results = []
        
        for i, doc_id in enumerate(document_ids):
            try:
                # Step 1: Extract and preprocess
                await context.update_progress(f"Processing document {i+1}/{len(document_ids)}: Extracting", 
                                            0.1 + (i * 0.8 / len(document_ids)))
                
                extraction = await self.call_agent(
                    "text-extractor", 
                    json.dumps({"document_id": doc_id}), 
                    context
                )
                
                # Step 2: Analyze content in parallel
                analysis_tasks = [
                    self.call_agent("medical-ner", extraction, context),
                    self.call_agent("sentiment-analyzer", extraction, context),
                    self.call_agent("clinical-classifier", extraction, context)
                ]
                
                entities, sentiment, classification = await asyncio.gather(*analysis_tasks)
                
                # Step 3: Generate structured report
                report_data = {
                    "document_id": doc_id,
                    "entities": json.loads(entities),
                    "sentiment": json.loads(sentiment),
                    "classification": json.loads(classification)
                }
                
                report = await self.call_agent(
                    "report-generator",
                    json.dumps(report_data),
                    context
                )
                
                results.append({
                    "document_id": doc_id,
                    "status": "completed",
                    "report": json.loads(report)
                })
                
            except Exception as e:
                self.logger.error(f"Failed to process document {doc_id}: {e}")
                results.append({
                    "document_id": doc_id,
                    "status": "failed",
                    "error": str(e)
                })
        
        await context.update_progress("Workflow completed", 1.0)
        
        # Save final results
        final_report = {
            "workflow_id": workflow_config.get("workflow_id"),
            "total_documents": len(document_ids),
            "successful": len([r for r in results if r["status"] == "completed"]),
            "failed": len([r for r in results if r["status"] == "failed"]),
            "results": results
        }
        
        await context.document_client.write(
            "Workflow Results",
            json.dumps(final_report, indent=2),
            filename="workflow_results.json"
        )
        
        return f"Processed {len(document_ids)} documents. See workflow_results.json for details."
```

This example demonstrates:

* Sequential and parallel agent calls
* Progress tracking throughout the workflow
* Error handling for individual documents
* Structured data passing between agents
* Result aggregation and persistence
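The progress value in the loop above, `0.1 + (i * 0.8 / len(document_ids))`, spreads the documents evenly across the range from 0.1 to 0.9, leaving the ends for startup and completion. The sketch below restates that arithmetic as a function. `document_progress` is a hypothetical helper, and the guard for an empty list is an addition that the original loop does not need because it never runs with zero documents.

```python
def document_progress(index: int, total: int, start: float = 0.1, span: float = 0.8) -> float:
    """Progress fraction reported when document `index` (0-based) begins."""
    if total <= 0:
        # Guard added for illustration; avoids division by zero.
        return start
    return start + index * span / total


# With four documents, the reported fractions are roughly 0.1, 0.3, 0.5, 0.7.
fractions = [round(document_progress(i, 4), 2) for i in range(4)]
```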

Inter-agent communication lets you build modular healthcare workflows from specialized agents while keeping their concerns cleanly separated.

