Skip to main content
章节14 min read

第3章:Parallelization

第3章:Parallelization Parallelization 模式概述 在前几章中,我们探讨了用于顺序工作流的 Prompt Chaining 以及用于动态决策和不同路径间转换的 Routing。虽然这些模式至关重要,但许多复杂的 Agent 任务涉及多个可以同时执行而非一个接一个执行的子任务。这正是 Par...

第3章:Parallelization

Parallelization 模式概述

在前几章中,我们探讨了用于顺序工作流的 Prompt Chaining 以及用于动态决策和不同路径间转换的 Routing。虽然这些模式至关重要,但许多复杂的 Agent 任务涉及多个可以同时执行而非一个接一个执行的子任务。这正是 Parallelization 模式变得至关重要的地方。

Parallelization(并行化)涉及并发执行多个组件,例如 LLM 调用、工具使用,甚至整个子 Agent(见图1)。与其等待一个步骤完成再开始下一步,并行执行允许独立任务同时运行,显著减少了可以分解为独立部分的任务的整体执行时间。

考虑一个旨在研究某个主题并总结其发现的 Agent。顺序方法可能:

  1. 搜索来源 A。
  2. 总结来源 A。
  3. 搜索来源 B。
  4. 总结来源 B。
  5. 从总结 A 和 B 综合出最终答案。

而并行方法可以:

  1. 同时搜索来源 A 和来源 B。
  2. 一旦两个搜索都完成,同时总结来源 A 和来源 B。
  3. 从总结 A 和 B 综合出最终答案(此步骤通常是顺序的,等待并行步骤完成)。

核心思想是识别工作流中不依赖于其他部分输出的部分,并并行执行它们。这在处理具有延迟的外部服务(如 API 或数据库)时特别有效,因为你可以同时发出多个请求。

实现 parallelization 通常需要支持异步执行或多线程/多处理的框架。现代 Agent 框架的设计中考虑到了异步操作,允许你轻松定义可以并行运行的步骤。

图1:使用子 Agent 进行并行化的示例

诸如 LangChain、LangGraph 和 Google ADK 等框架提供了并行执行的机制。在 LangChain Expression Language (LCEL) 中,你可以通过使用 |(用于顺序)等操作符合并可运行对象,并通过构建具有并发执行分支的链或图来实现并行执行。LangGraph 凭借其图结构,允许你定义可以从单个状态转换执行的多个节点,有效实现在工作流中的并行分支。Google ADK 提供了健壮的、原生的机制来促进和管理 Agent 的并行执行,显著增强了复杂多 Agent 系统的效率和可扩展性。ADK 框架内这种固有的能力使开发者能够设计和实现多个 Agent 可以并发运行而非顺序运行的解决方案。

Parallelization 模式对于提高 Agent 系统的效率和响应能力至关重要,特别是在处理涉及多个独立查询、计算或与外部服务交互的任务时。它是优化复杂 Agent 工作流性能的关键技术。

实际应用与用例

Parallelization 是一个用于在各种应用中优化 Agent 性能的强大模式:

1. 信息收集与研究

从多个来源同时收集信息是一个经典用例。

  • 用例:一个研究某公司的 Agent。
    • 并行任务:同时搜索新闻文章、拉取股票数据、检查社交媒体提及并查询公司数据库。
    • 好处:比顺序查找更快地收集全面视图。

2. 数据处理与分析

并发应用不同的分析技术或处理不同的数据片段。

  • 用例:一个分析客户反馈的 Agent。
    • 并行任务:在一批反馈条目上同时运行情感分析、提取关键词、分类反馈和识别紧急问题。
    • 好处:快速提供多方面的分析。

3. 多 API 或工具交互

调用多个独立的 API 或工具来收集不同类型的信息或执行不同的操作。

  • 用例:一个旅行规划 Agent。
    • 并行任务:同时查询航班价格、搜索酒店可用性、查找当地活动和寻找餐厅推荐。
    • 好处:更快地呈现完整的旅行计划。

4. 多组件内容生成

并行生成复杂内容的不同部分。

  • 用例:一个创建营销邮件的 Agent。
    • 并行任务:同时生成主题行、起草邮件正文、查找相关图片和创建号召性用语按钮文本。
    • 好处:更高效地组装最终邮件。

