跳转至

事件系统

QKA 采用事件驱动架构,通过发布-订阅模式实现模块间的松耦合通信。事件系统让您可以监听和响应系统中发生的各种事件。

为什么使用事件系统?

事件系统的优势

  • 🔗 松耦合 - 模块间无需直接依赖
  • 📡 实时响应 - 立即响应系统事件
  • 🔧 易扩展 - 轻松添加新的事件处理逻辑
  • 📊 可监控 - 完整的事件历史和统计

快速开始

启动事件系统

from qka.core.events import start_event_engine, stop_event_engine

# 启动事件引擎
start_event_engine()

# 程序结束时停止
stop_event_engine()

监听事件

使用装饰器方式监听事件:

from qka.core.events import EventType, event_handler

@event_handler(EventType.DATA_LOADED)
def on_data_loaded(event):
    print(f"数据加载完成: {event.data}")

@event_handler(EventType.ORDER_FILLED)
def on_order_filled(event):
    print(f"订单成交: {event.data}")

发送事件

from qka.core.events import emit_event

# 发送数据加载事件
emit_event(EventType.DATA_LOADED, {
    "symbol": "000001.SZ",
    "rows": 1000,
    "timespan": "2023-01-01 to 2023-12-31"
})

# 发送订单成交事件
emit_event(EventType.ORDER_FILLED, {
    "order_id": "12345",
    "symbol": "000001.SZ",
    "price": 10.50,
    "quantity": 1000
})

事件类型详解

数据相关事件

DATA_LOADED - 数据加载完成

当数据成功加载时触发。

@event_handler(EventType.DATA_LOADED)
def on_data_loaded(event):
    data = event.data
    print(f"加载了 {data['symbol']}{data['rows']} 条数据")

事件数据示例:

{
    "symbol": "000001.SZ",
    "rows": 1000,
    "timespan": "2023-01-01 to 2023-12-31",
    "source": "akshare"
}

DATA_ERROR - 数据加载错误

当数据加载失败时触发。

@event_handler(EventType.DATA_ERROR)
def on_data_error(event):
    error = event.data
    print(f"数据加载失败: {error['message']}")

交易相关事件

ORDER_CREATED - 订单创建

当新订单被创建时触发。

@event_handler(EventType.ORDER_CREATED)
def on_order_created(event):
    order = event.data
    print(f"创建订单: {order['action']} {order['symbol']} @ {order['price']}")

ORDER_FILLED - 订单成交

当订单成交时触发。

@event_handler(EventType.ORDER_FILLED)
def on_order_filled(event):
    trade = event.data
    print(f"订单成交: {trade['symbol']} 成交价 {trade['price']}")

ORDER_CANCELLED - 订单取消

当订单被取消时触发。

@event_handler(EventType.ORDER_CANCELLED)
def on_order_cancelled(event):
    order = event.data
    print(f"订单取消: {order['order_id']}")

策略相关事件

STRATEGY_START - 策略开始

当策略开始运行时触发。

@event_handler(EventType.STRATEGY_START)
def on_strategy_start(event):
    strategy = event.data
    print(f"策略 {strategy['name']} 开始运行")

SIGNAL_GENERATED - 信号生成

当策略生成交易信号时触发。

@event_handler(EventType.SIGNAL_GENERATED)
def on_signal_generated(event):
    signal = event.data
    print(f"信号: {signal['action']} {signal['symbol']}")

    # 可以在这里添加信号过滤、风险检查等逻辑
    if signal['strength'] > 0.8:
        print("强信号,建议关注!")

高级用法

自定义事件处理器

创建自定义的事件处理器类:

from qka.core.events import EventHandler, Event

class TradingEventHandler(EventHandler):
    def __init__(self):
        self.order_count = 0
        self.total_volume = 0

    def handle(self, event: Event):
        if event.event_type == EventType.ORDER_FILLED:
            self.order_count += 1
            self.total_volume += event.data.get('quantity', 0)
            print(f"总订单数: {self.order_count}, 总成交量: {self.total_volume}")

    def can_handle(self, event: Event) -> bool:
        return event.event_type == EventType.ORDER_FILLED

# 注册处理器
from qka.core.events import event_engine

handler = TradingEventHandler()
event_engine.subscribe(EventType.ORDER_FILLED, handler)

事件过滤

只处理特定条件的事件:

@event_handler(EventType.SIGNAL_GENERATED)
def handle_strong_signals(event):
    signal = event.data

    # 只处理强信号
    if signal.get('strength', 0) > 0.8:
        print(f"收到强信号: {signal}")
        # 执行相应操作

异步事件处理

from qka.core.events import AsyncEventHandler
import asyncio

