Agent开发,基于Supervisor的多智能体实现 / ai #46

in STEEM CN/中文3 days ago

多智能体(Multi-Agent)作为智能体的流行趋势,怎能不上手实现呢。

基于前面的工具开发和单智能体的开发心得,多智能体也就是个搭积木的过程:将原本的应用程序拆分成多个较小的独立代理,从而组合而成的系统。这些小的独立代理可以是简单的大模型交互代理,也可以是复杂的 ReAct 代理。

LangGraph利用基于图的结构来定义代理并在它们之间建立连接。在此框架中,每个代理都表示为图中的一个节点,并通过边链接到其它代理。每个代理通过接收来自其他代理的输入并将控制权传递给下一个代理来执行其指定的操作。

agent.jpg
多智能体的架构

多智能体的架构有上图不同的架构形式,其中Supervisor是比较主流的方式,那就它啰。

下面来实现一个多智能体系统,以一些基本的功能来感受开发的过程和技巧。如果要添加新的工具和功能,只需直接添加即可,也就是它的拓展性非常好,也很易用。下面是案例代码,大家可以上手试试。

supervisor.jpg
Supervisor案例示意图

实践案例

from typing import Union, Optional, Annotated, Literal
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from dotenv import dotenv_values
from langchain_openai import ChatOpenAI
from langgraph.graph import START, StateGraph, MessagesState, END
import operator, requests, json
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage, AIMessage
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_core.prompts import ChatPromptTemplate
from langchain_experimental.utilities import PythonREPL
from langgraph.prebuilt import create_react_agent

env_vars = dotenv_values('.env')
OPENAI_KEY = env_vars['OPENAI_API_KEY'] 
OPENAI_BASE_URL = env_vars['OPENAI_API_BASE'] 
SERPER_KEY = env_vars['SERPER_KEY'] 

llm = ChatOpenAI(model="gpt-4o-mini", api_key=OPENAI_KEY,base_url=OPENAI_BASE_URL)  

class AgentState(MessagesState):
    next: str

# 第一个代理
class SearchQuery(BaseModel):
    query: str = Field(description="Questions for networking queries")

@tool(args_schema = SearchQuery)
def fetch_real_time_info(query):
    """Get real-time Internet information"""
    url = "https://google.serper.dev/search"
    payload = json.dumps({
      "q": query,
      "num": 1,
    })
    headers = {
      'X-API-KEY': SERPER_KEY,
      'Content-Type': 'application/json'
    }
    
    response = requests.post(url, headers=headers, data=payload)
    data = json.loads(response.text)
    if 'organic' in data:
        return json.dumps(data['organic'],  ensure_ascii=False) 
    else:
        return json.dumps({"error": "No organic results found"},  ensure_ascii=False)  

# 第二个代理
repl = PythonREPL()
@tool
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    result_str = f"Successfully executed:\n\`\`\`python\n{code}\n\`\`\`\nStdout: {result}"
    return result_str


# 使用 `create_react_agent` 构建成两个`ReAct`代理。
search_agent = create_react_agent(
    llm, 
    tools=[fetch_real_time_info]
)

code_agent = create_react_agent(
    llm, 
    tools=[python_repl]
)


# 分别将两个`ReAct Agent` 构造成节点,并添加代理名称标识。
def search_node(state: AgentState):
    result = search_agent.invoke(state)
    return {
        "messages": [HumanMessage(content=result["messages"][-1].content, name="searcher")]
    }


def code_node(state: AgentState):
    result = code_agent.invoke(state)
    return {
        "messages": [HumanMessage(content=result["messages"][-1].content, name="coder")]
    }

# 直接交互的节点
def chat(state: AgentState):
    messages = state["messages"][-1]
    model_response = llm.invoke(messages.content)
    final_response = [HumanMessage(content=model_response.content, name="chat")]
    return {"messages": final_response}


# 设置代理主管可以管理的子代理
members = ["chat", "searcher", "coder"]
options = members + ["FINISH"]

# 定义路由
class Router(TypedDict):
    """Worker to route to next. If no workers needed, route to FINISH"""
    next: Literal[*options]


def supervisor(state: AgentState):
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}.\n\n"
        "Each worker has a specific role:\n"
        "- chat: Responds directly to user inputs using natural language.\n"
        "- coder: Activated for tasks that require mathematical calculations or specific coding needs.\n"
        "- sqler: Used when database queries or explicit SQL generation is needed.\n\n"
        "Given the following user request, respond with the worker to act next."
        " Each worker will perform a task and respond with their results and status."
        " When finished, respond with FINISH."
    )
    messages = [{"role": "system", "content": system_prompt},] + state["messages"]
    response = llm.with_structured_output(Router).invoke(messages)
    next_ = response["next"]
    if next_ == "FINISH":
        next_ = END
    return {"next": next_}  


builder = StateGraph(AgentState)

builder.add_node("supervisor", supervisor)
builder.add_node("chat", chat)
builder.add_node("searcher", search_node)
builder.add_node("coder", code_node)


for member in members:
    # 每个子代理在完成工作后总是向主管“汇报”
    builder.add_edge(member, "supervisor")

builder.add_conditional_edges("supervisor", lambda state: state["next"])

builder.add_edge(START, "supervisor")

graph = builder.compile()


finan_response = graph.invoke({"messages":["What are the latest movies in 2025"]})
print(569, finan_response)  # finan_response["messages"][-1].content

如果一切顺利,那么它就可以正常工作啰!快上手试试吧。