5. 验证与核验

并发执行多个独立的检查或验证。

  • 用例:一个验证用户输入的 Agent。
    • 并行任务:同时检查电子邮件格式、验证电话号码、对照数据库验证地址以及检查不当内容。
    • 好处:更快地提供输入有效性的反馈。

6. 多模态处理

并发处理同一输入的不同模态(文本、图像、音频)。

  • 用例:一个分析带有文本和图片的社交媒体帖子的 Agent。
    • 并行任务:同时分析文本的情感倾向和关键词,以及分析图片中的对象和场景描述。
    • 好处:更快速地整合来自不同模态的洞察。

7. A/B 测试或多选项生成

并行生成多个响应的变体或输出以选择最佳的一个。

  • 用例:一个生成不同创意文本选项的 Agent。
    • 并行任务:使用略有不同的 prompts 或模型同时生成三个不同的文章标题。
    • 好处:允许快速比较和选择最佳选项。

Parallelization 是 Agent 设计中的一项基本优化技术,使开发者能够通过利用并发执行为独立任务来构建更高效和响应更快的应用。

实操代码示例(LangChain)

LangChain 框架内的并行执行通过 LangChain Expression Language (LCEL) 来实现。主要方法涉及在字典或列表构造中构建多个可运行(runnable)组件。当此集合作为输入传递给链中的后续组件时,LCEL 运行时会并发执行包含的可运行对象。

在 LangGraph 的语境中,此原理应用于图的拓扑结构。并行工作流通过设计图来定义,使得多个没有直接顺序依赖的节点可以从单个共同节点启动。这些并行路径独立执行,直到它们的结果可以在图中的一个后续汇聚点聚合。

以下实现演示了使用 LangChain 框架构建的并行处理工作流。此工作流旨在响应单个用户查询时并发执行两个独立操作。这些并行过程被实例化为不同的链或函数,它们的各自输出随后被聚合成一个统一的结果。

此实现的前提条件包括安装必需的 Python 包,如 langchain、langchain-community,以及模型提供商库如 langchain-openai。此外,必须在本地环境中配置所选语言模型的有效 API 密钥以进行身份验证。

python
import os import asyncio from typing import Optional from langchain_openai import ChatOpenAI from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.runnables import Runnable, RunnableParallel, RunnablePassthrough # --- Configuration --- # Ensure your API key environment variable is set (e.g., OPENAI_API_KEY) try: llm: Optional[ChatOpenAI] = ChatOpenAI(model="gpt-4o-mini", temperature=0.7) except Exception as e: print(f"Error initializing language model: {e}") llm = None # --- Define Independent Chains --- # These three chains represent distinct tasks that can be executed in parallel. summarize_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "Summarize the following topic concisely:"), ("user", "{topic}") ]) | llm | StrOutputParser() ) questions_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "Generate three interesting questions about the following topic:"), ("user", "{topic}") ]) | llm | StrOutputParser() ) terms_chain: Runnable = ( ChatPromptTemplate.from_messages([ ("system", "Identify 5-10 key terms from the following topic, separated by commas:"), ("user", "{topic}") ]) | llm | StrOutputParser() ) # --- Build the Parallel + Synthesis Chain --- # 1. Define the block of tasks to run in parallel. The results of these, # along with the original topic, will be fed into the next step. map_chain = RunnableParallel( { "summary": summarize_chain, "questions": questions_chain, "key_terms": terms_chain, "topic": RunnablePassthrough(), # Pass the original topic through } ) # 2. Define the final synthesis prompt which will combine the parallel results. synthesis_prompt = ChatPromptTemplate.from_messages([ ("system", """Based on the following information: Summary: {summary} Related Questions: {questions} Key Terms: {key_terms} Synthesize a comprehensive answer."""), ("user", "Original topic: {topic}") ]) # 3. Construct the full chain by piping the parallel results directly # into the synthesis prompt, followed by the LLM and output parser. full_parallel_chain = map_chain | synthesis_prompt | llm | StrOutputParser() # --- Run the Chain --- async def run_parallel_example(topic: str) -> None: """ Asynchronously invokes the parallel processing chain with a specific topic and prints the synthesized result. Args: topic: The input topic to be processed by the LangChain chains. """ if not llm: print("LLM not initialized. Cannot run example.") return print(f"\n--- Running Parallel LangChain Example for Topic: '{topic}' ---") try: # The input to `ainvoke` is the single 'topic' string, # then passed to each runnable in the `map_chain`. response = await full_parallel_chain.ainvoke(topic) print("\n--- Final Response ---") print(response) except Exception as e: print(f"\nAn error occurred during chain execution: {e}") if __name__ == "__main__": test_topic = "The history of space exploration" # In Python 3.7+, asyncio.run is the standard way to run an async function. asyncio.run(run_parallel_example(test_topic))