class AsyncOrderHandler(AsyncEventHandler):
    async def handle_async(self, event: Event):
        if event.event_type == EventType.ORDER_FILLED:
            # 异步处理订单
            await self.update_portfolio(event.data)
            await self.send_notification(event.data)

    async def update_portfolio(self, order_data):
        # 模拟异步数据库操作
        await asyncio.sleep(0.1)
        print(f"组合更新完成: {order_data}")

    async def send_notification(self, order_data):
        # 模拟异步通知发送
        await asyncio.sleep(0.1)
        print(f"通知已发送: {order_data}")

事件统计和监控

查看事件统计

from qka.core.events import event_engine

# 获取统计信息
stats = event_engine.get_statistics()

print(f"事件计数: {stats['event_count']}")
print(f"错误计数: {stats['error_count']}")
print(f"队列大小: {stats['queue_size']}")
print(f"处理器数量: {stats['handler_count']}")
print(f"运行状态: {stats['is_running']}")

查看事件历史

# 获取所有事件历史
all_events = event_engine.get_event_history(limit=100)

# 获取特定类型的事件历史
order_events = event_engine.get_event_history(
    event_type=EventType.ORDER_FILLED, 
    limit=50
)

for event in order_events:
    print(f"{event.timestamp}: {event.data}")

在策略中使用事件

事件驱动的策略

from qka.core.backtest import Strategy
from qka.core.events import EventType, emit_event

class EventDrivenStrategy(Strategy):
    def on_start(self, broker):
        # 策略开始时发送事件
        emit_event(EventType.STRATEGY_START, {
            "strategy": self.name,
            "initial_cash": broker.initial_cash
        })

    def on_bar(self, data, broker, current_date):
        for symbol, df in data.items():
            if len(df) >= 20:
                current_price = df['close'].iloc[-1]
                ma20 = df['close'].rolling(20).mean().iloc[-1]

                # 生成信号事件
                if current_price > ma20:
                    emit_event(EventType.SIGNAL_GENERATED, {
                        "symbol": symbol,
                        "action": "BUY",
                        "price": current_price,
                        "strength": 0.8,
                        "reason": "价格突破20日均线"
                    })

                    # 执行买入
                    if broker.buy(symbol, 0.3, current_price):
                        emit_event(EventType.ORDER_FILLED, {
                            "symbol": symbol,
                            "action": "BUY",
                            "price": current_price,
                            "quantity": broker.get_position(symbol)
                        })

事件监听器

# 策略性能监控
@event_handler(EventType.ORDER_FILLED)
def monitor_strategy_performance(event):
    order = event.data

    # 记录交易日志
    with open('trades.log', 'a') as f:
        f.write(f"{event.timestamp}: {order}\n")

    # 计算收益
    if order['action'] == 'SELL':
        # 计算这笔交易的盈亏
        pass

# 风险监控
@event_handler(EventType.SIGNAL_GENERATED)
def risk_monitor(event):
    signal = event.data

    # 检查是否有过度交易
    if signal['strength'] < 0.5:
        print(f"⚠️ 弱信号警告: {signal}")

    # 检查仓位集中度
    # ...

最佳实践

1. 事件命名

为自定义事件使用清晰的命名:

# 好的命名
EventType.PORTFOLIO_REBALANCED
EventType.RISK_LIMIT_EXCEEDED
EventType.MARKET_DATA_UPDATED

# 避免的命名
EventType.EVENT1
EventType.SOMETHING_HAPPENED

2. 事件数据结构

保持事件数据结构的一致性:

# 推荐的事件数据结构
{
    "timestamp": "2023-12-01T10:30:00",
    "symbol": "000001.SZ",
    "action": "BUY",
    "price": 10.50,
    "quantity": 1000,
    "metadata": {
        "strategy": "MA_CROSS",
        "signal_strength": 0.85
    }
}

3. 错误处理

在事件处理器中添加适当的错误处理:

@event_handler(EventType.ORDER_FILLED)
def safe_order_handler(event):
    try:
        # 处理订单逻辑
        process_order(event.data)
    except Exception as e:
        print(f"处理订单事件时出错: {e}")
        # 发送错误事件
        emit_event(EventType.ORDER_ERROR, {
            "original_event": event.to_dict(),
            "error": str(e)
        })

4. 性能考虑

避免在事件处理器中执行耗时操作:

# ❌ 避免在事件处理器中执行耗时操作
@event_handler(EventType.DATA_LOADED)
def slow_handler(event):
    time.sleep(5)  # 这会阻塞事件队列

# ✅ 使用异步处理或后台任务
@event_handler(EventType.DATA_LOADED)
def fast_handler(event):
    # 快速处理或提交到后台队列
    background_queue.put(event.data)

API 参考

事件系统的详细API参考请查看 Events API文档

主要类和函数

  • EventBus - 事件总线
  • Event - 基础事件类
  • MarketDataEvent - 市场数据事件
  • OrderEvent - 订单事件
  • TradeEvent - 交易事件

更多详细信息和使用示例请参考API文档页面。