跳转至

Events API 参考

qka.core.events

QKA 事件驱动框架 提供发布-订阅模式的事件系统,支持异步和同步事件处理

EventType

Bases: Enum

事件类型枚举

Source code in qka/core/events.py
class EventType(Enum):
    """事件类型枚举"""
    # 数据相关事件
    DATA_LOADED = "data_loaded"
    DATA_ERROR = "data_error"

    # 回测相关事件
    BACKTEST_START = "backtest_start"
    BACKTEST_END = "backtest_end"
    BACKTEST_ERROR = "backtest_error"

    # 交易相关事件
    ORDER_CREATED = "order_created"
    ORDER_FILLED = "order_filled"
    ORDER_CANCELLED = "order_cancelled"
    ORDER_ERROR = "order_error"

    # 策略相关事件
    STRATEGY_START = "strategy_start"
    STRATEGY_END = "strategy_end"
    STRATEGY_ERROR = "strategy_error"
    SIGNAL_GENERATED = "signal_generated"

    # 风险管理事件
    RISK_CHECK = "risk_check"
    RISK_ALERT = "risk_alert"

    # 系统事件
    SYSTEM_START = "system_start"
    SYSTEM_SHUTDOWN = "system_shutdown"
    HEARTBEAT = "heartbeat"

Event dataclass

事件基类

Source code in qka/core/events.py
@dataclass
class Event:
    """事件基类"""
    event_type: EventType
    data: Any = None
    timestamp: datetime = None
    source: str = None
    event_id: str = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()
        if self.event_id is None:
            self.event_id = f"{self.event_type.value}_{self.timestamp.strftime('%Y%m%d_%H%M%S_%f')}"

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        result = asdict(self)
        result['event_type'] = self.event_type.value
        result['timestamp'] = self.timestamp.isoformat()
        return result

to_dict

to_dict() -> Dict[str, Any]

转换为字典

Source code in qka/core/events.py
def to_dict(self) -> Dict[str, Any]:
    """转换为字典"""
    result = asdict(self)
    result['event_type'] = self.event_type.value
    result['timestamp'] = self.timestamp.isoformat()
    return result

DataEvent dataclass

Bases: Event

数据事件

Source code in qka/core/events.py
@dataclass
class DataEvent(Event):
    """数据事件"""
    symbol: str = None
    period: str = None
    count: int = None

OrderEvent dataclass

Bases: Event

订单事件

Source code in qka/core/events.py
@dataclass
class OrderEvent(Event):
    """订单事件"""
    order_id: str = None
    symbol: str = None
    action: str = None  # buy/sell
    quantity: int = None
    price: float = None
    order_type: str = None

SignalEvent dataclass

Bases: Event

信号事件

Source code in qka/core/events.py
@dataclass
class SignalEvent(Event):
    """信号事件"""
    symbol: str = None
    signal_type: str = None  # buy/sell/hold
    strength: float = None  # 信号强度 0-1
    reason: str = None

EventHandler

Bases: ABC

事件处理器基类

Source code in qka/core/events.py
class EventHandler(ABC):
    """事件处理器基类"""

    @abstractmethod
    def handle(self, event: Event) -> Optional[Any]:
        """处理事件"""
        pass

    def can_handle(self, event: Event) -> bool:
        """判断是否可以处理该事件"""
        return True

handle abstractmethod

handle(event: Event) -> Optional[Any]

处理事件

Source code in qka/core/events.py
@abstractmethod
def handle(self, event: Event) -> Optional[Any]:
    """处理事件"""
    pass

can_handle

can_handle(event: Event) -> bool

判断是否可以处理该事件

Source code in qka/core/events.py
def can_handle(self, event: Event) -> bool:
    """判断是否可以处理该事件"""
    return True

AsyncEventHandler

Bases: EventHandler

异步事件处理器基类

Source code in qka/core/events.py
class AsyncEventHandler(EventHandler):
    """异步事件处理器基类"""

    @abstractmethod
    async def handle_async(self, event: Event) -> Optional[Any]:
        """异步处理事件"""
        pass

    def handle(self, event: Event) -> Optional[Any]:
        """同步接口,内部调用异步方法"""
        return asyncio.run(self.handle_async(event))

