项目准备
初始化项目
uv init --package my-project
cd my-project
创建虚拟环境
uv venv .venv
激活虚拟环境:
# For Windows
.venv\Scripts\activate
# For macOS/Linux
source .venv/bin/activate
添加A2A库
uv add git+https://github.com/djsamseng/A2A#subdirectory=samples/python --branch prefixPythonPackage
创建agent和task_manager文件
touch src/my_project/agent.py
touch src/my_project/task_manager.py
定义代理技能(Agent Skills)
基本结构
首先,我们定义技能的基本结构。示例:
{
  "id": "my-project-echo-skill",
  "name": "Echo Tool",
  "description": "Echos the input given",
  "tags": ["echo", "repeater"],
  "examples": ["I will see this echoed back to me"],
  "inputModes": ["text"],
  "outputModes": ["text"]
}
定义 Agent Card(代理名片)
基本概念
代理卡片(Agent Card)是描述代理能力和技能的 JSON 格式文档,还可以声明调用该代理所需的身份验证方式。通过代理卡片,外界可以了解代理的功能以及如何与其交互。
代理卡片的主要作用是:
- 公布代理的功能和技能。
- 提供与代理交互时所需的信息(例如,代理的 URL 和输入/输出模式)。
- 确保其他系统或用户能够正确调用和利用代理的能力。
# Declare the agent's capabilities and build its Agent Card.
# `host`, `port` and `skill` come from the enclosing main() (see the full
# __init__.py later in this article).
capabilities = AgentCapabilities()
agent_card = AgentCard(
    name="Echo Agent",
    description="This agent echos the input given",
    url=f"http://{host}:{port}/",
    version="0.1.0",
    defaultInputModes=["text"],
    defaultOutputModes=["text"],
    capabilities=capabilities,
    skills=[skill],
)
logging.info(agent_card)
任务管理器(Task Manager)
基本概念
在创建服务器之前,我们需要实现一个任务管理器。任务管理器的职责是处理传入的请求,管理任务的状态并返回响应。
我们的任务管理器通过继承 InMemoryTaskManager 类来实现其接口。
这需要实现以下两个异步方法:
from typing import AsyncIterable

from google_a2a.common.types import (
    JSONRPCResponse,
    SendTaskRequest,
    SendTaskResponse,
    SendTaskStreamingRequest,
    SendTaskStreamingResponse,
)


# The two coroutines a subclass of InMemoryTaskManager must implement.
# They are shown here as bare method signatures; `self` is the task manager.
async def on_send_task(
    self, request: SendTaskRequest
) -> SendTaskResponse:
    """Query or create an agent task and return one definitive response.

    Use this when a single execution result is needed: the processed result
    is returned as soon as the task completes.
    """
    pass


async def on_send_task_subscribe(
    self, request: SendTaskStreamingRequest
) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
    """Subscribe to subsequent updates of a task.

    The caller receives an initial response and then receives subscription
    updates over the established client-server session. Suited to monitoring
    task progress continuously: status changes and intermediate results can
    be observed in real time, which fits long-running tasks.
    """
    pass
本次 demo 的任务管理器实现位于 src/my_project/task_manager.py 文件:
import asyncio
from typing import AsyncIterable
from my_project.agent import ReimbursementAgent
import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (
    Artifact,  # task artifact / result
    JSONRPCResponse,  # JSON-RPC response
    Message,  # message object
    SendTaskRequest,  # send-task request
    SendTaskResponse,  # send-task response
    SendTaskStreamingRequest,  # streaming send-task request
    SendTaskStreamingResponse,  # streaming send-task response
    Task,  # task object
    TaskState,  # task state
    TaskStatus,  # task status information
    TaskStatusUpdateEvent,  # task status update event
)
from my_project.qwen import llm


