实盘交易¶
QKA 提供了完整的实盘交易解决方案,支持多种券商接口和交易模式。
交易环境配置¶
券商配置¶
在配置文件中设置券商信息:
{
"trading": {
"broker": "华泰证券",
"account": "your_account",
"server_host": "localhost",
"server_port": 8888,
"timeout": 30,
"auto_login": true,
"credentials": {
"username": "your_username",
"password": "your_password",
"communication_password": "your_comm_password"
}
}
}
安全设置¶
import os
from qka.core.config import config
# 使用环境变量存储敏感信息
os.environ['QKA_BROKER_USERNAME'] = 'your_username'
os.environ['QKA_BROKER_PASSWORD'] = 'your_password'
# 配置会自动读取环境变量
config.trading.credentials.username = os.getenv('QKA_BROKER_USERNAME')
交易客户端¶
创建交易客户端¶
from qka.brokers import QMTClient
from qka.brokers.trade import Order, Trade, Position
# 创建交易客户端
client = QMTClient(base_url="http://localhost:8000", token="your_token")
# 连接到券商
try:
client.connect()
print("连接成功")
except Exception as e:
print(f"连接失败: {e}")
账户信息查询¶
# 查询账户资金
account_info = client.get_account_info()
print(f"总资产: {account_info.total_value:,.2f}")
print(f"可用资金: {account_info.available_cash:,.2f}")
print(f"市值: {account_info.market_value:,.2f}")
# 查询持仓
positions = client.get_positions()
for position in positions:
print(f"{position.symbol}: {position.volume}股, 成本: {position.cost:.2f}")
# 查询委托
orders = client.get_orders()
for order in orders:
print(f"{order.symbol}: {order.status} {order.volume}股 @ {order.price:.2f}")
下单交易¶
基本下单¶
from qka.brokers.trade import OrderType, OrderSide
# 市价买入
buy_order = client.place_order(
symbol='000001.SZ',
side=OrderSide.BUY,
order_type=OrderType.MARKET,
volume=100
)
print(f"买单ID: {buy_order.order_id}")
# 限价卖出
sell_order = client.place_order(
symbol='000001.SZ',
side=OrderSide.SELL,
order_type=OrderType.LIMIT,
volume=100,
price=15.50
)
print(f"卖单ID: {sell_order.order_id}")
高级订单类型¶
# 止损单
stop_loss_order = client.place_order(
symbol='000001.SZ',
side=OrderSide.SELL,
order_type=OrderType.STOP_LOSS,
volume=100,
stop_price=14.50
)
# 止盈单
take_profit_order = client.place_order(
symbol='000001.SZ',
side=OrderSide.SELL,
order_type=OrderType.TAKE_PROFIT,
volume=100,
stop_price=16.50
)
# 条件单
conditional_order = client.place_conditional_order(
symbol='000001.SZ',
condition='price >= 16.0',
action='buy',
volume=200
)
批量下单¶
# 批量下单
orders = [
{
'symbol': '000001.SZ',
'side': OrderSide.BUY,
'volume': 100,
'price': 15.00
},
{
'symbol': '000002.SZ',
'side': OrderSide.BUY,
'volume': 200,
'price': 12.50
}
]
batch_results = client.place_batch_orders(orders)
for result in batch_results:
if result.success:
print(f"订单 {result.order_id} 提交成功")
else:
print(f"订单提交失败: {result.error}")
实盘策略运行¶
策略实盘化¶
from qka.core.backtest import Strategy
from qka.brokers.live import LiveEngine
class LiveTradingStrategy(Strategy):
def __init__(self):
super().__init__()
self.name = "实盘交易策略"
self.client = TradingClient()
def on_init(self):
"""策略初始化"""
# 连接交易客户端
self.client.connect()
# 订阅实时数据
self.subscribe_realtime('000001.SZ')
# 初始化风险控制
self.setup_risk_management()
def on_data(self, data):
"""实时数据处理"""
for symbol in data.index:
# 获取当前持仓
position = self.client.get_position(symbol)
current_price = data.loc[symbol, 'last_price']
# 策略逻辑
signal = self.generate_signal(symbol, data)
if signal == 'BUY' and not position:
self.execute_buy(symbol, current_price)
elif signal == 'SELL' and position:
self.execute_sell(symbol, position.volume, current_price)
def execute_buy(self, symbol, price):
"""执行买入"""
# 计算仓位大小
volume = self.calculate_position_size(symbol, price)
# 风险检查
if self.check_risk_limits(symbol, volume, price):
order = self.client.place_order(
symbol=symbol,
side=OrderSide.BUY,
order_type=OrderType.LIMIT,
volume=volume,
price=price * 1.001 # 稍微高于市价确保成交
)
self.log(f"买入订单已提交: {order.order_id}")
else:
self.log(f"风险控制: 跳过买入 {symbol}")
def execute_sell(self, symbol, volume, price):
"""执行卖出"""
order = self.client.place_order(
symbol=symbol,
side=OrderSide.SELL,
order_type=OrderType.LIMIT,
volume=volume,
price=price * 0.999 # 稍微低于市价确保成交
)
self.log(f"卖出订单已提交: {order.order_id}")
启动实盘引擎¶
# 创建实盘引擎
live_engine = LiveEngine()
# 配置实盘引擎
live_engine.configure(
strategy=LiveTradingStrategy(),
data_frequency='1min', # 分钟级数据
order_timeout=30, # 订单超时时间
max_retry=3 # 最大重试次数
)
# 启动实盘交易
live_engine.start()
# 策略将在后台运行,处理实时数据和交易
风险管理¶
实盘风险控制¶
class RiskManager:
def __init__(self, client):
self.client = client
self.max_position_ratio = 0.1 # 单股最大仓位10%
self.max_daily_loss = 0.05 # 日最大亏损5%
self.max_drawdown = 0.15 # 最大回撤15%
def check_position_limit(self, symbol, volume, price):
"""检查仓位限制"""
account_info = self.client.get_account_info()
position_value = volume * price
position_ratio = position_value / account_info.total_value
return position_ratio <= self.max_position_ratio
def check_daily_loss_limit(self):
"""检查日损失限制"""
today_pnl = self.client.get_today_pnl()
account_info = self.client.get_account_info()
loss_ratio = abs(today_pnl) / account_info.total_value
return loss_ratio <= self.max_daily_loss
def emergency_stop(self):
"""紧急止损"""
# 撤销所有未成交订单
pending_orders = self.client.get_pending_orders()
for order in pending_orders:
self.client.cancel_order(order.order_id)
# 平仓所有持仓
positions = self.client.get_positions()
for position in positions:
self.client.place_order(
symbol=position.symbol,
side=OrderSide.SELL,
order_type=OrderType.MARKET,
volume=position.volume
)
self.log("执行紧急止损")
实时监控¶
from qka.brokers.monitor import TradingMonitor
monitor = TradingMonitor(client)
# 设置监控规则
monitor.add_rule(
name='日亏损监控',
condition=lambda: monitor.get_daily_pnl_ratio() < -0.05,
action=monitor.send_alert
)
monitor.add_rule(
name='仓位监控',
condition=lambda: monitor.get_max_position_ratio() > 0.15,
action=monitor.reduce_positions
)
# 启动监控
monitor.start()
数据同步¶
实时行情订阅¶
from qka.brokers.data import RealtimeDataFeed
# 创建实时数据源
data_feed = RealtimeDataFeed()
# 订阅股票行情
symbols = ['000001.SZ', '000002.SZ', '600000.SH']
data_feed.subscribe(symbols)
# 设置数据回调
def on_tick_data(tick):
print(f"{tick.symbol}: {tick.last_price} ({tick.timestamp})")
data_feed.set_tick_callback(on_tick_data)
# 启动数据接收
data_feed.start()
数据存储¶
from qka.core.data import DataStorage
storage = DataStorage()
# 存储实时数据
def save_tick_data(tick):
storage.save_tick(
symbol=tick.symbol,
timestamp=tick.timestamp,
price=tick.last_price,
volume=tick.volume,
bid=tick.bid,
ask=tick.ask
)
data_feed.set_tick_callback(save_tick_data)
交易记录与分析¶
交易日志¶
from qka.utils.logger import get_structured_logger
trade_logger = get_structured_logger('trading')
def log_trade(order):
trade_logger.info('trade_executed', {
'order_id': order.order_id,
'symbol': order.symbol,
'side': order.side,
'volume': order.volume,
'price': order.executed_price,
'timestamp': order.executed_time,
'commission': order.commission
})
实盘表现分析¶
from qka.core.backtest import LivePerformanceAnalyzer
analyzer = LivePerformanceAnalyzer(client)
# 生成日报
daily_report = analyzer.generate_daily_report()
print(f"今日收益: {daily_report.daily_return:.2%}")
print(f"今日交易次数: {daily_report.trade_count}")
# 生成周报
weekly_report = analyzer.generate_weekly_report()
print(f"本周收益: {weekly_report.weekly_return:.2%}")
print(f"本周胜率: {weekly_report.win_rate:.2%}")
模拟交易¶
模拟环境¶
from qka.brokers.simulator import SimulatorClient
# 创建模拟交易客户端
sim_client = SimulatorClient(
initial_cash=1000000,
commission_rate=0.0003
)
# 使用模拟客户端进行测试
strategy = LiveTradingStrategy()
strategy.client = sim_client
# 在模拟环境中运行策略
sim_engine = LiveEngine()
sim_engine.configure(
strategy=strategy,
client=sim_client,
mode='simulation'
)
sim_engine.start()
异常处理¶
连接异常¶
def handle_connection_error():
"""处理连接异常"""
max_retry = 3
retry_count = 0
while retry_count < max_retry:
try:
client.reconnect()
print("重连成功")
break
except Exception as e:
retry_count += 1
print(f"重连失败 ({retry_count}/{max_retry}): {e}")
time.sleep(5)
if retry_count >= max_retry:
print("连接失败,停止交易")
live_engine.stop()
订单异常¶
def handle_order_error(order, error):
"""处理订单异常"""
if '资金不足' in str(error):
print("资金不足,调整仓位大小")
# 重新计算仓位
elif '涨跌停' in str(error):
print("股票涨跌停,取消订单")
# 取消相关订单
else:
print(f"订单异常: {error}")
# 记录错误日志
最佳实践¶
- 充分测试:在模拟环境中充分测试策略
- 风险控制:设置多层风险控制机制
- 实时监控:建立完善的监控和告警系统
- 异常处理:考虑各种异常情况的处理
- 数据备份:定期备份交易数据和日志
- 版本控制:对策略代码进行版本管理
- 渐进上线:从小资金开始,逐步增加投入
注意事项¶
- 合规要求:确保符合监管要求
- 系统稳定性:确保网络和系统稳定
- 数据延迟:考虑数据传输延迟的影响
- 交易时间:注意交易时间段的限制
- 资金安全:妥善保管账户信息
API 参考¶
交易模块的详细API参考请查看 API文档。
主要类和函数¶
- QMTClient - 交易客户端接口
- Order - 订单对象
- Trade - 交易记录
- Position - 持仓信息
更多详细信息请参考对应的API文档页面。