handle_async abstractmethod async

handle_async(event: Event) -> Optional[Any]

异步处理事件

Source code in qka/core/events.py
@abstractmethod
async def handle_async(self, event: Event) -> Optional[Any]:
    """异步处理事件"""
    pass

handle

handle(event: Event) -> Optional[Any]

同步接口,内部调用异步方法

Source code in qka/core/events.py
def handle(self, event: Event) -> Optional[Any]:
    """同步接口,内部调用异步方法"""
    return asyncio.run(self.handle_async(event))

EventEngine

事件引擎

Source code in qka/core/events.py
class EventEngine:
    """事件引擎"""

    def __init__(self, max_queue_size: int = 10000):
        """
        初始化事件引擎

        Args:
            max_queue_size: 事件队列最大大小
        """
        self._handlers: Dict[EventType, List[Union[EventHandler, Callable]]] = defaultdict(list)
        self._event_queue = queue.Queue(maxsize=max_queue_size)
        self._running = False
        self._worker_thread = None
        self._event_history: List[Event] = []
        self._max_history_size = 1000

        # 统计信息
        self._event_count = defaultdict(int)
        self._error_count = 0

    def subscribe(self, event_type: EventType, handler: Union[EventHandler, Callable]):
        """
        订阅事件

        Args:
            event_type: 事件类型
            handler: 事件处理器或回调函数
        """
        self._handlers[event_type].append(handler)
        logger.debug(f"事件订阅: {event_type.value} -> {handler}")

    def unsubscribe(self, event_type: EventType, handler: Union[EventHandler, Callable]):
        """取消订阅"""
        if handler in self._handlers[event_type]:
            self._handlers[event_type].remove(handler)
            logger.debug(f"取消事件订阅: {event_type.value} -> {handler}")

    def emit(self, event: Event, sync: bool = False):
        """
        发送事件

        Args:
            event: 事件对象
            sync: 是否同步处理
        """
        if sync:
            self._process_event(event)
        else:
            try:
                self._event_queue.put_nowait(event)
            except queue.Full:
                logger.error(f"事件队列已满,丢弃事件: {event.event_type.value}")

    def emit_simple(self, event_type: EventType, data: Any = None, source: str = None, sync: bool = False):
        """
        发送简单事件

        Args:
            event_type: 事件类型
            data: 事件数据
            source: 事件源
            sync: 是否同步处理
        """
        event = Event(event_type=event_type, data=data, source=source)
        self.emit(event, sync)

    def start(self):
        """启动事件引擎"""
        if self._running:
            logger.warning("事件引擎已经在运行")
            return

        self._running = True
        self._worker_thread = threading.Thread(target=self._run, daemon=True)
        self._worker_thread.start()

        logger.info("事件引擎已启动")
        self.emit_simple(EventType.SYSTEM_START, {"timestamp": datetime.now()})

    def stop(self):
        """停止事件引擎"""
        if not self._running:
            return

        self.emit_simple(EventType.SYSTEM_SHUTDOWN, {"timestamp": datetime.now()})
        self._running = False

        if self._worker_thread:
            self._worker_thread.join(timeout=5)

        logger.info("事件引擎已停止")

    def _run(self):
        """事件处理主循环"""
        while self._running:
            try:
                # 带超时的获取事件,避免阻塞
                event = self._event_queue.get(timeout=1)
                self._process_event(event)
                self._event_queue.task_done()
            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"事件处理异常: {e}\n{traceback.format_exc()}")
                self._error_count += 1

    def _process_event(self, event: Event):
        """处理单个事件"""
        try:
            # 记录事件历史
            self._add_to_history(event)
            self._event_count[event.event_type] += 1

            # 获取处理器列表
            handlers = self._handlers.get(event.event_type, [])

            if not handlers:
                logger.debug(f"没有找到事件处理器: {event.event_type.value}")
                return

            # 执行处理器
            for handler in handlers:
                try:
                    if isinstance(handler, EventHandler):
                        if handler.can_handle(event):
                            handler.handle(event)
                    elif callable(handler):
                        handler(event)
                except Exception as e:
                    logger.error(f"事件处理器执行失败: {handler}, 事件: {event.event_type.value}, 错误: {e}")
                    self._error_count += 1

        except Exception as e:
            logger.error(f"事件处理异常: {e}\n{traceback.format_exc()}")
            self._error_count += 1

    def _add_to_history(self, event: Event):
        """添加到事件历史"""
        self._event_history.append(event)

        # 保持历史记录大小限制
        if len(self._event_history) > self._max_history_size:
            self._event_history = self._event_history[-self._max_history_size:]

    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            "event_count": dict(self._event_count),
            "error_count": self._error_count,
            "queue_size": self._event_queue.qsize(),
            "handler_count": {
                event_type.value: len(handlers) 
                for event_type, handlers in self._handlers.items()
            },
            "is_running": self._running
        }

    def get_event_history(self, event_type: Optional[EventType] = None, limit: int = 100) -> List[Event]:
        """
        获取事件历史

        Args:
            event_type: 筛选事件类型
            limit: 限制返回数量
        """
        history = self._event_history

        if event_type:
            history = [e for e in history if e.event_type == event_type]

        return history[-limit:]

