A2A demo简单实现(使用qwen模型)(一)

news/2025/5/24 0:41:57

项目准备

初始化项目

uv init --package my-project
cd my-project

 创建虚拟环境

uv venv .venv

激活虚拟环境:

# For Windows
.venv\Scripts\activate# For macOS/Linux
source .venv/bin/activate

添加A2A库

uv add git+https://github.com/djsamseng/A2A#subdirectory=samples/python --branch prefixPythonPackage

创建agent和task_manager文件

touch src/my_project/agent.pytouch 
touch src/my_project/task_manager.py

定义代理技能(Agent Skills)

基本结构

首先,我们定义技能的基本结构。示例:

{"id": "my-project-echo-skill","name": "Echo Tool","description": "Echos the input given","tags": ["echo", "repeater"],"examples": ["I will see this echoed back to me"],"inputModes": ["text"],"outputModes": ["text"]
}

定义 Agent Card(代理名片)

基本概念

代理名片是描述代理能力和技能的 JSON 格式文档,此外它还包含身份验证机制。通过代理卡片,外界可以了解代理的功能以及如何与其交互。

代理卡片的主要作用是:

  • 公布代理的功能和技能。
  • 提供与代理交互时所需的信息(例如,代理的 URL 和输入/输出模式)。
  • 确保其他系统或用户能够正确调用和利用代理的能力。
# 添加代理能力和代理卡片capabilities = AgentCapabilities()agent_card = AgentCard(name="Echo Agent",description="This agent echos the input given",url=f"http://{host}:{port}/",version="0.1.0",defaultInputModes=["text"],defaultOutputModes=["text"],capabilities=capabilities,skills=[skill])logging.info(agent_card)

任务管理器(Task Manager)

基本概念

在创建服务器之前,我们需要实现一个任务管理器。任务管理器的职责是处理传入的请求,管理任务的状态并返回响应。

 实现任务管理器将实现 InMemoryTaskManager 接口,即继承InMemoryTaskManager类。这要求要实现以下两个异步方法:

async def on_send_task(self,request: SendTaskRequest
) -> SendTaskResponse:"""此方法用于查询或创建代理任务。调用者将收到一个确切的响应。当需要获取单次任务执行结果时使用此方法。任务完成后会立即返回处理结果。"""passasync def on_send_task_subscribe(self,request: SendTaskStreamingRequest
) -> AsyncInterable[SendTaskStreamingResponse] | JSONRPCResponse:"""此方法用于订阅任务的后续更新。调用者将收到初始响应,并通过建立的客户端-服务器会话接收订阅更新。适用于需要持续监控任务进度的场景。可以实时获取任务状态变化、中间结果等信息。支持长时间运行的任务场景。"""pass

本次demo的任务管理器实现src/my_project/task_manager.py 文件:

