Events API 参考¶
qka.core.events
¶
QKA 事件驱动框架 提供发布-订阅模式的事件系统,支持异步和同步事件处理
EventType
¶
Bases: Enum
事件类型枚举
Source code in qka/core/events.py
Event
dataclass
¶
事件基类
Source code in qka/core/events.py
DataEvent
dataclass
¶
OrderEvent
dataclass
¶
SignalEvent
dataclass
¶
AsyncEventHandler
¶
EventEngine
¶
事件引擎
Source code in qka/core/events.py
129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 |
|
__init__
¶
初始化事件引擎
Parameters:
Name | Type | Description | Default |
---|---|---|---|
max_queue_size
|
int
|
事件队列最大大小 |
10000
|
Source code in qka/core/events.py
subscribe
¶
订阅事件
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_type
|
EventType
|
事件类型 |
required |
handler
|
Union[EventHandler, Callable]
|
事件处理器或回调函数 |
required |
Source code in qka/core/events.py
unsubscribe
¶
emit
¶
发送事件
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
|
Event
|
事件对象 |
required |
sync
|
bool
|
是否同步处理 |
False
|
Source code in qka/core/events.py
emit_simple
¶
发送简单事件
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event_type
|
EventType
|
事件类型 |
required |
data
|
Any
|
事件数据 |
None
|
source
|
str
|
事件源 |
None
|
sync
|
bool
|
是否同步处理 |
False
|
Source code in qka/core/events.py
start
¶
启动事件引擎
Source code in qka/core/events.py
stop
¶
停止事件引擎
Source code in qka/core/events.py
get_statistics
¶
获取统计信息
Source code in qka/core/events.py
event_handler
¶
事件处理器装饰器
Examples:
@event_handler(EventType.ORDER_FILLED) def on_order_filled(event): print(f"订单成交: {event.data}")
Source code in qka/core/events.py
emit_event
¶
start_event_engine
¶
事件系统使用指南¶
基本概念¶
事件系统采用发布-订阅模式,支持: - 事件发布和订阅 - 异步事件处理 - 事件过滤和转换 - 事件统计和监控
基本用法¶
from qka.core.events import EventBus, Event
# 创建事件总线
bus = EventBus()
# 定义事件处理器
def handle_order(event):
print(f"处理订单事件: {event}")
# 订阅事件
bus.subscribe('order_created', handle_order)
# 发布事件
event = Event('order_created', {'symbol': 'AAPL', 'quantity': 100})
bus.publish(event)
预定义事件类型¶
MarketDataEvent - 市场数据事件¶
from qka.core.events import MarketDataEvent
# 创建市场数据事件
event = MarketDataEvent(
symbol='AAPL',
timestamp=datetime.now(),
data={
'open': 150.0,
'high': 152.0,
'low': 149.0,
'close': 151.0,
'volume': 1000000
}
)
# 订阅市场数据事件
@bus.subscribe('market_data')
def handle_market_data(event):
symbol = event.symbol
price = event.data['close']
print(f"{symbol}: ${price}")
OrderEvent - 订单事件¶
from qka.core.events import OrderEvent
# 创建订单事件
order_event = OrderEvent(
order_id='ORD_001',
symbol='AAPL',
side='buy',
quantity=100,
price=150.0,
order_type='limit',
status='pending'
)
# 订阅订单事件
@bus.subscribe('order')
def handle_order(event):
print(f"订单 {event.order_id}: {event.status}")
TradeEvent - 交易事件¶
from qka.core.events import TradeEvent
# 创建交易事件
trade_event = TradeEvent(
trade_id='TRD_001',
order_id='ORD_001',
symbol='AAPL',
side='buy',
quantity=100,
price=150.5,
commission=0.15,
timestamp=datetime.now()
)
# 订阅交易事件
@bus.subscribe('trade')
def handle_trade(event):
print(f"交易完成: {event.symbol} {event.side} {event.quantity}@{event.price}")
高级功能¶
异步事件处理¶
import asyncio
from qka.core.events import EventBus
# 创建支持异步的事件总线
bus = EventBus(async_mode=True)
# 异步事件处理器
async def async_handler(event):
await asyncio.sleep(1) # 模拟异步操作
print(f"异步处理事件: {event}")
# 订阅异步处理器
bus.subscribe('async_event', async_handler)
# 发布事件(异步处理)
await bus.publish_async(Event('async_event', {'data': 'test'}))
事件过滤¶
# 带条件的事件订阅
def price_filter(event):
return event.data.get('price', 0) > 100
bus.subscribe('market_data', handle_expensive_stocks, filter_func=price_filter)
# 仅处理价格大于100的股票数据
event = MarketDataEvent('AAPL', data={'price': 150})
bus.publish(event) # 会被处理
event = MarketDataEvent('PENNY', data={'price': 5})
bus.publish(event) # 不会被处理
事件转换¶
# 事件转换器
def price_transformer(event):
# 将价格转换为人民币
if 'price' in event.data:
event.data['price_cny'] = event.data['price'] * 7.0
return event
bus.subscribe('market_data', handle_cny_price, transformer=price_transformer)
批量事件处理¶
# 批量事件处理器
@bus.subscribe_batch('market_data', batch_size=10, timeout=5)
def handle_batch(events):
prices = [e.data['price'] for e in events]
avg_price = sum(prices) / len(prices)
print(f"批量处理 {len(events)} 条数据,平均价格: {avg_price}")
# 发布多个事件
for i in range(20):
event = MarketDataEvent(f'STOCK_{i}', data={'price': 100 + i})
bus.publish(event)
事件统计和监控¶
# 获取事件统计
stats = bus.get_statistics()
print(f"总事件数: {stats['total_events']}")
print(f"订阅者数: {stats['total_subscribers']}")
print(f"事件类型分布: {stats['event_types']}")
# 监控事件处理性能
@bus.subscribe('performance_monitor')
def monitor_handler(event):
processing_time = event.processing_time
if processing_time > 1.0: # 超过1秒
print(f"事件处理较慢: {processing_time:.2f}s")
错误处理¶
# 错误处理器
def error_handler(event, exception):
print(f"事件处理失败: {event}, 错误: {exception}")
# 记录错误日志或发送告警
bus.set_error_handler(error_handler)
# 带重试的事件处理
@bus.subscribe('critical_event', retry_count=3, retry_delay=1)
def critical_handler(event):
if random.random() < 0.5:
raise Exception("模拟处理失败")
print(f"关键事件处理成功: {event}")
事件持久化¶
# 启用事件持久化
bus.enable_persistence('events.db')
# 重放历史事件
bus.replay_events(
event_type='market_data',
start_time=datetime(2024, 1, 1),
end_time=datetime(2024, 1, 31)
)
最佳实践¶
- 事件设计
- 事件名称要清晰、一致
- 事件数据结构要稳定
-
避免事件过于频繁
-
性能优化
- 异步处理耗时操作
- 合理使用批量处理
-
监控事件处理性能
-
错误处理
- 处理器要有错误处理逻辑
- 关键事件要有重试机制
-
记录事件处理日志
-
测试
- 模拟事件进行单元测试
- 测试异常情况处理
- 性能测试和压力测试