__init__

__init__(max_queue_size: int = 10000)

初始化事件引擎

Parameters:

Name Type Description Default
max_queue_size int

事件队列最大大小

10000
Source code in qka/core/events.py
def __init__(self, max_queue_size: int = 10000):
    """
    初始化事件引擎

    Args:
        max_queue_size: 事件队列最大大小
    """
    self._handlers: Dict[EventType, List[Union[EventHandler, Callable]]] = defaultdict(list)
    self._event_queue = queue.Queue(maxsize=max_queue_size)
    self._running = False
    self._worker_thread = None
    self._event_history: List[Event] = []
    self._max_history_size = 1000

    # 统计信息
    self._event_count = defaultdict(int)
    self._error_count = 0

subscribe

subscribe(event_type: EventType, handler: Union[EventHandler, Callable])

订阅事件

Parameters:

Name Type Description Default
event_type EventType

事件类型

required
handler Union[EventHandler, Callable]

事件处理器或回调函数

required
Source code in qka/core/events.py
def subscribe(self, event_type: EventType, handler: Union[EventHandler, Callable]):
    """
    订阅事件

    Args:
        event_type: 事件类型
        handler: 事件处理器或回调函数
    """
    self._handlers[event_type].append(handler)
    logger.debug(f"事件订阅: {event_type.value} -> {handler}")

unsubscribe

unsubscribe(event_type: EventType, handler: Union[EventHandler, Callable])

取消订阅

Source code in qka/core/events.py
def unsubscribe(self, event_type: EventType, handler: Union[EventHandler, Callable]):
    """取消订阅"""
    if handler in self._handlers[event_type]:
        self._handlers[event_type].remove(handler)
        logger.debug(f"取消事件订阅: {event_type.value} -> {handler}")

emit

emit(event: Event, sync: bool = False)

发送事件

Parameters:

Name Type Description Default
event Event

事件对象

required
sync bool

是否同步处理

False
Source code in qka/core/events.py
def emit(self, event: Event, sync: bool = False):
    """
    发送事件

    Args:
        event: 事件对象
        sync: 是否同步处理
    """
    if sync:
        self._process_event(event)
    else:
        try:
            self._event_queue.put_nowait(event)
        except queue.Full:
            logger.error(f"事件队列已满,丢弃事件: {event.event_type.value}")

emit_simple

emit_simple(event_type: EventType, data: Any = None, source: str = None, sync: bool = False)

发送简单事件

Parameters:

Name Type Description Default
event_type EventType

事件类型

required
data Any

事件数据

None
source str

事件源

None
sync bool

是否同步处理

False
Source code in qka/core/events.py
def emit_simple(self, event_type: EventType, data: Any = None, source: str = None, sync: bool = False):
    """
    发送简单事件

    Args:
        event_type: 事件类型
        data: 事件数据
        source: 事件源
        sync: 是否同步处理
    """
    event = Event(event_type=event_type, data=data, source=source)
    self.emit(event, sync)

start

start()

启动事件引擎