import asyncio
from typing import AsyncIterable
from my_project.agent import ReimbursementAgent
import google_a2a
from google_a2a.common.server.task_manager import InMemoryTaskManager
from google_a2a.common.types import (Artifact,  # 任务产物/结果JSONRPCResponse,  # JSON-RPC响应Message,  # 消息对象SendTaskRequest,  # 发送任务请求SendTaskResponse,  # 发送任务响应SendTaskStreamingRequest,  # 流式发送任务请求SendTaskStreamingResponse,  # 流式发送任务响应Task,  # 任务对象TaskState,  # 任务状态TaskStatus,  # 任务状态信息TaskStatusUpdateEvent,  # 任务状态更新事件
)
from my_project.qwen import llmclass MyAgentTaskManager(InMemoryTaskManager):"""自定义任务管理器类,继承自InMemoryTaskManager"""def __init__(self, agent:ReimbursementAgent):super().__init__()self.agent = agentasync def on_send_task(self, request: SendTaskRequest) -> SendTaskResponse:"""此方法用于查询或创建代理任务。调用者将收到一个确切的响应。当需要获取单次任务执行结果时使用此方法。任务完成后会立即返回处理结果。"""# 将任务保存到内存任务管理器中await self.upsert_task(request.params)task_id = request.params.id# 获取接收到的文本内容query = request.params.message.parts[0].text# 初始化回应文本result = f"on_send_task received: {query}"try:result = self.agent.invoke(query, request.params.sessionId)except Exception as e:print(f"Error invoking agent: {e}")raise ValueError(f"Error invoking agent: {e}")parts = [{"type": "text", "text": result}]# 更新任务状态为已完成,并返回echo响应task = await self._update_task(task_id=task_id,task_state=TaskState.COMPLETED,response_text=f"on_send_task received: {result}")# 返回任务响应return SendTaskResponse(id=request.id, result=task)async def _stream_3_messages(self, request: SendTaskStreamingRequest):# 异步方法:按顺序发送3条消息task_id = request.params.id  # 获取任务IDreceived_text = request.params.message.parts[0].text  # 获取用户发送的文本text_messages = ["one", "two", "three"]  # 定义要发送的3条消息for text in text_messages:# 为每条消息创建响应内容parts = [{"type": "text","text": f"{received_text}: {text}",  # 组合用户输入和当前消息}]message = Message(role="agent", parts=parts)  # 创建消息对象is_last = text == text_messages[-1]  # 检查是否是最后一条消息task_state = TaskState.COMPLETED if is_last else TaskState.WORKING  # 根据是否是最后一条消息设置任务状态task_status = TaskStatus(state=task_state,message=message)  # 创建任务状态对象task_update_event = TaskStatusUpdateEvent(id=request.params.id,status=task_status,final=is_last,)  # 创建任务更新事件await self.enqueue_events_for_sse(request.params.id,task_update_event)  # 将事件加入队列等待发送async def on_send_task_subscribe(self,request: SendTaskStreamingRequest) -> AsyncIterable[SendTaskStreamingResponse] | JSONRPCResponse:"""用于处理流式任务请求的方法目前未实现,后续可以根据需要添加流式处理逻辑流式传输允许客户端订阅服务器,并接收多个更新,而不是仅仅一个响应。这在处理长期运行的代理任务或需要将多个工件(Artifacts)传回客户端时非常有用。此方法用于订阅任务的后续更新。调用者将收到初始响应,并通过建立的客户端-服务器会话接收订阅更新。适用于需要持续监控任务进度的场景。可以实时获取任务状态变化、中间结果等信息。支持长时间运行的任务场景。"""# 处理订阅请求的异步方法# 将任务保存到内存任务管理器中await self.upsert_task(request.params)task_id = request.params.id# 为该任务创建一个工作队列sse_event_queue = await self.setup_sse_consumer(task_id=task_id)# 启动异步任务处理消息流asyncio.create_task(self._stream_3_messages(request))# 返回事件流,告诉客户端期待后续的流式响应return self.dequeue_events_for_sse(request_id=request.id,task_id=task_id,sse_event_queue=sse_event_queue,)async def _update_task(self,task_id: str,  # 任务IDtask_state: TaskState,  # 任务状态response_text: str,  # 响应文本) -> Task:"""更新任务状态的内部方法参数:task_id: 要更新的任务IDtask_state: 新的任务状态response_text: 响应文本内容返回:更新后的任务对象"""# 从任务字典中获取任务对象task = self.tasks[task_id]# 构造响应部分agent_response_parts = [{"type": "text","text": response_text,}]# 更新任务状态task.status = TaskStatus(state=task_state,message=Message(role="agent",parts=agent_response_parts,))# 更新任务产物task.artifacts = [Artifact(parts=agent_response_parts,)]return task

不使用流式的话只需要实现on_send_task函数,on_send_task_subscribe主要用于流式的时候。

Agent功能

实现src/my_project/agent.py 文件,添加agent功能