提供的 Python 代码实现了一个 LangChain 应用,旨在通过利用并行执行高效处理给定主题。请注意,asyncio 提供的是并发(concurrency),而非并行(parallelism)。它通过在单线程上使用事件循环来实现这一点,该事件循环在一个任务空闲时(例如等待网络请求)智能地在任务之间切换。这创造了多个任务同时推进的效果,但代码本身仍然只由一个线程执行,受 Python 的全局解释器锁(GIL)限制。

代码首先从 langchain_openai 和 langchain_core 导入基本模块,包括语言模型、prompts、输出解析和可运行结构的组件。代码尝试初始化一个 ChatOpenAI 实例,具体使用 "gpt-4o-mini" 模型,并设置了控制创造性的指定温度。try-except 块用于语言模型初始化期间的健壮性。然后定义了三个独立的 LangChain "链"(chains),每个设计用于对输入主题执行不同的任务。第一个链用于简洁地总结主题,使用包含主题占位符的系统消息和用户消息。第二个链配置为生成与主题相关的三个有趣问题。第三个链设置为从输入主题中识别 5 到 10 个关键术语,要求它们以逗号分隔。每个独立链由针对其特定任务定制的 ChatPromptTemplate、初始化的语言模型以及一个将其输出格式化为字符串的 StrOutputParser 组成。

然后构建一个 RunnableParallel 块来捆绑这三个链,允许它们同时执行。这个并行可运行对象还包括一个 RunnablePassthrough,以确保原始输入主题在后续步骤中可用。为最终的综合步骤定义了一个单独的 ChatPromptTemplate,将摘要、问题、关键术语和原始主题作为输入来生成全面的答案。完整的端到端处理链(命名为 full_parallel_chain)通过将 map_chain(并行块)序列化到综合提示,然后是语言模型和输出解析器来创建。提供了一个异步函数 run_parallel_example 来演示如何调用此 full_parallel_chain。此函数将主题作为输入,并使用 ainvoke 运行异步链。最后,标准的 Python if name == "main": 块展示了如何使用示例主题(本例中为 "The history of space exploration")执行 run_parallel_example,使用 asyncio.run 管理异步执行。

本质上,此代码设置了一个工作流,其中对给定主题的多个 LLM 调用(用于摘要、问题和术语)同时发生,然后它们的结果由最终 LLM 调用进行组合。这展示了在 Agent 工作流中使用 LangChain 实现 parallelization 的核心思想。

实操代码示例(Google ADK)

现在让我们转向一个具体示例,说明这些概念在 Google ADK 框架中的应用。我们将探讨如何使用 ADK 原语(如 ParallelAgent 和 SequentialAgent)来构建一个利用并发执行以提高效率的 Agent 流。