class MyAgentTaskManager(InMemoryTaskManager):
    """Custom task manager built on InMemoryTaskManager.

    Non-streaming tasks are answered by the wrapped ReimbursementAgent;
    streaming tasks are answered by a three-message demo streamer.
    """

    def __init__(self, agent: ReimbursementAgent):
        super().__init__()
        self.agent = agent
        # Strong references to fire-and-forget tasks started with
        # asyncio.create_task(); the event loop keeps only weak references,
        # so an unreferenced task can be garbage-collected mid-execution.
        self._background_tasks: set[asyncio.Task] = set()

    async def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:
        """Run the agent once for the request and return the completed task.

        The caller receives a single response; the task is marked COMPLETED
        with the agent's reply as both the status message and the artifact.

        Raises:
            ValueError: if the agent invocation fails (original error chained).
        """
        # Save the task in the in-memory task store.
        await self.upsert_task(request.params)
        task_id = request.params.id
        # Text of the incoming message.
        # NOTE(review): assumes the first part is a text part — confirm.
        query = request.params.message.parts[0].text
        try:
            # ReimbursementAgent.invoke is synchronous (it drains
            # Runner.run() into a list), so run it in a worker thread instead
            # of blocking the event loop that serves every other request.
            result = await asyncio.to_thread(
                self.agent.invoke, query, request.params.sessionId
            )
        except Exception as e:
            print(f"Error invoking agent: {e}")
            raise ValueError(f"Error invoking agent: {e}") from e
        # Mark the task completed and attach the agent's reply.
        task = await self._update_task(
            task_id=task_id,
            task_state=TaskState.COMPLETED,
            response_text=f"on_send_task received: {result}",
        )
        return SendTaskResponse(id=request.id, result=task)

    async def _stream_3_messages(self, request: SendTaskStreamingRequest):
        """Demo streamer: enqueue three status-update events for the task.

        Each event echoes the user's text followed by "one", "two" or
        "three". The first two use TaskState.WORKING; the last uses
        TaskState.COMPLETED and is flagged final=True.
        """
        task_id = request.params.id
        received_text = request.params.message.parts[0].text
        text_messages = ["one", "two", "three"]
        for text in text_messages:
            parts = [
                {
                    "type": "text",
                    "text": f"{received_text}: {text}",
                }
            ]
            message = Message(role="agent", parts=parts)
            is_last = text == text_messages[-1]
            task_state = TaskState.COMPLETED if is_last else TaskState.WORKING
            task_status = TaskStatus(state=task_state, message=message)
            task_update_event = TaskStatusUpdateEvent(
                id=task_id,
                status=task_status,
                final=is_last,
            )
            # Queue the event for delivery over the task's SSE stream.
            await self.enqueue_events_for_sse(task_id, task_update_event)

    async def on_send_task_subscribe(
        self, request: SendTaskStreamingRequest
    ) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:
        """Handle a streaming (subscribe) request.

        Streaming lets a client subscribe and receive several updates instead
        of a single response, which helps with long-running tasks or tasks
        that return multiple artifacts.

        This implementation saves the task, registers an SSE queue for it,
        starts _stream_3_messages in the background and returns the event
        stream. It streams the demo messages only; it does not call the agent.
        """
        await self.upsert_task(request.params)
        task_id = request.params.id
        # Create the per-task queue the SSE stream reads from.
        sse_event_queue = await self.setup_sse_consumer(task_id=task_id)
        # Produce events in the background; keep a reference until it is done.
        stream_task = asyncio.create_task(self._stream_3_messages(request))
        self._background_tasks.add(stream_task)
        stream_task.add_done_callback(self._background_tasks.discard)
        # Return the event stream so the client receives the updates.
        return self.dequeue_events_for_sse(
            request_id=request.id,
            task_id=task_id,
            sse_event_queue=sse_event_queue,
        )

    async def _update_task(
        self,
        task_id: str,
        task_state: TaskState,
        response_text: str,
    ) -> Task:
        """Set a stored task's status and artifacts to a text response.

        Args:
            task_id: ID of the task to update (must already be in self.tasks).
            task_state: New task state.
            response_text: Text used for both the status message and the
                single artifact.

        Returns:
            The updated task object.
        """
        task = self.tasks[task_id]
        agent_response_parts = [
            {
                "type": "text",
                "text": response_text,
            }
        ]
        task.status = TaskStatus(
            state=task_state,
            message=Message(
                role="agent",
                parts=agent_response_parts,
            ),
        )
        task.artifacts = [
            Artifact(
                parts=agent_response_parts,
            )
        ]
        return task
如果不使用流式传输,只需实现 on_send_task 方法;on_send_task_subscribe 仅在流式场景下使用。
Agent功能
在 src/my_project/agent.py 文件中实现 agent 功能:
import json
import random
from typing import Any, AsyncIterable, Dict, Optional
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from my_project.qwen import llm# Local cache of created request_ids for demo purposes.
# Filled by create_request_form(); reimburse() rejects ids not in this set.
request_ids = set()