import json
import random
from typing import Any, AsyncIterable, Dict, Optional
from google.adk.agents.llm_agent import LlmAgent
from google.adk.tools.tool_context import ToolContext
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types
from my_project.qwen import llm# Local cache of created request_ids for demo purposes.
request_ids = set()def create_request_form(date: Optional[str] = None, amount: Optional[str] = None, purpose: Optional[str] = None) -> dict[str, Any]:"""Create a request form for the employee to fill out.Args:date (str): The date of the request. Can be an empty string.amount (str): The requested amount. Can be an empty string.purpose (str): The purpose of the request. Can be an empty string.Returns:dict[str, Any]: A dictionary containing the request form data."""request_id = "request_id_" + str(random.randint(1000000, 9999999))request_ids.add(request_id)return {"request_id": request_id,"date": "<transaction date>" if not date else date,"amount": "<transaction dollar amount>" if not amount else amount,"purpose": "<business justification/purpose of the transaction>" if not purpose else purpose,}def return_form(form_request: dict[str, Any],    tool_context: ToolContext,instructions: Optional[str] = None) -> dict[str, Any]:"""Returns a structured json object indicating a form to complete.Args:form_request (dict[str, Any]): The request form data.tool_context (ToolContext): The context in which the tool operates.instructions (str): Instructions for processing the form. Can be an empty string.       Returns:dict[str, Any]: A JSON dictionary for the form response."""  if isinstance(form_request, str):form_request = json.loads(form_request)tool_context.actions.skip_summarization = Truetool_context.actions.escalate = Trueform_dict = {'type': 'form','form': {'type': 'object','properties': {'date': {'type': 'string','format': 'date','description': 'Date of expense','title': 'Date',},'amount': {'type': 'string','format': 'number','description': 'Amount of expense','title': 'Amount',},'purpose': {'type': 'string','description': 'Purpose of expense','title': 'Purpose',},'request_id': {'type': 'string','description': 'Request id','title': 'Request ID',},},'required': list(form_request.keys()),},'form_data': form_request,'instructions': instructions,}return json.dumps(form_dict)def reimburse(request_id: str) -> dict[str, Any]:"""Reimburse the amount of money to the employee for a given request_id."""if request_id not in request_ids:return {"request_id": request_id, "status": "Error: Invalid request_id."}return {"request_id": request_id, "status": "approved"}class ReimbursementAgent:"""An agent that handles reimbursement requests."""SUPPORTED_CONTENT_TYPES = ["text", "text/plain"]def __init__(self):self._agent = self._build_agent()self._user_id = "remote_agent"self._runner = Runner(app_name=self._agent.name,agent=self._agent,artifact_service=InMemoryArtifactService(),session_service=InMemorySessionService(),memory_service=InMemoryMemoryService(),)def invoke(self, query, session_id) -> str:session = self._runner.session_service.get_session(app_name=self._agent.name, user_id=self._user_id, session_id=session_id)content = types.Content(role="user", parts=[types.Part.from_text(text=query)])if session is None:session = self._runner.session_service.create_session(app_name=self._agent.name,user_id=self._user_id,state={},session_id=session_id,)events = list(self._runner.run(user_id=self._user_id, session_id=session.id, new_message=content))if not events or not events[-1].content or not events[-1].content.parts:return ""return "\n".join([p.text for p in events[-1].content.parts if p.text])async def stream(self, query, session_id) -> AsyncIterable[Dict[str, Any]]:session = self._runner.session_service.get_session(app_name=self._agent.name, user_id=self._user_id, session_id=session_id)content = types.Content(role="user", parts=[types.Part.from_text(text=query)])if session is None:session = self._runner.session_service.create_session(app_name=self._agent.name,user_id=self._user_id,state={},session_id=session_id,)async for event in self._runner.run_async(user_id=self._user_id, session_id=session.id, new_message=content):if event.is_final_response():response = ""if (event.contentand event.content.partsand event.content.parts[0].text):response = "\n".join([p.text for p in event.content.parts if p.text])elif (event.contentand event.content.partsand any([True for p in event.content.parts if p.function_response])):response = next((p.function_response.model_dump() for p in event.content.parts))yield {"is_task_complete": True,"content": response,}else:yield {"is_task_complete": False,"updates": "Processing the reimbursement request...",}def _build_agent(self) -> LlmAgent:"""Builds the LLM agent for the reimbursement agent."""return LlmAgent(model=llm,name="reimbursement_agent",description=("This agent handles the reimbursement process for the employees"" given the amount and purpose of the reimbursement."),instruction="""You are an agent who handles the reimbursement process for employees.When you receive an reimbursement request, you should first create a new request form using create_request_form(). Only provide default values if they are provided by the user, otherwise use an empty string as the default value.1. 'Date': the date of the transaction.2. 'Amount': the dollar amount of the transaction.3. 'Business Justification/Purpose': the reason for the reimbursement.Once you created the form, you should return the result of calling return_form with the form data from the create_request_form call.Once you received the filled-out form back from the user, you should then check the form contains all required information:1. 'Date': the date of the transaction.2. 'Amount': the value of the amount of the reimbursement being requested.3. 'Business Justification/Purpose': the item/object/artifact of the reimbursement.If you don't have all of the information, you should reject the request directly by calling the request_form method, providing the missing fields.For valid reimbursement requests, you can then use reimburse() to reimburse the employee.* In your response, you should include the request_id and the status of the reimbursement request.""",tools=[create_request_form,reimburse,return_form,],)