python
from google.adk.agents import LlmAgent, ParallelAgent, SequentialAgent from google.adk.tools import google_search GEMINI_MODEL = "gemini-2.0-flash" # --- 1. Define Researcher Sub-Agents (to run in parallel) --- # Researcher 1: Renewable Energy researcher_agent_1 = LlmAgent( name="RenewableEnergyResearcher", model=GEMINI_MODEL, instruction="""You are an AI Research Assistant specializing in energy. Research the latest advancements in 'renewable energy sources'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary. """, description="Researches renewable energy sources.", tools=[google_search], # Store result in state for the merger agent output_key="renewable_energy_result" ) # Researcher 2: Electric Vehicles researcher_agent_2 = LlmAgent( name="EVResearcher", model=GEMINI_MODEL, instruction="""You are an AI Research Assistant specializing in transportation. Research the latest developments in 'electric vehicle technology'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary. """, description="Researches electric vehicle technology.", tools=[google_search], # Store result in state for the merger agent output_key="ev_technology_result" ) # Researcher 3: Carbon Capture researcher_agent_3 = LlmAgent( name="CarbonCaptureResearcher", model=GEMINI_MODEL, instruction="""You are an AI Research Assistant specializing in climate solutions. Research the current state of 'carbon capture methods'. Use the Google Search tool provided. Summarize your key findings concisely (1-2 sentences). Output *only* the summary. """, description="Researches carbon capture methods.", tools=[google_search], # Store result in state for the merger agent output_key="carbon_capture_result" ) # --- 2. Create the ParallelAgent (Runs researchers concurrently) --- # This agent orchestrates the concurrent execution of the researchers. # It finishes once all researchers have completed and stored their results in state. parallel_research_agent = ParallelAgent( name="ParallelWebResearchAgent", sub_agents=[researcher_agent_1, researcher_agent_2, researcher_agent_3], description="Runs multiple research agents in parallel to gather information." ) # --- 3. Define the Merger Agent (Runs *after* the parallel agents) --- # This agent takes the results stored in the session state by the parallel agents # and synthesizes them into a single, structured response with attributions. merger_agent = LlmAgent( name="SynthesisAgent", model=GEMINI_MODEL, # Or potentially a more powerful model if needed for synthesis instruction="""You are an AI Assistant responsible for combining research findings into a structured report. Your primary task is to synthesize the following research summaries, clearly attributing findings to their source areas. Structure your response using headings for each topic. Ensure the report is coherent and integrates the key points smoothly. **Crucially: Your entire response MUST be grounded *exclusively* on the information provided in the 'Input Summaries' below. Do NOT add any external knowledge, facts, or details not present in these specific summaries.** **Input Summaries:** * **Renewable Energy:** {renewable_energy_result} * **Electric Vehicles:** {ev_technology_result} * **Carbon Capture:** {carbon_capture_result} **Output Format:** ## Summary of Recent Sustainable Technology Advancements ### Renewable Energy Findings (Based on RenewableEnergyResearcher's findings) [Synthesize and elaborate *only* on the renewable energy input summary provided above.] ### Electric Vehicle Findings (Based on EVResearcher's findings) [Synthesize and elaborate *only* on the EV input summary provided above.] ### Carbon Capture Findings (Based on CarbonCaptureResearcher's findings) [Synthesize and elaborate *only* on the carbon capture input summary provided above.] ### Overall Conclusion [Provide a brief (1-2 sentence) concluding statement that connects *only* the findings presented above.] Output *only* the structured report following this format. Do not include introductory or concluding phrases outside this structure, and strictly adhere to using only the provided input summary content. """, description="Combines research findings from parallel agents into a structured, cited report, strictly grounded on provided inputs.", # No tools needed for merging # No output_key needed here, as its direct response is the final output of the sequence ) # --- 4. Create the SequentialAgent (Orchestrates the overall flow) --- # This is the main agent that will be run. It first executes the ParallelAgent # to populate the state, and then executes the MergerAgent to produce the final output. sequential_pipeline_agent = SequentialAgent( name="ResearchAndSynthesisPipeline", # Run parallel research first, then merge sub_agents=[parallel_research_agent, merger_agent], description="Coordinates parallel research and synthesizes the results." ) root_agent = sequential_pipeline_agent

此代码定义了一个多 Agent 系统,用于研究和综合可持续技术进步的信息。它设置了三个 LlmAgent 实例作为专门的研究员。ResearcherAgent_1 专注于可再生能源,ResearcherAgent_2 研究电动汽车技术,ResearcherAgent_3 调查碳捕获方法。每个研究员 Agent 被配置为使用 GEMINI_MODEL 和 google_search 工具。它们被指示简洁地总结其发现(1-2 句话)并使用 output_key 将这些摘要存储在会话状态中。