Source code in qka/core/events.py
def start(self):
    """启动事件引擎"""
    if self._running:
        logger.warning("事件引擎已经在运行")
        return

    self._running = True
    self._worker_thread = threading.Thread(target=self._run, daemon=True)
    self._worker_thread.start()

    logger.info("事件引擎已启动")
    self.emit_simple(EventType.SYSTEM_START, {"timestamp": datetime.now()})

stop

stop()

停止事件引擎

Source code in qka/core/events.py
def stop(self):
    """停止事件引擎"""
    if not self._running:
        return

    self.emit_simple(EventType.SYSTEM_SHUTDOWN, {"timestamp": datetime.now()})
    self._running = False

    if self._worker_thread:
        self._worker_thread.join(timeout=5)

    logger.info("事件引擎已停止")

get_statistics

get_statistics() -> Dict[str, Any]

获取统计信息

Source code in qka/core/events.py
def get_statistics(self) -> Dict[str, Any]:
    """获取统计信息"""
    return {
        "event_count": dict(self._event_count),
        "error_count": self._error_count,
        "queue_size": self._event_queue.qsize(),
        "handler_count": {
            event_type.value: len(handlers) 
            for event_type, handlers in self._handlers.items()
        },
        "is_running": self._running
    }

get_event_history

get_event_history(event_type: Optional[EventType] = None, limit: int = 100) -> List[Event]

获取事件历史

Parameters:

Name Type Description Default
event_type Optional[EventType]

筛选事件类型

None
limit int

限制返回数量

100
Source code in qka/core/events.py
def get_event_history(self, event_type: Optional[EventType] = None, limit: int = 100) -> List[Event]:
    """
    获取事件历史

    Args:
        event_type: 筛选事件类型
        limit: 限制返回数量
    """
    history = self._event_history

    if event_type:
        history = [e for e in history if e.event_type == event_type]

    return history[-limit:]

event_handler

event_handler(event_type: EventType)

事件处理器装饰器

Examples:

@event_handler(EventType.ORDER_FILLED) def on_order_filled(event): print(f"订单成交: {event.data}")

Source code in qka/core/events.py
def event_handler(event_type: EventType):
    """
    事件处理器装饰器

    Examples:
        @event_handler(EventType.ORDER_FILLED)
        def on_order_filled(event):
            print(f"订单成交: {event.data}")
    """
    def decorator(func):
        event_engine.subscribe(event_type, func)
        return func
    return decorator

emit_event

emit_event(event_type: EventType, data: Any = None, source: str = None, sync: bool = False)

发送事件的便捷函数

Source code in qka/core/events.py
def emit_event(event_type: EventType, data: Any = None, source: str = None, sync: bool = False):
    """发送事件的便捷函数"""
    event_engine.emit_simple(event_type, data, source, sync)

start_event_engine

start_event_engine()

启动事件引擎

Source code in qka/core/events.py
def start_event_engine():
    """启动事件引擎"""
    event_engine.start()

stop_event_engine

stop_event_engine()

停止事件引擎

Source code in qka/core/events.py
def stop_event_engine():
    """停止事件引擎"""
    event_engine.stop()

事件系统使用指南

基本概念

事件系统采用发布-订阅模式,支持: - 事件发布和订阅 - 异步事件处理 - 事件过滤和转换 - 事件统计和监控

基本用法

from qka.core.events import EventBus, Event

# 创建事件总线
bus = EventBus()

# 定义事件处理器
def handle_order(event):
    print(f"处理订单事件: {event}")

# 订阅事件
bus.subscribe('order_created', handle_order)

# 发布事件
event = Event('order_created', {'symbol': 'AAPL', 'quantity': 100})
bus.publish(event)

预定义事件类型

MarketDataEvent - 市场数据事件

from qka.core.events import MarketDataEvent

# 创建市场数据事件
event = MarketDataEvent(
    symbol='AAPL',
    timestamp=datetime.now(),
    data={
        'open': 150.0,
        'high': 152.0,
        'low': 149.0,
        'close': 151.0,
        'volume': 1000000
    }
)

# 订阅市场数据事件
@bus.subscribe('market_data')
def handle_market_data(event):
    symbol = event.symbol
    price = event.data['close']
    print(f"{symbol}: ${price}")

OrderEvent - 订单事件

from qka.core.events import OrderEvent