def create_request_form(date: Optional[str] = None, amount: Optional[str] = None, purpose: Optional[str] = None) -> dict[str, Any]:
    """
    Create a request form for the employee to fill out.

    Args:
        date (str): The date of the request. Can be an empty string.
        amount (str): The requested amount. Can be an empty string.
        purpose (str): The purpose of the request. Can be an empty string.

    Returns:
        dict[str, Any]: A dictionary containing the request form data.
    """
    request_id = "request_id_" + str(random.randint(1000000, 9999999))
    request_ids.add(request_id)
    return {
        "request_id": request_id,
        "date": "<transaction date>" if not date else date,
        "amount": "<transaction dollar amount>" if not amount else amount,
        "purpose": "<business justification/purpose of the transaction>" if not purpose else purpose,
    }


def return_form(
    form_request: dict[str, Any],
    tool_context: ToolContext,
    instructions: Optional[str] = None,
) -> dict[str, Any]:
    """
    Returns a structured json object indicating a form to complete.

    Args:
        form_request (dict[str, Any]): The request form data.
        tool_context (ToolContext): The context in which the tool operates.
        instructions (str): Instructions for processing the form. Can be an empty string.

    Returns:
        dict[str, Any]: A JSON dictionary for the form response.
    """
    # NOTE(review): despite the annotation, this returns a JSON-encoded str
    # (json.dumps below), not a dict.
    # Accept the form either as a dict or as its JSON-encoded string.
    if isinstance(form_request, str):
        form_request = json.loads(form_request)

    # ADK event-action flags. Presumably these make the form reach the user
    # verbatim (no model summarization) and end the agent's turn — confirm
    # against the ADK EventActions documentation.
    tool_context.actions.skip_summarization = True
    tool_context.actions.escalate = True
    form_dict = {
        'type': 'form',
        'form': {
            'type': 'object',
            'properties': {
                'date': {
                    'type': 'string',
                    'format': 'date',
                    'description': 'Date of expense',
                    'title': 'Date',
                },
                'amount': {
                    'type': 'string',
                    'format': 'number',
                    'description': 'Amount of expense',
                    'title': 'Amount',
                },
                'purpose': {
                    'type': 'string',
                    'description': 'Purpose of expense',
                    'title': 'Purpose',
                },
                'request_id': {
                    'type': 'string',
                    'description': 'Request id',
                    'title': 'Request ID',
                },
            },
            # Every field present in the submitted form is marked required.
            'required': list(form_request.keys()),
        },
        'form_data': form_request,
        'instructions': instructions,
    }
    return json.dumps(form_dict)


def reimburse(request_id: str) -> dict[str, Any]:
    """Reimburse the amount of money to the employee for a given request_id."""
    if request_id not in request_ids:
        return {"request_id": request_id, "status": "Error: Invalid request_id."}
    return {"request_id": request_id, "status": "approved"}


class ReimbursementAgent:
    """An agent that handles reimbursement requests."""

    SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]

    def __init__(self):
        self._agent = self._build_agent()
        self._user_id = "remote_agent"
        self._runner = Runner(
            app_name=self._agent.name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )

    def _get_or_create_session(self, session_id):
        """Return the runner session for session_id, creating an empty one if absent."""
        session = self._runner.session_service.get_session(
            app_name=self._agent.name, user_id=self._user_id, session_id=session_id
        )
        if session is None:
            session = self._runner.session_service.create_session(
                app_name=self._agent.name,
                user_id=self._user_id,
                state={},
                session_id=session_id,
            )
        return session

    def invoke(self, query, session_id) -> str:
        """Run the agent synchronously on query and return its final text.

        Returns the newline-joined text parts of the last event, or "" when
        the last event has no content parts.
        """
        session = self._get_or_create_session(session_id)
        content = types.Content(role="user", parts=[types.Part.from_text(text=query)])
        events = list(
            self._runner.run(
                user_id=self._user_id, session_id=session.id, new_message=content
            )
        )
        if not events or not events[-1].content or not events[-1].content.parts:
            return ""
        return "\n".join([p.text for p in events[-1].content.parts if p.text])

    async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:
        """Run the agent asynchronously, yielding progress and the final result.

        Yields {"is_task_complete": False, "updates": ...} for each non-final
        event and {"is_task_complete": True, "content": ...} for a final one.
        The content is either the joined text parts or the first
        function_response found (as a dict).
        """
        session = self._get_or_create_session(session_id)
        content = types.Content(role="user", parts=[types.Part.from_text(text=query)])
        async for event in self._runner.run_async(
            user_id=self._user_id, session_id=session.id, new_message=content
        ):
            if event.is_final_response():
                response = ""
                if (
                    event.content
                    and event.content.parts
                    and event.content.parts[0].text
                ):
                    response = "\n".join([p.text for p in event.content.parts if p.text])
                elif (
                    event.content
                    and event.content.parts
                    and any(p.function_response for p in event.content.parts)
                ):
                    # Pick the first part that actually carries a
                    # function_response; earlier parts may not have one.
                    response = next(
                        p.function_response.model_dump()
                        for p in event.content.parts
                        if p.function_response
                    )
                yield {
                    "is_task_complete": True,
                    "content": response,
                }
            else:
                yield {
                    "is_task_complete": False,
                    "updates": "Processing the reimbursement request...",
                }

    def _build_agent(self) -> LlmAgent:
        """Builds the LLM agent for the reimbursement agent."""
        return LlmAgent(
            model=llm,
            name="reimbursement_agent",
            description=(
                "This agent handles the reimbursement process for the employees"
                " given the amount and purpose of the reimbursement."
            ),
            instruction="""
    You are an agent who handles the reimbursement process for employees.

    When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.
      1. 'Date': the date of the transaction.
      2. 'Amount': the dollar amount of the transaction.
      3. 'Business Justification/Purpose': the reason for the reimbursement.

    Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.

    Once you received the filled-out form back from the user, you should then check the form contains all required information:
      1. 'Date': the date of the transaction.
      2. 'Amount': the value of the amount of the reimbursement being requested.
      3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.

    If you don't have all of the information, you should reject the request directly by calling the return_form method, providing the missing fields.

    For valid reimbursement requests, you can then use reimburse() to reimburse the employee.
      * In your response, you should include the request_id and the status of the reimbursement request.
    """,
            tools=[
                create_request_form,
                reimburse,
                return_form,
            ],
        )