然后创建一个名为 ParallelWebResearchAgent 的 ParallelAgent 来并发运行这三个研究员 Agent。这使得研究可以并行进行,可能节省时间。ParallelAgent 在其所有子 Agent(研究员)完成并填充状态后完成执行。

接下来,定义一个 MergerAgent(也是一个 LlmAgent)来综合研究结果。此 Agent 将并行研究员存储在会话状态中的摘要作为输入。其指令强调,输出必须严格仅基于提供的输入摘要,禁止添加外部知识。MergerAgent 被设计为将组合发现结构化为报告,包含每个主题的标题和一个简短的整体结论。

最后,创建一个名为 ResearchAndSynthesisPipeline 的 SequentialAgent 来编排整个工作流。作为主要控制器,此主 Agent 首先执行 ParallelAgent 来执行研究。一旦 ParallelAgent 完成,SequentialAgent 然后执行 MergerAgent 来综合收集的信息。sequential_pipeline_agent 被设置为 root_agent,代表运行此多 Agent 系统的入口点。整个过程旨在高效地从多个来源并行收集信息,然后将其组合成单个结构化报告。

概览

是什么:许多 Agent 工作流涉及必须完成以实现最终目标的多个子任务。纯顺序执行(每个任务等待前一个任务完成)通常效率低下且缓慢。当任务依赖于外部 I/O 操作(如调用不同的 API 或查询多个数据库)时,这种延迟成为重大瓶颈。没有并发执行机制,总处理时间是所有单个任务持续时间之和,阻碍了系统的整体性能和响应能力。

为什么:Parallelization 模式通过启用独立任务的同时执行,提供了一个标准化解决方案。它通过识别工作流中不依赖彼此即时输出的组件(如工具使用或 LLM 调用)来工作。诸如 LangChain 和 Google ADK 等 Agent 框架提供了内置构造来定义和管理这些并发操作。例如,主流程可以调用几个并行运行的子任务,并等待所有任务完成后再继续下一步。通过同时运行这些独立任务而非一个接一个,此模式极大地减少了总执行时间。

经验法则:当工作流包含多个可以同时运行的独立操作时使用此模式,例如从多个 API 获取数据、处理不同的数据块,或为后续综合生成多个内容片段。

可视化总结

图2:Parallelization 设计模式

关键要点

以下是关键要点:

  • Parallelization 是一种通过并发执行独立任务来提高效率的模式。
  • 当任务涉及等待外部资源(如 API 调用)时特别有用。
  • 采用并发或并行架构会引入显著的复杂性和成本,影响设计、调试和系统日志记录等关键开发阶段。
  • 诸如 LangChain 和 Google ADK 等框架提供了定义和管理并行执行的内置支持。
  • 在 LangChain Expression Language (LCEL) 中,RunnableParallel 是并行运行多个可运行对象的关键构造。
  • Google ADK 可以通过 LLM 驱动的委派来促进并行执行,其中协调器 Agent 的 LLM 识别独立的子任务,并触发专门的子 Agent 并发处理它们。
  • Parallelization 有助于减少整体延迟,使 Agent 系统对复杂任务更具响应性。

结论

Parallelization 模式是一种通过并发执行独立子任务来优化计算工作流的方法。这种方法减少了整体延迟,特别是在涉及多个模型推理或对外部服务调用的复杂操作中。

框架为此模式的实现提供了不同的机制。在 LangChain 中,像 RunnableParallel 这样的构造用于显式定义和同时执行多个处理链。相比之下,像 Google Agent Developer Kit (ADK) 这样的框架可以通过多 Agent 委派实现并行化,其中主协调器模型将不同的子任务分配给可以并发运行的专门 Agent。

通过将并行处理与顺序(chaining)和条件(routing)控制流相结合,可以构建能够高效管理多样且复杂任务的复杂高性能计算系统。

参考文献

以下是一些关于 Parallelization 模式及相关概念的进一步阅读资源:

  1. LangChain Expression Language (LCEL) Documentation (Parallelism): https://python.langchain.com/docs/concepts/lcel/
  2. Google Agent Developer Kit (ADK) Documentation (Multi-Agent Systems): https://google.github.io/adk-docs/agents/multi-agents/
  3. Python asyncio Documentation: https://docs.python.org/3/library/asyncio.html