主函数

实现src/my_project/__init__.py 文件,实现主函数启动服务

import logging
import click
from dotenv import load_dotenv
import google_a2a
from google_a2a.common.types import AgentSkill,AgentCapabilities, AgentCard
from google_a2a.common.server import A2AServer
from my_project.task_manager import MyAgentTaskManager
from my_project.agent import ReimbursementAgent# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)@click.command()
@click.option("--host", default="localhost")
@click.option("--port", default=10002)
def main(host, port):skill = AgentSkill(id="my-project-echo-skill",name="Echo Tool",description="Echos the input given",tags=["echo", "repeater"],examples=["I will see this echoed back to me"],inputModes=["text"],outputModes=["text"],)logging.info(skill)# 添加代理能力和代理卡片capabilities = AgentCapabilities(streaming=False)agent_card = AgentCard(name="Echo Agent",description="This agent echos the input given",url=f"http://{host}:{port}/",version="0.1.0",defaultInputModes=["text"],defaultOutputModes=["text"],capabilities=capabilities,skills=[skill])logging.info(agent_card)task_manager = MyAgentTaskManager(agent=ReimbursementAgent())  # 创建任务管理器实例server = A2AServer(agent_card=agent_card,task_manager=task_manager,host=host,port=port,)server.start()  # 启动服务器if __name__ == "__main__":main()

大模型llm使用

我使用的qwen

my_project/qwen.py文件

from google.adk.models.lite_llm import LiteLlmllm = LiteLlm(model="openai/qwen-max-2025-01-25",api_key='自己的阿里api——key',base_url='https://dashscope.aliyuncs.com/compatible-mode/v1',temperature=0.3,
)

测试功能

启动服务:

uv run my-project

这样就是启动成功了

然后启动谷歌自带的客户端:

uv run google-a2a-cli --agent http://localhost:10002

输入你好后

服务端的后台

 


https://dhexx.cn/news/show-5514455.html

相关文章

JavaScript性能优化实战,从理论到落地的全面指南

在前端开发领域&#xff0c;JavaScript的性能优化是提升用户体验的核心环节。随着Web应用复杂度的提升&#xff0c;开发者面临的性能瓶颈也日益多样化。本文将从理论分析、代码实践和工具使用三个维度&#xff0c;系统性地讲解JavaScript性能优化的实战技巧&#xff0c;并通过大…

