# Inter-Agent Communication

The Health Universe A2A SDK provides methods that let agents call one another, which makes multi-agent workflows and orchestration patterns possible. This guide covers the two methods involved, `call_agent()` and `call_other_agent()`.

## Overview

Inter-agent communication allows your A2A agents to:

* **Orchestrate workflows** by calling specialized agents for specific tasks
* **Chain processing steps** where one agent's output feeds into another
* **Distribute workload** across multiple specialized microservices
* **Leverage existing tools** without reimplementing functionality

The SDK provides two primary methods for agent communication:

* `call_agent()` - Unified method with automatic registry resolution
* `call_other_agent()` - Context-aware method with JWT propagation

## Basic Agent Communication

### Using `call_agent()`

The `call_agent()` method is the most flexible approach, supporting multiple identifier formats:

```python
from health_universe_a2a import Agent, AgentContext

class OrchestratorAgent(Agent):
    def get_agent_name(self) -> str:
        return "Document Orchestrator"

    def get_agent_description(self) -> str:
        return "Coordinates document processing workflow"

    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call with agent name (registry lookup)
        analysis = await self.call_agent("document-analyzer", message, context)
        
        # Call with local path (same pod)
        summary = await self.call_agent("/summarizer", analysis, context)
        
        # Call with direct URL
        validation = await self.call_agent(
            "https://external-service.com/validator", 
            summary, 
            context
        )
        
        return f"Workflow complete: {validation}"
```
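
The three identifier formats used above can be told apart by shape alone. The sketch below is one way to picture that dispatch. `classify_identifier` is a hypothetical helper written for illustration, not part of the SDK, and the SDK's real resolution logic and precedence may differ.

```python
from urllib.parse import urlparse


def classify_identifier(identifier: str) -> str:
    """Classify an agent identifier as 'url', 'local_path', or 'registry_name'.

    Hypothetical helper: it mirrors the three formats shown in this guide,
    not the SDK's internal implementation.
    """
    parsed = urlparse(identifier)
    # A full URL has both a scheme and a host.
    if parsed.scheme in ("http", "https") and parsed.netloc:
        return "url"
    # A leading slash means an agent mounted on the same server.
    if identifier.startswith("/"):
        return "local_path"
    # Anything else is treated as a name to look up in the registry.
    return "registry_name"
```

Under this model, a bare name such as `document-analyzer` only works if a registry entry exists for it, while paths and URLs need no registry at all.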

### Using `call_other_agent()`

Use `call_other_agent()` when you want explicit context propagation or access to the full A2A response rather than plain text:

```python
class ProcessingAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call another agent with full A2A response
        response = await self.call_other_agent(
            "/data-processor", 
            message, 
            context, 
            timeout=60.0
        )
        
        # Access different response formats
        text_result = response.text  # Simple text access
        structured_data = response.data  # Structured data access
        all_parts = response.parts  # All response parts
        raw_response = response.raw_response  # Full A2A response
        
        return f"Processed: {text_result}"
```

## Agent Registry Configuration

To resolve agent names, configure a registry in one of two ways: inline in an environment variable, or in a JSON file whose path is set through an environment variable.

### Option 1: Environment Variable

```bash
export AGENT_REGISTRY='{"document-analyzer": "http://analyzer:8000", "summarizer": "http://summarizer:8001"}'
```

### Option 2: Configuration File

Create `agents.json`:

```json
{
  "agents": {
    "document-analyzer": {
      "url": "http://localhost:8010",
      "name": "Document Analyzer",
      "description": "Analyzes document structure and content"
    },
    "summarizer": {
      "url": "http://localhost:8002", 
      "name": "Text Summarizer"
    },
    "protocol-generator": {
      "url": "http://localhost:8003"
    }
  }
}
```

Set the registry path:

```bash
export AGENT_REGISTRY_PATH="./agents.json"
```
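
The two registry formats differ in shape: the inline variable maps names directly to URLs, while the file nests each entry under `agents` with a `url` field. The following sketch shows how both could be normalized into a single name-to-URL mapping. `load_registry` is a hypothetical function, and the precedence it applies (inline entries override file entries) is an assumption, not documented SDK behavior.

```python
import json
import os
from pathlib import Path
from typing import Dict, Mapping, Optional


def load_registry(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build a name -> URL mapping from AGENT_REGISTRY_PATH and AGENT_REGISTRY.

    Hypothetical sketch. Assumption: inline entries win over file entries.
    """
    env = os.environ if env is None else env
    registry: Dict[str, str] = {}

    # File format: {"agents": {"<name>": {"url": "...", ...}}}
    path = env.get("AGENT_REGISTRY_PATH")
    if path and Path(path).is_file():
        data = json.loads(Path(path).read_text(encoding="utf-8"))
        for name, entry in data.get("agents", {}).items():
            url = entry.get("url")
            if url:  # skip entries that carry no URL
                registry[name] = url

    # Inline format: {"<name>": "<url>"}
    inline = env.get("AGENT_REGISTRY")
    if inline:
        registry.update(json.loads(inline))

    return registry
```

Passing a plain dictionary as `env` keeps the function easy to exercise without touching the real process environment.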

## Multi-Agent Deployment

Deploy multiple agents in a single server for efficient local communication:

```python
from health_universe_a2a import serve_multi_agents

# Define your agents
orchestrator = OrchestratorAgent()
analyzer = DocumentAnalyzerAgent()
summarizer = SummarizerAgent()

# Serve all agents on one server
if __name__ == "__main__":
    serve_multi_agents({
        "/orchestrator": orchestrator,
        "/analyzer": analyzer,
        "/summarizer": summarizer,
    }, port=8501)
```

Now agents can call each other using relative paths:

```python
class OrchestratorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Call local agents efficiently
        analysis = await self.call_agent("/analyzer", message, context)
        summary = await self.call_agent("/summarizer", analysis, context)
        return summary
```

## Advanced Communication Patterns

### Sequential Processing

Chain multiple agents for step-by-step processing:

```python
async def process_message(self, message: str, context: AgentContext) -> str:
    await context.update_progress("Starting document processing...", 0.1)
    
    # Step 1: Extract text
    extracted = await self.call_agent("text-extractor", message, context)
    await context.update_progress("Text extracted", 0.3)
    
    # Step 2: Analyze content
    analysis = await self.call_agent("content-analyzer", extracted, context)
    await context.update_progress("Content analyzed", 0.6)
    
    # Step 3: Generate summary
    summary = await self.call_agent("summarizer", analysis, context)
    await context.update_progress("Summary generated", 0.9)
    
    return summary
```
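
When a chain grows beyond a few steps, the call-then-report pattern above can be factored into a loop. The sketch below is one possible shape for that. `run_pipeline` and its `call` and `report` parameters are hypothetical names; the callables stand in for `self.call_agent` and `context.update_progress`, so the helper itself does not depend on the SDK.

```python
from typing import Awaitable, Callable, Sequence

CallFn = Callable[[str, str], Awaitable[str]]
ReportFn = Callable[[str, float], Awaitable[None]]


async def run_pipeline(
    steps: Sequence[str],
    message: str,
    call: CallFn,
    report: ReportFn,
) -> str:
    """Feed each agent's output into the next and report evenly spaced progress."""
    result = message
    for index, agent_id in enumerate(steps, start=1):
        # The output of one step becomes the input of the next.
        result = await call(agent_id, result)
        await report(f"{agent_id} finished", index / len(steps))
    return result
```

Inside an agent, `call` could be `lambda agent_id, text: self.call_agent(agent_id, text, context)` and `report` could be `context.update_progress`, assuming both keep the signatures used in the example above.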

### Parallel Processing

Call multiple agents concurrently for improved performance:

```python
import asyncio
import json

async def process_message(self, message: str, context: AgentContext) -> str:
    # Launch multiple agents in parallel
    tasks = [
        self.call_agent("sentiment-analyzer", message, context),
        self.call_agent("keyword-extractor", message, context),
        self.call_agent("entity-recognizer", message, context)
    ]
    
    # Wait for all to complete
    sentiment, keywords, entities = await asyncio.gather(*tasks)
    
    # Combine results
    combined_analysis = {
        "sentiment": sentiment,
        "keywords": keywords,
        "entities": entities
    }
    
    return json.dumps(combined_analysis)
```
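
With a plain `asyncio.gather(*tasks)`, the first exception propagates to the caller and the results of the other calls are lost. Passing `return_exceptions=True` lets the caller keep whatever succeeded. The sketch below separates successes from failures; `gather_named` is a hypothetical helper, and the awaitables it receives stand in for `self.call_agent(...)` calls.

```python
import asyncio
from typing import Any, Awaitable, Dict, Mapping, Tuple


async def gather_named(
    calls: Mapping[str, Awaitable[Any]],
) -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Await all calls concurrently and split outcomes into results and errors."""
    names = list(calls)
    # return_exceptions=True returns exceptions as values instead of raising.
    outcomes = await asyncio.gather(*calls.values(), return_exceptions=True)

    results: Dict[str, Any] = {}
    errors: Dict[str, str] = {}
    for name, outcome in zip(names, outcomes):
        if isinstance(outcome, BaseException):
            errors[name] = str(outcome)
        else:
            results[name] = outcome
    return results, errors
```

An orchestrator can then decide whether a partial result is acceptable, for example by returning the available analyses together with the list of failed agents.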

### Error Handling and Fallbacks

Implement robust error handling for agent communication:

```python
import logging

logger = logging.getLogger(__name__)

async def process_message(self, message: str, context: AgentContext) -> str:
    try:
        # Try primary processing agent
        result = await self.call_agent(
            "primary-processor", 
            message, 
            context,
            timeout=30.0
        )
        return result
    
    except Exception as e:
        logger.warning(f"Primary processor failed: {e}")
        
        try:
            # Fallback to secondary agent
            result = await self.call_agent(
                "backup-processor", 
                message, 
                context,
                timeout=60.0
            )
            return f"Processed via backup: {result}"
        
        except Exception as fallback_error:
            logger.error(f"Backup processor also failed: {fallback_error}")
            return "Processing failed - manual review required"
```
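
The nested `try` blocks above become hard to read once there are more than two candidates. A loop over an ordered list is one alternative, sketched below. `call_with_fallbacks` is a hypothetical helper, `call` stands in for a wrapper around `self.call_agent`, and the deadline is enforced on the caller's side with `asyncio.wait_for` rather than through the SDK's own `timeout` parameter.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple


async def call_with_fallbacks(
    candidates: Sequence[Tuple[str, float]],
    call: Callable[[str], Awaitable[str]],
) -> Tuple[str, str]:
    """Try each (agent_id, timeout) in order; return (agent_id, result) of the first success."""
    errors: List[str] = []
    for agent_id, timeout in candidates:
        try:
            # wait_for cancels the call and raises a timeout error at the deadline.
            result = await asyncio.wait_for(call(agent_id), timeout)
            return agent_id, result
        except Exception as exc:
            errors.append(f"{agent_id}: {exc!r}")
    raise RuntimeError("all candidates failed: " + "; ".join(errors))
```

Returning the agent identifier alongside the result lets the caller record which processor actually handled the request.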

### Structured Data Communication

Pass complex data structures between agents with `call_other_agent_with_data()`, which takes a dictionary in place of a text message:

```python
import json

from health_universe_a2a import Agent, AgentContext

class DataProcessorAgent(Agent):
    async def process_message(self, message: str, context: AgentContext) -> str:
        # Parse input
        request_data = json.loads(message)
        
        # Prepare structured data for next agent
        analysis_request = {
            "document_ids": request_data.get("documents", []),
            "analysis_type": "comprehensive",
            "parameters": {
                "include_sentiment": True,
                "extract_entities": True,
                "generate_summary": True
            }
        }
        
        # Call with structured data
        response = await self.call_other_agent_with_data(
            "/document-analyzer",
            analysis_request,
            context
        )
        
        # Parse structured response
        analysis_results = response.data
        return json.dumps(analysis_results)
```
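
The example above calls `json.loads(message)` directly, so a malformed message surfaces as a raw `JSONDecodeError`. Validating the payload before building the downstream request gives clearer errors. The sketch below is a minimal version of that check. `parse_document_request` is a hypothetical helper, and treating document IDs as strings is an assumption about the payload.

```python
import json
from typing import List


def parse_document_request(message: str) -> List[str]:
    """Return the list of document IDs from a JSON request message.

    Raises ValueError with a readable explanation if the message is malformed.
    """
    try:
        payload = json.loads(message)
    except json.JSONDecodeError as exc:
        raise ValueError(f"message is not valid JSON: {exc.msg}") from exc

    if not isinstance(payload, dict):
        raise ValueError("message must be a JSON object")

    documents = payload.get("documents", [])
    # Assumption: document IDs are strings.
    if not isinstance(documents, list) or not all(isinstance(d, str) for d in documents):
        raise ValueError('"documents" must be a list of strings')
    return documents
```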

## Best Practices

### 1. Design for Reliability

```python
import asyncio

async def reliable_agent_call(self, agent_id: str, message: str, context: AgentContext):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return await self.call_agent(agent_id, message, context, timeout=30.0)
        except Exception:
            if attempt == max_retries - 1:
                raise  # Out of attempts: surface the last error to the caller
            await asyncio.sleep(2 ** attempt)  # Exponential backoff: 1s, then 2s
```
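
Retrying on every `Exception` also retries errors that will never succeed, such as a malformed request. A variant that retries only transient failures and adds jitter to the backoff is sketched below. `retry_async` is a hypothetical helper, and the choice of `ConnectionError` and `asyncio.TimeoutError` as retryable is an assumption, since this guide does not list the exception types the SDK raises.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    *,
    max_retries: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, asyncio.TimeoutError),
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Run operation up to max_retries times, backing off exponentially with jitter."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts
            delay = base_delay * (2 ** attempt)
            # Jitter spreads out retries from callers that failed together.
            await sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

The `sleep` parameter exists so that the backoff can be replaced with a stub when exercising the helper. Exceptions outside `retry_on` propagate immediately.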

### 2. Use Appropriate Timeouts

```python
# Quick operations
result = await self.call_agent("validator", data, context, timeout=10.0)

# Heavy processing
analysis = await self.call_agent("ml-analyzer", data, context, timeout=300.0)
```

### 3. Propagate Context Information

```python
async def process_message(self, message: str, context: AgentContext) -> str:
    # Context is propagated automatically, including the JWT
    # User ID, thread ID, and file access tokens are maintained
    result = await self.call_agent("processor", message, context)
    
    # The called agent will have access to the same documents and user context
    return result
```

### 4. Monitor and Log Communication

```python
import time

async def process_message(self, message: str, context: AgentContext) -> str:
    start_time = time.time()
    
    try:
        await context.update_progress("Calling analysis agent...", 0.2)
        result = await self.call_agent("analyzer", message, context)
        
        duration = time.time() - start_time
        self.logger.info(f"Agent call completed in {duration:.2f}s")
        
        return result
    
    except Exception as e:
        self.logger.error(f"Agent call failed after {time.time() - start_time:.2f}s: {e}")
        raise
```
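
If several call sites need the same timing and logging, the pattern above can be wrapped in an async context manager. The following is a sketch under that assumption. `timed_call` and the `agent_calls` logger name are hypothetical, and `time.perf_counter()` is used here because it is a monotonic clock suited to measuring durations.

```python
import logging
import time
from contextlib import asynccontextmanager

logger = logging.getLogger("agent_calls")


@asynccontextmanager
async def timed_call(agent_id: str, log: logging.Logger = logger):
    """Log the duration of the enclosed agent call, on success and on failure."""
    start = time.perf_counter()
    try:
        yield
    except Exception:
        # log.exception records the traceback along with the message.
        log.exception(
            "Call to %s failed after %.2fs", agent_id, time.perf_counter() - start
        )
        raise
    else:
        log.info(
            "Call to %s completed in %.2fs", agent_id, time.perf_counter() - start
        )
```

A call site would then read `async with timed_call("analyzer"): result = await self.call_agent("analyzer", message, context)`.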

## Example: Complete Document Processing Workflow

Here's a comprehensive example showing a document processing workflow with multiple specialized agents:

```python
from health_universe_a2a import Agent, AgentContext
import json
import asyncio

class DocumentWorkflowOrchestrator(Agent):
    def get_agent_name(self) -> str:
        return "Document Workflow Orchestrator"

    def get_agent_description(self) -> str:
        return "Orchestrates complete document processing workflows"

    async def process_message(self, message: str, context: AgentContext) -> str:
        workflow_config = json.loads(message)
        document_ids = workflow_config.get("document_ids", [])
        
        await context.update_progress("Starting document workflow", 0.1)
        
        results = []
        
        for i, doc_id in enumerate(document_ids):
            try:
                # Step 1: Extract and preprocess
                await context.update_progress(
                    f"Processing document {i+1}/{len(document_ids)}: Extracting",
                    0.1 + (i * 0.8 / len(document_ids)),
                )
                
                extraction = await self.call_agent(
                    "text-extractor", 
                    json.dumps({"document_id": doc_id}), 
                    context
                )
                
                # Step 2: Analyze content in parallel
                analysis_tasks = [
                    self.call_agent("medical-ner", extraction, context),
                    self.call_agent("sentiment-analyzer", extraction, context),
                    self.call_agent("clinical-classifier", extraction, context)
                ]
                
                entities, sentiment, classification = await asyncio.gather(*analysis_tasks)
                
                # Step 3: Generate structured report
                report_data = {
                    "document_id": doc_id,
                    "entities": json.loads(entities),
                    "sentiment": json.loads(sentiment),
                    "classification": json.loads(classification)
                }
                
                report = await self.call_agent(
                    "report-generator",
                    json.dumps(report_data),
                    context
                )
                
                results.append({
                    "document_id": doc_id,
                    "status": "completed",
                    "report": json.loads(report)
                })
                
            except Exception as e:
                self.logger.error(f"Failed to process document {doc_id}: {e}")
                results.append({
                    "document_id": doc_id,
                    "status": "failed",
                    "error": str(e)
                })
        
        await context.update_progress("Workflow completed", 1.0)
        
        # Save final results
        final_report = {
            "workflow_id": workflow_config.get("workflow_id"),
            "total_documents": len(document_ids),
            "successful": sum(r["status"] == "completed" for r in results),
            "failed": sum(r["status"] == "failed" for r in results),
            "results": results
        }
        
        await context.document_client.write(
            "Workflow Results",
            json.dumps(final_report, indent=2),
            filename="workflow_results.json"
        )
        
        return f"Processed {len(document_ids)} documents. See workflow_results.json for details."
```

This example demonstrates:

* Sequential and parallel agent calls
* Progress tracking throughout the workflow
* Error handling for individual documents
* Structured data passing between agents
* Result aggregation and persistence
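
The progress value in the loop, `0.1 + (i * 0.8 / len(document_ids))`, reserves the first 10% for startup and the last 10% for saving results, and spreads the remaining 80% evenly across documents. The same arithmetic is written out as a function below. `document_progress` is a hypothetical helper, shown only to make the calculation explicit.

```python
def document_progress(
    index: int, total: int, start: float = 0.1, span: float = 0.8
) -> float:
    """Progress fraction reported when starting the document at zero-based index."""
    if total <= 0:
        raise ValueError("total must be positive")
    if not 0 <= index <= total:
        raise ValueError("index must be between 0 and total")
    return start + span * index / total
```

For four documents, the third one (index 2) starts at 0.1 + 0.8 × 2 / 4 = 0.5, and the loop as a whole ends at 0.9 before the final update sets progress to 1.0.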

Inter-agent communication enables you to build sophisticated, modular healthcare workflows that leverage the strengths of specialized agents while maintaining clean separation of concerns.
