主题
5.2 重试与容错模式
在上一节中我们讨论的自省循环主要关注"质量改进"——通过多轮迭代让输出变得更好。但循环结构的另一个重要用途是"错误恢复"——当某个操作因为临时性故障(网络超时、API 限流、服务不可用等)而失败时,通过自动重试来从故障中恢复。这一节我们会深入探讨 LangGraph 中各种重试与容错模式的实现方法,包括简单重试、指数退避重试、断路器模式以及级联失败的处理策略。
简单重试:固定次数的重试机制
最基础的重试模式是:操作失败后立即重试,最多重试 N 次,全部失败后走降级或报错路径。这种模式实现简单,适用于那些失败概率较低且重试成本不高的场景。
python
from typing import TypedDict, Annotated
import operator
import random
import time
from langgraph.graph import StateGraph, START, END
class RetryState(TypedDict):
    """Graph state for the simple fixed-count retry example."""
    # URL handed to simulate_api_call on every attempt.
    target_url: str
    # Request payload handed to simulate_api_call on every attempt.
    payload: dict
    # Response dict written by call_api when an attempt succeeds.
    response_data: dict
    # Error text of the most recent failed attempt; call_api resets it to "" on success.
    error_message: str
    # Number of call_api executions so far: call_api returns 1 and operator.add sums it.
    attempt: Annotated[int, operator.add]
    # Upper bound on attempts, compared against `attempt` by should_retry.
    max_attempts: int
    # Whether the most recent attempt succeeded (drives should_retry routing).
    success: bool
    # Human-readable summary written by handle_success / handle_exhaustion.
    final_status: str
    # Append-only log: operator.add concatenates the lists returned by each node.
    retry_log: Annotated[list[str], operator.add]
def simulate_api_call(
    url: str,
    payload: dict,
    rng: random.Random | None = None,
) -> tuple[dict | None, str | None]:
    """Simulate an API call that fails transiently with fixed probabilities.

    Each call draws a fresh random number, so successive attempts at the same
    request can have different outcomes. That is the property every retry
    pattern in this section relies on.

    The previous version reseeded the global RNG from ``hash(url + id)`` on
    every call. That made every retry of a given request return the identical
    result, so retries could never recover. Because ``str`` hashing is salted
    per process, the seed was not reproducible across runs either.

    Outcome probabilities: 25% "503", 10% "429", 5% connection timeout,
    60% success.

    Args:
        url: Target URL or service name; echoed in the response's ``data``.
        payload: Request payload. Accepted for interface parity with a real
            client; it does not influence the simulated outcome.
        rng: Optional random source with ``random()`` and ``randint()``
            methods (for example ``random.Random(seed)``) for reproducible
            runs. Defaults to the module-level ``random`` functions.

    Returns:
        ``(response, None)`` on success, or ``(None, error_message)`` on failure.
    """
    gen = random if rng is None else rng
    roll = gen.random()
    if roll < 0.25:
        return None, "503 Service Unavailable: 服务暂时过载"
    if roll < 0.35:
        return None, "429 Too Many Requests: 请求频率超限"
    if roll < 0.40:
        return None, "ConnectionError: 连接超时"
    return {
        "status": "success",
        "data": f"来自 {url} 的响应数据",
        "request_id": f"req-{gen.randint(10000, 99999)}",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
    }, None
def call_api(state: RetryState) -> dict:
    """Make one API attempt and report its outcome as a state update.

    Always contributes ``attempt: 1`` (summed by the reducer) plus one
    ``retry_log`` line tagged with "current/max" attempt numbers. On success
    it stores the response and clears ``error_message``. On failure it
    records the error text.
    """
    current = state["attempt"] + 1
    response, error = simulate_api_call(state["target_url"], state["payload"])
    prefix = f"[尝试 {current}/{state['max_attempts']}]"

    if not error:
        return {
            "response_data": response,
            "error_message": "",
            "success": True,
            "attempt": 1,
            "retry_log": [f"{prefix} ✅ 成功 | ID: {response['request_id']}"],
        }

    return {
        "error_message": error,
        "success": False,
        "attempt": 1,
        "retry_log": [f"{prefix} ❌ 失败: {error}"],
    }
def _is_retryable_error(message: str | None) -> bool:
    """Return False only for errors that retrying cannot fix (4xx except 408/429)."""
    if not message:
        return True
    head = message.strip().partition(" ")[0]
    if not head.isdecimal():
        # Transport-level failures such as "ConnectionError: ..." carry no
        # HTTP status code; treat them as transient.
        return True
    code = int(head)
    # 408 (timeout) and 429 (rate limited) are 4xx but transient by nature.
    return (not 400 <= code < 500) or code in (408, 429)


def should_retry(state: RetryState) -> str:
    """Route after call_api: "success", "retry", or "exhausted".

    Gives up ("exhausted") when the attempt budget is spent, or immediately
    when the last error is permanent (e.g. 400/401/404). Retrying those
    cannot succeed and only wastes attempts and load.
    """
    if state["success"]:
        return "success"
    if state["attempt"] >= state["max_attempts"]:
        return "exhausted"
    if not _is_retryable_error(state.get("error_message", "")):
        return "exhausted"
    return "retry"
def handle_success(state: RetryState) -> dict:
    """Terminal node for the success path.

    Reports which attempt succeeded and logs the first 50 characters of the
    response's ``data`` field.
    """
    preview = state["response_data"]["data"][:50]
    summary = f"✅ 成功 (第{state['attempt']}次尝试)"
    return {"final_status": summary, "retry_log": [f"[完成] 数据已获取: {preview}"]}
def handle_exhaustion(state: RetryState) -> dict:
    """Terminal node for the give-up path.

    Records the attempt count and the last error, plus a fixed suggestion line.
    """
    tried = state["attempt"]
    reason = state["error_message"]
    entries = [
        f"[放弃] 最后错误: {reason}",
        "[建议] 请稍后重试或联系技术支持",
    ]
    return {"final_status": f"❌ 失败 ({tried}次尝试均未成功)", "retry_log": entries}
# Wire the simple-retry graph: call_api loops back to itself until
# should_retry routes to the success or exhaustion handler.
retry_graph = StateGraph(RetryState)
for _node_name, _node_fn in (
    ("call_api", call_api),
    ("handle_success", handle_success),
    ("handle_exhaustion", handle_exhaustion),
):
    retry_graph.add_node(_node_name, _node_fn)
retry_graph.add_edge(START, "call_api")
retry_graph.add_conditional_edges(
    "call_api",
    should_retry,
    {
        "retry": "call_api",
        "success": "handle_success",
        "exhausted": "handle_exhaustion",
    },
)
for _terminal in ("handle_success", "handle_exhaustion"):
    retry_graph.add_edge(_terminal, END)
app = retry_graph.compile()

_banner = "=" * 60
print(_banner)
print("简单重试模式演示")
print(_banner)

# Every state key is seeded explicitly; attempt starts at 0 so the reducer
# counts call_api executions from 1.
_retry_input = {
    "target_url": "https://api.example.com/v2/data",
    "payload": {"id": "test-001", "action": "query"},
    "response_data": {},
    "error_message": "",
    "attempt": 0,
    "max_attempts": 5,
    "success": False,
    "final_status": "",
    "retry_log": [],
}
result = app.invoke(_retry_input)
print(f"\n最终状态: {result['final_status']}")
for entry in result["retry_log"]:
print(f" {entry}")
这段程序演示了简单重试的工作原理。call_api 节点模拟了一个可能失败的 API 调用:25% 概率返回 503、10% 概率返回 429、5% 概率连接超时,合计约 40% 的失败率。每次调用后,条件路由函数 should_retry 依次检查两件事:本次是否成功,是否已达到最大尝试次数。据此决定下一步是再试一次(回到 call_api)、进入成功处理,还是放弃并走失败路径。attempt 字段声明为 Annotated[int, operator.add],call_api 每次只返回增量 {"attempt": 1},由 reducer 累加到已有值上。因此每经过一次 call_api 节点,计数器就自动加 1,节点无需自己计算并写回新值。
指数退避重试:避免雪崩效应
简单重试有一个问题:如果下游服务已经因为负载过高而开始返回错误,连续快速的重试只会让它更加不堪重负——这就是所谓的"重试风暴"或"雪崩效应"。指数退避(Exponential Backoff)策略通过在每次重试之间逐步增加等待时间来缓解这个问题:第一次重试等 1 秒,第二次等 2 秒,第三次等 4 秒,以此类推。这样既给了下游服务恢复的时间,也避免了短时间内大量重试请求的冲击。实践中通常还会在退避时间上叠加随机抖动(jitter),防止大量客户端在同一时刻同步重试、形成新的请求尖峰。
python
import time
import asyncio
class BackoffRetryState(TypedDict):
    """Graph state for the exponential-backoff retry example."""
    # Endpoint handed to simulate_api_call and echoed in each log line.
    api_endpoint: str
    # Parameters handed to simulate_api_call.
    request_params: dict
    # Response dict written by call_with_backoff on success.
    response: dict
    # Error text of the most recent failed attempt; "" after a success.
    last_error: str
    # Number of call_with_backoff executions: each returns 1, operator.add sums them.
    attempt: Annotated[int, operator.add]
    # Attempt budget checked by the routing function after each call.
    max_attempts: int
    # Wait in seconds before the 2nd attempt (passed to time.sleep); doubles per attempt.
    base_delay: float
    # Upper bound in seconds for any single backoff wait.
    max_delay: float
    # Whether the most recent attempt succeeded.
    success: bool
    # Human-readable summary written by the success / failure handler nodes.
    status: str
    # Append-only log: operator.add concatenates the lists returned by each node.
    log: Annotated[list[str], operator.add]
def backoff_delay(attempt: int, base_delay: float, max_delay: float) -> float:
    """Return the wait in seconds before the given 1-based ``attempt``.

    The first attempt does not wait. Attempt k >= 2 waits
    ``min(base_delay * 2**(k-2), max_delay)``, e.g. 0.5, 1, 2, 4, 8 seconds
    for base_delay=0.5.

    For very large ``attempt`` the raw product no longer fits in a float. In
    that case the cap is returned instead of raising OverflowError.
    """
    if attempt <= 1:
        return 0.0
    try:
        raw = base_delay * (2 ** (attempt - 2))
    except OverflowError:
        # 2**n is too large to convert to float; a positive base certainly
        # exceeds the cap, a zero base stays zero.
        raw = max_delay if base_delay > 0 else 0.0
    return min(raw, max_delay)


def call_with_backoff(state: BackoffRetryState) -> dict:
    """Make one attempt, sleeping first according to the exponential backoff.

    The delay is computed once by ``backoff_delay``, so the value slept and
    the value logged cannot drift apart. Returns ``attempt: 1`` (summed by
    the reducer) and one log line for the attempt.
    """
    endpoint = state["api_endpoint"]
    params = state["request_params"]
    attempt = state["attempt"] + 1
    base_delay = state["base_delay"]
    max_delay = state["max_delay"]

    log_parts = [f"[第{attempt}次] 调用 {endpoint}"]
    if attempt > 1:
        delay = backoff_delay(attempt, base_delay, max_delay)
        time.sleep(delay)
        log_parts.append(f"(退避 {delay:.1f}s)")

    response, error = simulate_api_call(endpoint, params)
    if error:
        log_parts.append(f"→ ❌ {error}")
        return {
            "last_error": error,
            "success": False,
            "attempt": 1,
            "log": [" ".join(log_parts)],
        }

    log_parts.append("→ ✅ 成功")
    return {
        "response": response,
        "last_error": "",
        "success": True,
        "attempt": 1,
        "log": [" ".join(log_parts)],
    }
def _backoff_on_success(s):
    # Terminal node: report which attempt finally succeeded.
    return {"status": f"✅ 完成 (第{s['attempt']}次)", "log": ["[完成]"]}


def _backoff_on_failure(s):
    # Terminal node: report the attempt count and the last error seen.
    return {
        "status": f"❌ 放弃 (已尝试{s['attempt']}次)",
        "log": [f"[放弃] 错误: {s['last_error']}"],
    }


def _backoff_route(s):
    # Success wins; otherwise retry until the attempt budget is spent.
    if s["success"]:
        return "success"
    if s["attempt"] >= s["max_attempts"]:
        return "exhausted"
    return "retry"


backoff_graph = StateGraph(BackoffRetryState)
backoff_graph.add_node("call_with_backoff", call_with_backoff)
backoff_graph.add_node("success_handler", _backoff_on_success)
backoff_graph.add_node("fail_handler", _backoff_on_failure)
backoff_graph.add_edge(START, "call_with_backoff")
backoff_graph.add_conditional_edges(
    "call_with_backoff",
    _backoff_route,
    {"retry": "call_with_backoff", "success": "success_handler", "exhausted": "fail_handler"},
)
backoff_graph.add_edge("success_handler", END)
backoff_graph.add_edge("fail_handler", END)
app = backoff_graph.compile()

start = time.time()
result = app.invoke({
    "api_endpoint": "https://api.heavy-load.example.com/data",
    "request_params": {"limit": 100},
    "response": {},
    "last_error": "",
    "attempt": 0,
    "max_attempts": 6,
    "base_delay": 0.5,
    "max_delay": 8.0,
    "success": False,
    "status": "",
    "log": [],
})
elapsed = time.time() - start
print(f"\n最终状态: {result['status']}")
print(f"总耗时: {elapsed:.2f}s")
for entry in result["log"]:
print(f" {entry}")
注意 call_with_backoff 函数中的关键逻辑:只有当 attempt > 1 时才计算并执行退避等待,第一次调用不需要等待。退避时间按公式 min(base_delay × 2^(attempt−2), max_delay) 计算,具体如下:
- 第二次尝试前等 0.5s(0.5 × 2^0);
- 第三次前等 1s(0.5 × 2^1);
- 第四次前等 2s,第五次前等 4s;
- 第六次前等 8s(0.5 × 2^4 = 8.0,恰好达到 max_delay=8.0 的上限;如果还有第七、第八次尝试,等待时间都会被封顶在 8s)。

在本例 max_attempts=6 的配置下,如果全部失败,累计等待约 0.5+1+2+4+8 = 15.5s。这种指数增长的等待时间既能给下游足够的恢复空间,又因为有 max_delay 上限而不会让用户无限期地等下去。
断路器模式:防止级联失败
比重试更高级的容错模式是断路器(Circuit Breaker)模式。它的核心思想是:如果某个服务的失败频率超过了阈值,就暂时"熔断"对它的所有调用,直接返回错误或降级结果,而不是继续重试。经过一段冷却期后再尝试恢复调用,如果成功了就关闭断路器恢复正常,如果失败了就继续保持熔断状态。这就像电路中的保险丝——当电流过大时自动跳闸保护整个电路不被烧毁。
python
from typing import TypedDict, Annotated
import operator
import time
from langgraph.graph import StateGraph, START, END
class CircuitBreakerState(TypedDict):
    """Graph state for the circuit-breaker example."""
    # Service identifier: passed to simulate_api_call as the URL and used in log text.
    service_name: str
    # Payload passed to simulate_api_call.
    request_payload: dict
    # Failures since the last success; make_call / probe_service reset it to 0 on success.
    failure_count: int
    # Incremented by make_call on each successful call.
    success_count: int
    # make_call opens the circuit once failure_count reaches this value.
    threshold: int
    # Seconds that must elapse after last_failure_time before a probe is allowed.
    cooldown_seconds: float
    # "closed" or "open" as written by make_call / probe_service; check_circuit
    # treats any value other than "open" like "closed".
    circuit_state: str
    # time.time() timestamp of the most recent failure.
    last_failure_time: float
    # Successful response; its truthiness decides routing after make_call / probe.
    response_data: dict
    # Degraded payload written by provide_fallback.
    fallback_data: dict
    # Human-readable summary written by the terminal nodes.
    final_result: str
    # Append-only log: operator.add concatenates the lists returned by each node.
    log: Annotated[list[str], operator.add]
def check_circuit(state: CircuitBreakerState) -> str:
    """Decide how to handle the request based on the breaker state.

    Returns:
        "try_call" when the circuit is not open (closed or any other value);
        "half_open" when it is open but the cooldown has elapsed, so a probe
        may be sent; "use_fallback" when it is open and still cooling down.
    """
    if state["circuit_state"] != "open":
        return "try_call"
    since_failure = time.time() - state["last_failure_time"]
    return "half_open" if since_failure >= state["cooldown_seconds"] else "use_fallback"
def make_call(state: CircuitBreakerState) -> dict:
    """Call the service through a closed circuit and update breaker counters.

    On failure:
        increments ``failure_count``, resets ``success_count`` to 0, records
        ``last_failure_time``, and opens the circuit once ``failure_count``
        reaches ``threshold``.

    On success:
        stores the response, increments ``success_count``, resets
        ``failure_count``, and keeps the circuit closed.

    Both counters therefore track *consecutive* outcomes, which matches the
    "连续N次成功" log line. Previously ``success_count`` was never reset, so it
    reported a cumulative total under that label.
    """
    service = state["service_name"]
    response, error = simulate_api_call(service, state["request_payload"])

    if error:
        new_failures = state["failure_count"] + 1
        threshold = state["threshold"]
        tripped = new_failures >= threshold
        log_msg = (
            f"[调用] ❌ {service} 失败\n"
            f" 失败数: {new_failures}/{threshold}"
        )
        if tripped:
            log_msg += f"\n → 断路器打开! 冷却 {state['cooldown_seconds']}s"
        return {
            "failure_count": new_failures,
            "success_count": 0,  # a failure breaks any success streak
            "circuit_state": "open" if tripped else "closed",
            "last_failure_time": time.time(),
            "log": [log_msg],
        }

    new_successes = state["success_count"] + 1
    return {
        "response_data": response,
        "success_count": new_successes,
        "failure_count": 0,
        "circuit_state": "closed",
        "log": [f"[调用] ✅ {service} 成功 (连续{new_successes}次成功)"],
    }
def provide_fallback(state: CircuitBreakerState) -> dict:
    """Return a degraded response instead of calling the service.

    Builds a static fallback payload stamped with the local time (HH:MM:SS)
    and sets the final result and log to explain why it was used.
    """
    name = state["service_name"]
    degraded = dict(
        status="fallback",
        source="cached/degraded_response",
        message=f"{name} 当前不可用,返回降级数据",
        timestamp=time.strftime("%H:%M:%S"),
        data="这是降级后的默认数据",
    )
    return dict(
        fallback_data=degraded,
        final_result=f"⚠️ 使用降级响应 ({name} 断路中)",
        log=[f"[降级] 返回缓存/降级数据 (原因: {name} 不可用)"],
    )
def probe_service(state: CircuitBreakerState) -> dict:
    """Send a single half-open probe to decide whether to close the circuit.

    On success the response is stored, the circuit closes, and
    ``failure_count`` resets to 0. On failure the circuit stays open and
    ``last_failure_time`` is refreshed, restarting the cooldown.
    """
    name = state["service_name"]
    data, failure = simulate_api_call(name, state["request_payload"])
    if not failure:
        return {
            "response_data": data,
            "circuit_state": "closed",
            "failure_count": 0,
            "log": [f"[探测] ✅ {name} 已恢复! 断路器关闭"],
        }
    return {
        "circuit_state": "open",
        "last_failure_time": time.time(),
        "log": [f"[探测] ❌ {name} 仍未恢复,保持断路"],
    }
def finalize_success(state: CircuitBreakerState) -> dict:
    """Terminal node after a successful call or probe: record the normal result."""
    service = state["service_name"]
    return dict(final_result=f"✅ 调用成功 ({service})", log=["[完成] 正常响应"])
def _cb_entry(s):
    # Pass-through node; the actual decision is made by the conditional edge
    # that runs check_circuit on the state.
    return {}


def _route_after_call(s):
    # A non-empty response means the call succeeded; otherwise re-check the breaker.
    if s.get("response_data"):
        return "success"
    return "check_circuit"


def _route_after_probe(s):
    # A failed probe falls back immediately instead of looping.
    if s.get("response_data"):
        return "success"
    return "fallback"


cb_graph = StateGraph(CircuitBreakerState)
cb_graph.add_node("check_circuit", _cb_entry)
for _name, _fn in (
    ("make_call", make_call),
    ("fallback", provide_fallback),
    ("probe", probe_service),
    ("success", finalize_success),
):
    cb_graph.add_node(_name, _fn)
cb_graph.add_edge(START, "check_circuit")
cb_graph.add_conditional_edges("check_circuit", check_circuit, {
    "try_call": "make_call",
    "half_open": "probe",
    "use_fallback": "fallback",
})
cb_graph.add_conditional_edges(
    "make_call", _route_after_call,
    {"success": "success", "check_circuit": "check_circuit"},
)
cb_graph.add_conditional_edges(
    "probe", _route_after_probe,
    {"success": "success", "fallback": "fallback"},
)
for _terminal in ("fallback", "success"):
    cb_graph.add_edge(_terminal, END)
app = cb_graph.compile()

_cb_input = {
    "service_name": "payment-service",
    "request_payload": {"action": "charge", "amount": 99.9},
    "failure_count": 0,
    "success_count": 0,
    "threshold": 3,
    "cooldown_seconds": 5.0,
    "circuit_state": "closed",
    "last_failure_time": 0,
    "response_data": {},
    "fallback_data": {},
    "final_result": "",
    "log": [],
}
result = app.invoke(_cb_input)
print(f"\n最终结果: {result['final_result']}")
for entry in result["log"]:
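print(entry)
这个断路器实现展示了三种状态的转换。注意,代码中 circuit_state 只会被写成 "closed" 或 "open";Half-Open 并没有作为状态值存储下来,而是体现为一条路由:check_circuit 在冷却期结束时返回 "half_open",把请求转入 probe 节点。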
- Closed(闭合):正常状态,允许所有调用通过。连续失败次数达到阈值时转换为 Open。
- Open(打开):熔断状态,所有调用直接走降级路径,不再尝试调用实际服务。冷却期结束后转换为 Half-Open 进行探测。
- Half-Open(半开):探测状态,允许一个探测调用通过以检测服务是否恢复。如果探测成功则转为 Closed,失败则重新转为 Open 并重新计时冷却期。
三种状态之间的转换关系可以用下面的状态图表示:
               连续失败 ≥ threshold
   [Closed] ──────────────────────────► [Open]
      ▲                                   │
      │                                   │ 冷却期结束
      │ 探测成功                          ▼
      └───────────────────────────── [Half-Open]
                                          │
                                          │ 探测失败
                                          ▼
                                       [Open]

多依赖场景下的容错设计
在实际系统中,你的图往往需要调用多个外部服务,每个服务都可能失败。这时候就需要考虑更复杂的容错策略——比如某个非关键服务失败了要不要影响整体流程?多个服务都失败了怎么处理?如何区分关键依赖和非关键依赖?
python
class MultiServiceState(TypedDict):
    """Graph state for the multi-dependency fan-out / merge example."""
    # Response from the primary (critical) service; {} when the call failed.
    primary_service_resp: dict
    # Response from the secondary (non-critical) service; {} when the call failed.
    secondary_service_resp: dict
    # Response from the cache service; {} when the call failed.
    cache_service_resp: dict
    # Success flags written by the corresponding call_* node.
    primary_ok: bool
    secondary_ok: bool
    cache_ok: bool
    # Merged payload built by merge_results.
    combined_result: dict
    # Overall status string written by merge_results.
    status: str
    # Append-only log: operator.add concatenates lists from the parallel nodes.
    log: Annotated[list[str], operator.add]
def _call_service(service: str, request_type: str, resp_key: str, ok_key: str, label: str) -> dict:
    """Call one dependency and package the outcome as a partial state update.

    Writes the response (or {} on failure) under ``resp_key``, the success
    flag under ``ok_key``, and one log line prefixed with ``[label]``.
    """
    resp, err = simulate_api_call(service, {"type": request_type})
    ok = resp is not None
    return {
        resp_key: resp or {},
        ok_key: ok,
        "log": [f"[{label}] {'✅' if ok else '❌'} {'成功' if ok else err}"],
    }


def call_primary(state: MultiServiceState) -> dict:
    """Call the primary (critical) service."""
    return _call_service("primary-api", "main", "primary_service_resp", "primary_ok", "主服务")


def call_secondary(state: MultiServiceState) -> dict:
    """Call the secondary (non-critical) service."""
    return _call_service("secondary-api", "aux", "secondary_service_resp", "secondary_ok", "辅助服务")


def call_cache(state: MultiServiceState) -> dict:
    """Call the cache service, used as a fallback when the primary fails."""
    return _call_service("cache-service", "lookup", "cache_service_resp", "cache_ok", "缓存服务")
def merge_results(state: MultiServiceState) -> dict:
    """Combine whatever the three services returned into one result.

    - Primary data, when available, is merged into the top level.
    - Secondary data, when available, goes under "auxiliary".
    - Cache data is used (under "from_cache") only when the primary failed.

    The overall status is a success if either the primary or the cache
    delivered core data. When both failed, the status says whether auxiliary
    data is still present rather than claiming that *all* sources failed.
    """
    p_ok = state["primary_ok"]
    s_ok = state["secondary_ok"]
    c_ok = state["cache_ok"]
    combined: dict = {}
    status_msgs: list[str] = []

    if p_ok:
        combined.update(state["primary_service_resp"])
        status_msgs.append("主服务数据已整合")
    else:
        status_msgs.append("⚠️ 主服务不可用")

    if s_ok:
        combined["auxiliary"] = state["secondary_service_resp"]
        status_msgs.append("辅助服务数据已补充")
    else:
        status_msgs.append("ℹ️ 辅助服务缺失(非关键)")

    if not p_ok:
        # The cache only matters as a substitute for the primary.
        if c_ok:
            combined["from_cache"] = state["cache_service_resp"]
            status_msgs.append("使用缓存数据作为替代")
        else:
            status_msgs.append("⚠️ 缓存服务也不可用")

    if p_ok or c_ok:
        final_status = "✅ 部分或完全成功"
    elif s_ok:
        final_status = "❌ 主服务与缓存均不可用(仅有辅助数据)"
    else:
        final_status = "❌ 所有数据源均不可用"

    return {
        "combined_result": combined,
        "status": final_status,
        "log": [f"[汇总] {'; '.join(status_msgs)}"],
    }
# Fan out from START to the three service calls, then fan in to merge.
_service_nodes = (
    ("primary", call_primary),
    ("secondary", call_secondary),
    ("cache", call_cache),
)
ms_graph = StateGraph(MultiServiceState)
for _name, _fn in _service_nodes:
    ms_graph.add_node(_name, _fn)
ms_graph.add_node("merge", merge_results)
for _name, _fn in _service_nodes:
    ms_graph.add_edge(START, _name)  # 并行调用
for _name, _fn in _service_nodes:
    ms_graph.add_edge(_name, "merge")
ms_graph.add_edge("merge", END)
app = ms_graph.compile()

result = app.invoke({
    "primary_service_resp": {},
    "secondary_service_resp": {},
    "cache_service_resp": {},
    "primary_ok": False,
    "secondary_ok": False,
    "cache_ok": False,
    "combined_result": {},
    "status": "",
    "log": [],
})
print(f"\n{result['status']}")
for entry in result["log"]:
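print(f" {entry}")
这个多服务调用的例子组合了几种不同的容错策略:
- 主服务(primary)是关键依赖,它的失败会显著影响结果质量;
- 辅助服务(secondary)是非关键依赖,失败了只是缺少一些补充信息,不影响核心功能;
- 缓存服务(cache)是后备方案,只有在主服务失败时,merge_results 才会采用缓存数据。

三个调用节点都直接从 START 出发,会在同一步中并行执行,merge 节点在它们全部完成后再汇总结果。这种并行方式降低了整体延迟,代价是即使主服务成功,缓存服务也照样会被调用一次。merge_results 节点根据各服务的可用情况合并数据,尽可能多地利用可用的信息源来构建完整的结果。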
容错设计的最佳实践总结
在 LangGraph 中设计和实现容错机制时,有几个经验法则值得遵循:
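第一,始终设置最大重试次数和最大超时时间。任何重试都不应该是无限次的,任何等待也不应该是无限长的。这两个硬性约束能确保即使出现异常情况,图的执行也会在有限的时间内终止。LangGraph 的 recursion_limit(通过 config 设置的最大执行步数)在超出时会抛出 GraphRecursionError,可以作为最后一道兜底。但不应依赖它来结束重试循环:它触发时图会以异常终止,而不是走你设计好的降级路径。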
第二,区分临时性错误和永久性错误。对于临时性错误(网络超时、503 服务不可用、429 限流),重试是有意义的;但对于永久性错误(401 认证失败、404 资源不存在、400 参数错误),重试是没有意义的,应该立即失败并走错误处理路径。注意 429 和 408 虽然属于 4xx,却表示"稍后再试",应归为临时性错误;对于 429,还应优先遵循响应中的 Retry-After 头来决定等待时间。在路由函数中加入错误类型的判断,可以避免无意义的重试。
第三,为每个外部依赖定义 SLA(服务水平协议)。明确每个外部服务的超时时间、重试策略、降级方案,并在图中体现这些约定。不要让图的容错行为依赖于隐式的假设。
第四,记录详细的故障信息用于事后分析。每次失败都应该记录完整的上下文——什么时间、调用了哪个服务、传了什么参数、收到了什么错误、当时的状态是什么。这些信息对于排查生产问题和持续优化容错策略至关重要。
第五,在开发阶段故意注入故障来测试容错机制。不要等到真正出问题了才发现容错代码有 bug。可以在测试环境中模拟各种故障场景(服务宕机、网络延迟、返回异常格式等),验证你的容错机制能否正确处理每种情况。