第十五章,SSL VPN

前言 IPSec 和 SSL 对比 IPSec远程接入场景---client提前安装软件&#xff0c;存在一定的兼容性问题 IPSec协议只能够对感兴趣的流量进行加密保护&#xff0c;意味着接入用户需要不停的调整策略&#xff0c;来适应IPSec隧道 IPSec协议对用户访问权限颗粒度划分的不够详细&…

如何构建容器镜像并将其推送到极狐GitLab容器镜像库?

极狐GitLab 是 GitLab 在中国的发行版&#xff0c;关于中文参考文档和资料有&#xff1a; 极狐GitLab 中文文档极狐GitLab 中文论坛极狐GitLab 官网 构建容器镜像并将其推送到容器镜像库 (BASIC ALL) 在构建和推送容器镜像之前&#xff0c;您必须通过容器镜像库的身份验证。 …

Django项目中不同app使用不同数据库的实现

在某些复杂的Django项目中&#xff0c;可能需要将不同的应用程序&#xff08;app&#xff09;分配到不同的数据库中&#xff0c;以实现数据隔离、负载均衡或其他特定需求。本文将详细介绍如何在Django项目中实现不同app使用不同数据库。 一、配置多数据库 首先&#xff0c;在…

CI/CD面试题及答案

一、CI/CD 基础概念 1. 什么是 CI/CD&#xff1f;CI 和 CD 的区别是什么&#xff1f; 答案&#xff1a; CI&#xff08;持续集成&#xff09;&#xff1a;开发人员提交代码后&#xff0c;自动构建并运行测试&#xff0c;确保代码集成无冲突。CD&#xff08;持续交付 / 部署&am…

LVDS系列11:Xilinx Ultrascale系可编程输入延迟(一)

Ultrascale系列的可编程输入延迟组件原语为IDELAYE3&#xff0c;IDELAYE3可以用于延迟除时钟外的任何输入信号&#xff0c;然后将其转发到fpga内部逻辑或是寄存到寄存器。IDELAY无法直接布线到全局时钟缓存&#xff0c;若想延迟时钟请使用MMCM和PLL生成时钟并使用其相移功能进行…

【分享】KK/BD/XL等六大不限速下载

超绝软件大揭秘&#x1f680;安卓党看&#x1f525; —————【下 载 地 址】——————— 【本章单下载】&#xff1a;https://drive.uc.cn/s/76a981211b234 【百款黑科技】&#xff1a;https://ucnygalh6wle.feishu.cn/wiki/HPQywvPc7iLZu1k0ODFcWMt2n0d?fromfrom_cop…

istio in action之Gateway流量入口与安全

入口网关&#xff0c;简单来说&#xff0c;就是如何让外部世界和我们精心构建的集群内部服务顺畅地对话。在网络安全领域&#xff0c;有一个词叫流量入口&#xff0c;英文叫Ingress。这指的是那些从我们自己网络之外&#xff0c;比如互联网&#xff0c;发往我们内部网络的流量。…

【Linux】环境变量(图文)

目录 一、main函数的参数解释&#xff1a; 1、argc和argc的解释 2、为什么要这样设置&#xff1f; 3、注意&#xff1a; 4、命令行计算器&#xff1a; 二、认识环境变量 三、见见环境变量 1、执行一个程序的前提 2、指令&#xff1a;echo $PATH 3、为什么系统自带的指令…

ZLG致远电子与天玛智控签署战略合作协议,共推煤矿智能化新变革

5月7日上午&#xff0c;ZLG致远电子与天玛智控在广州正式签署战略合作协议&#xff0c;双方将围绕煤矿无人化智能开采和智能制造开展战略合作&#xff0c;携手推动行业技术创新与发展。 5月7日上午&#xff0c;广州致远电子股份有限公司&#xff08;以下简称“ZLG致远电子”&am…