""" 10.3.3 使用 HelloAgents A2A 工具 (3)创建Agent网络 """ from hello_agents.protocols import A2AServer, A2AClient import threading import time # 1. 创建多个Agent服务 researcher = A2AServer( name="researcher", description="研究员" ) @researcher.skill("research") def do_research(text: str) -> str: import re match = re.search(r'research\s+(.+)', text, re.IGNORECASE) topic = match.group(1).strip() if match else text return str({"topic": topic, "findings": f"{topic}的研究结果"}) writer = A2AServer( name="writer", description="撰写员" ) @writer.skill("write") def write_article(text: str) -> str: import re match = re.search(r'write\s+(.+)', text, re.IGNORECASE) content = match.group(1).strip() if match else text # 尝试解析研究数据 try: data = eval(content) topic = data.get("topic", "未知主题") findings = data.get("findings", "无研究结果") except: topic = "未知主题" findings = content return f"# {topic}\n\n基于研究:{findings}\n\n文章内容..." editor = A2AServer( name="editor", description="编辑" ) @editor.skill("edit") def edit_article(text: str) -> str: import re match = re.search(r'edit\s+(.+)', text, re.IGNORECASE) article = match.group(1).strip() if match else text result = { "article": article + "\n\n[已编辑优化]", "feedback": "文章质量良好", "approved": True } return str(result) # 2. 启动所有服务 threading.Thread(target=lambda: researcher.run(port=5000), daemon=True).start() threading.Thread(target=lambda: writer.run(port=5001), daemon=True).start() threading.Thread(target=lambda: editor.run(port=5002), daemon=True).start() time.sleep(2) # 等待服务启动 # 3. 创建客户端连接到各个Agent researcher_client = A2AClient("http://localhost:5000") writer_client = A2AClient("http://localhost:5001") editor_client = A2AClient("http://localhost:5002") # 4. 协作流程 def create_content(topic): # 步骤1:研究 research = researcher_client.execute_skill("research", f"research {topic}") research_data = research.get('result', '') # 步骤2:撰写 article = writer_client.execute_skill("write", f"write {research_data}") article_content = article.get('result', '') # 步骤3:编辑 final = editor_client.execute_skill("edit", f"edit {article_content}") return final.get('result', '') # 使用 if __name__ == "__main__": result = create_content("AI在医疗领域的应用") print(f"\n最终结果:\n{result}")