类型安全事件系统(面向开发者与 AI 约束)
设计目标
事件系统不是"字符串 + dict"的通知器,而是核心契约层。目标:
- 类型安全:事件类型、载荷结构、处理器签名可被 pyright 严格检查。
- 行为可预测:发布语义、错误策略、并发模型固定。
- 可注入:处理器依赖通过统一 DI 注入,避免隐式全局状态。
- 可治理:对开发者与 AI 都有明确边界,减少"随手加事件名"和"任意 payload"。
Python 是否有统一规范
Python 对"进程内事件总线"没有单一官方规范(标准库未给出统一 EventBus 抽象)。实践上通常是:
- 用
asyncio自研轻量总线并固定项目契约。 - 或选用轻量库(如 blinker/pyee)再叠加类型和并发约束。
对 nahida-bot,推荐第一条:自研轻量总线 + 强类型事件模型。
参考 Rust/C++ 项目的可迁移思路
可借鉴的"机制"而不是语法:
- Rust Tokio:
broadcast/mpsc的边界清晰,强调背压和关闭语义。 - Rust Bevy:事件是显式类型,系统按类型消费事件。
- C++ Boost.Signals2:订阅关系可管理、连接可断开、生命周期安全。
映射到 Python:
- 借鉴 Tokio:引入
max_queue_size、publish超时、shutdown明确语义。 - 借鉴 Bevy:事件用独立类建模,按事件类订阅,不用裸字符串驱动主逻辑。
- 借鉴 Signals2:
subscribe返回可取消句柄,避免遗忘反注册导致泄漏。
核心类型契约(建议固定)
建议目录:
text
nahida_bot/core/events/
__init__.py
bus.py # EventBus 实现
types.py # 事件基类与类型定义
registry.py # 允许事件类型白名单(可选)
errors.py # 事件系统错误定义建议类型模型:
python
from dataclasses import dataclass, field
from datetime import datetime, UTC
from typing import Generic, TypeVar
from uuid import UUID, uuid4
PayloadT = TypeVar("PayloadT")
@dataclass(slots=True, frozen=True)
class Event(Generic[PayloadT]):
event_id: UUID = field(default_factory=uuid4)
trace_id: str = ""
source: str = ""
occurred_at: datetime = field(default_factory=lambda: datetime.now(UTC))
payload: PayloadT = field()业务事件建议用独立类型,不建议用 dict[str, Any]:
python
@dataclass(slots=True, frozen=True)
class AppStartedPayload:
app_name: str
debug: bool
@dataclass(slots=True, frozen=True)
class AppStarted(Event[AppStartedPayload]):
...处理器签名建议:
python
from typing import Protocol, TypeVar, Awaitable
EventT = TypeVar("EventT", bound=Event[object])
class EventHandler(Protocol[EventT]):
async def __call__(self, event: EventT) -> None: ...EventBus 语义(建议作为硬约束)
事件系统采用双层 API:
- Core API(底层执行层,稳定、可预测、低魔法)
subscribe(event_type, handler) -> Subscriptionunsubscribe(event_type, handler) -> Nonepublish(event) -> PublishResultpublish_nowait(event) -> boolshutdown(timeout: float | None = None) -> None
- Facade API(上层开发体验层,Pythonic、声明式)
@events.on(EventType)装饰器式注册。@events.on(EventType, depends=[Depends(check_xxx)])前置依赖。events.emit(EventType(...))语义别名(内部仍委派到publish)。events.listener(EventType)上下文管理器(自动注册/解绑)。
语义约束:
- 订阅按
event_type(类)匹配,不按字符串名匹配核心逻辑。 - 默认策略:同一事件内按订阅顺序串行执行,事件与事件之间可并发。
- 单个 handler 失败不阻断同事件其他 handler;失败收敛到
PublishResult.errors。 publish可配置超时;超时和取消需要可观测(日志 + 指标)。shutdown后拒绝新事件,并等待在途处理完成。
与 FastAPI/NoneBot2 风格 DI 的结合
可借鉴 FastAPI 的思路:
- handler 不直接读取全局单例,而是显式声明依赖。
- 依赖由容器/解析器提供,便于测试替身注入。
可借鉴 NoneBot2 的思路:
- 将处理器参数视作"可解析依赖",由框架在调用前构建。
- 把上下文对象(session、bot、state)作为受控注入项,而非任意获取。
- 依赖求值采用 parse/check/solve 三段式,而非直接反射调用。
建议在事件系统中引入 EventContext:
python
@dataclass(slots=True)
class EventContext:
settings: Settings
logger: logging.Logger
app: Application处理器签名建议统一为:
python
async def handle_app_started(event: AppStarted, ctx: EventContext) -> None:
...由 EventBus 在 dispatch 前注入 ctx,避免 handler 到处访问全局对象。
建议补充 Depends 风格语义(用于 Facade API):
python
from typing import Annotated
async def get_repo(ctx: EventContext) -> Repo: ...
async def guard_feature(ctx: EventContext) -> None: ...
@events.on(AppStarted, depends=[Depends(guard_feature)])
async def bootstrap(
event: AppStarted,
repo: Annotated[Repo, Depends(get_repo)],
) -> None:
...依赖解析流程建议固定为:
- parse:解析函数签名和
Depends元信息。 - check:执行 parameterless/pre-check 依赖(返回值忽略)。
- solve:解析事件参数与子依赖值(支持缓存和校验)。
- call:调用 handler。
- teardown:清理生成器依赖/上下文资源。
约束开发者与 AI 的规则
为减少错误扩展,建议把以下规则写入实现注释与测试:
- 禁止在核心事件总线里传裸
dictpayload(仅测试夹具可例外)。 - 新事件必须定义 payload 类型和事件类,并添加至少一条类型检查测试。
- 新 handler 必须显式声明
event和ctx参数,不允许*args/**kwargs。 - 事件类型命名固定在
core.events.types,禁止分散到任意模块。 - 事件处理副作用需可测试,不允许只写日志不暴露可断言行为。
渐进实施计划
Phase 1 建议拆成三步:
- 建立
core/events目录与类型契约(Event、EventHandler、Subscription)。 - 在
Application.initialize/start/stop中接入四个生命周期事件:AppInitializingAppStartedAppStoppingAppStopped
- 补齐测试:类型检查、订阅行为、异常隔离、shutdown 语义、生命周期事件触发。