主函数
在 src/my_project/__init__.py 文件中实现主函数,用于启动服务:
import logging
import click
from dotenv import load_dotenv
import google_a2a
from google_a2a.common.types import AgentSkill,AgentCapabilities, AgentCard
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
from my_project.agent import ReimbursementAgent# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@click.command()
@click.option("--host", default="localhost", help="Host the A2A server binds to.")
@click.option("--port", default=10002, help="Port the A2A server listens on.")
def main(host, port):
    """Start an A2A server that exposes ReimbursementAgent.

    Args:
        host: Host name/interface used for the server and the card URL.
        port: Port used for the server and the card URL.
    """
    # The card must describe what the server actually does: every task is
    # routed to ReimbursementAgent below, so advertising an echo skill would
    # mislead clients that pick agents based on the card.
    skill = AgentSkill(
        id="process_reimbursement",
        name="Process Reimbursement Tool",
        description="Helps with the reimbursement process for users given the amount and purpose of the reimbursement.",
        tags=["reimbursement"],
        examples=["Can you reimburse me $20 for my lunch with the clients?"],
        inputModes=["text"],
        outputModes=["text"],
    )
    logger.info(skill)

    # Streaming is not advertised, so clients are expected to use the
    # non-streaming on_send_task path.
    capabilities = AgentCapabilities(streaming=False)
    agent_card = AgentCard(
        name="Reimbursement Agent",
        description="This agent handles the reimbursement process for the employees given the amount and purpose of the reimbursement.",
        url=f"http://{host}:{port}/",
        version="0.1.0",
        defaultInputModes=["text"],
        defaultOutputModes=["text"],
        capabilities=capabilities,
        skills=[skill],
    )
    logger.info(agent_card)

    task_manager = MyAgentTaskManager(agent=ReimbursementAgent())
    server = A2AServer(
        agent_card=agent_card,
        task_manager=task_manager,
        host=host,
        port=port,
    )
    server.start()


if __name__ == "__main__":
    main()
大模型llm使用
这里使用的是通义千问(Qwen)模型,配置写在 src/my_project/qwen.py 文件中:
import os

from google.adk.models.lite_llm import LiteLlm

# Qwen model served through DashScope's OpenAI-compatible endpoint
# (see base_url), wrapped by ADK's LiteLlm adapter.
llm = LiteLlm(
    model="openai/qwen-max-2025-01-25",
    # Read the Alibaba Cloud (DashScope) API key from the environment instead
    # of hard-coding a secret in source; export DASHSCOPE_API_KEY before
    # starting the server.
    api_key=os.environ.get("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
    temperature=0.3,
)
测试功能
启动服务:
uv run my-project
这样就是启动成功了
然后启动 A2A 示例自带的命令行客户端:
uv run google-a2a-cli --agent http://localhost:10002
输入你好后
服务端的后台