# 创建订单事件
order_event = OrderEvent(
    order_id='ORD_001',
    symbol='AAPL',
    side='buy',
    quantity=100,
    price=150.0,
    order_type='limit',
    status='pending'
)

# 订阅订单事件
@bus.subscribe('order')
def handle_order(event):
    print(f"订单 {event.order_id}: {event.status}")

TradeEvent - 交易事件

from qka.core.events import TradeEvent

# 创建交易事件
trade_event = TradeEvent(
    trade_id='TRD_001',
    order_id='ORD_001',
    symbol='AAPL',
    side='buy',
    quantity=100,
    price=150.5,
    commission=0.15,
    timestamp=datetime.now()
)

# 订阅交易事件
@bus.subscribe('trade')
def handle_trade(event):
    print(f"交易完成: {event.symbol} {event.side} {event.quantity}@{event.price}")

高级功能

异步事件处理

import asyncio
from qka.core.events import EventBus

# 创建支持异步的事件总线
bus = EventBus(async_mode=True)

# 异步事件处理器
async def async_handler(event):
    await asyncio.sleep(1)  # 模拟异步操作
    print(f"异步处理事件: {event}")

# 订阅异步处理器
bus.subscribe('async_event', async_handler)

# 发布事件(异步处理)
await bus.publish_async(Event('async_event', {'data': 'test'}))

事件过滤

# 带条件的事件订阅
def price_filter(event):
    return event.data.get('price', 0) > 100

bus.subscribe('market_data', handle_expensive_stocks, filter_func=price_filter)

# 仅处理价格大于100的股票数据
event = MarketDataEvent('AAPL', data={'price': 150})
bus.publish(event)  # 会被处理

event = MarketDataEvent('PENNY', data={'price': 5})
bus.publish(event)  # 不会被处理

事件转换

# 事件转换器
def price_transformer(event):
    # 将价格转换为人民币
    if 'price' in event.data:
        event.data['price_cny'] = event.data['price'] * 7.0
    return event

bus.subscribe('market_data', handle_cny_price, transformer=price_transformer)

批量事件处理

# 批量事件处理器
@bus.subscribe_batch('market_data', batch_size=10, timeout=5)
def handle_batch(events):
    prices = [e.data['price'] for e in events]
    avg_price = sum(prices) / len(prices)
    print(f"批量处理 {len(events)} 条数据,平均价格: {avg_price}")

# 发布多个事件
for i in range(20):
    event = MarketDataEvent(f'STOCK_{i}', data={'price': 100 + i})
    bus.publish(event)

事件统计和监控

# 获取事件统计
stats = bus.get_statistics()
print(f"总事件数: {stats['total_events']}")
print(f"订阅者数: {stats['total_subscribers']}")
print(f"事件类型分布: {stats['event_types']}")

# 监控事件处理性能
@bus.subscribe('performance_monitor')
def monitor_handler(event):
    processing_time = event.processing_time
    if processing_time > 1.0:  # 超过1秒
        print(f"事件处理较慢: {processing_time:.2f}s")

错误处理

# 错误处理器
def error_handler(event, exception):
    print(f"事件处理失败: {event}, 错误: {exception}")
    # 记录错误日志或发送告警

bus.set_error_handler(error_handler)

# 带重试的事件处理
@bus.subscribe('critical_event', retry_count=3, retry_delay=1)
def critical_handler(event):
    if random.random() < 0.5:
        raise Exception("模拟处理失败")
    print(f"关键事件处理成功: {event}")

事件持久化

# 启用事件持久化
bus.enable_persistence('events.db')

# 重放历史事件
bus.replay_events(
    event_type='market_data',
    start_time=datetime(2024, 1, 1),
    end_time=datetime(2024, 1, 31)
)

最佳实践

  1. 事件设计
  2. 事件名称要清晰、一致
  3. 事件数据结构要稳定
  4. 避免事件过于频繁

  5. 性能优化

  6. 异步处理耗时操作
  7. 合理使用批量处理
  8. 监控事件处理性能

  9. 错误处理

  10. 处理器要有错误处理逻辑
  11. 关键事件要有重试机制
  12. 记录事件处理日志

  13. 测试

  14. 模拟事件进行单元测试
  15. 测试异常情况处理
  16. 性能测试和压力测试