日志系统¶
QKA 提供了强大而灵活的日志系统,支持彩色输出、结构化日志、文件轮转和远程通知等功能。
为什么需要好的日志系统?¶
日志系统的重要性
- 🐛 问题诊断 - 快速定位和解决问题
- 📊 性能监控 - 监控系统运行状态
- 🔍 行为追踪 - 记录用户操作和系统行为
- 📈 数据分析 - 提供业务分析数据
- ⚠️ 告警通知 - 及时发现系统异常
快速开始¶
基础日志记录¶
from qka.utils.logger import create_logger
# 创建日志记录器
logger = create_logger('my_app')
# 记录不同级别的日志
logger.debug("调试信息:变量值为 x=10")
logger.info("程序正常运行")
logger.warning("这是一个警告")
logger.error("发生错误")
logger.critical("严重错误,程序可能无法继续")
彩色日志输出¶
# 启用彩色控制台输出
logger = create_logger('my_app', colored_console=True)
logger.debug("调试信息") # 青色
logger.info("普通信息") # 绿色
logger.warning("警告信息") # 黄色
logger.error("错误信息") # 红色
logger.critical("严重错误") # 紫色
日志配置¶
基本配置¶
from qka.utils.logger import create_logger
logger = create_logger(
name='my_app', # 日志记录器名称
level='INFO', # 日志级别: DEBUG, INFO, WARNING, ERROR, CRITICAL
console_output=True, # 是否输出到控制台
file_output=True, # 是否输出到文件
log_dir='logs', # 日志文件目录
max_file_size='10MB', # 最大文件大小
backup_count=10, # 备份文件数量
json_format=False, # 是否使用JSON格式
colored_console=True # 控制台是否使用颜色
)
使用配置文件¶
import qka
from qka.utils.logger import setup_logging_from_config
# 使用全局配置
logger = create_logger(
level=qka.config.log.level,
log_dir=qka.config.log.log_dir,
max_file_size=qka.config.log.max_file_size,
backup_count=qka.config.log.backup_count
)
结构化日志¶
基础结构化日志¶
from qka.utils.logger import get_structured_logger
# 创建结构化日志记录器
struct_logger = get_structured_logger('my_app')
# 记录带有额外字段的日志
struct_logger.info("用户登录",
user_id=12345,
ip="192.168.1.100",
action="login",
success=True)
struct_logger.error("交易失败",
symbol="000001.SZ",
reason="余额不足",
amount=10000,
user_id=12345)
交易日志示例¶
# 记录订单信息
struct_logger.info("订单创建",
order_id="ORD001",
symbol="000001.SZ",
side="BUY",
quantity=1000,
price=10.50,
strategy="MA_CROSS")
# 记录成交信息
struct_logger.info("订单成交",
order_id="ORD001",
fill_price=10.48,
fill_quantity=1000,
commission=3.14,
timestamp="2023-12-01T10:30:00")
# 记录策略信号
struct_logger.info("信号生成",
strategy="MA_CROSS",
symbol="000001.SZ",
signal="BUY",
strength=0.85,
indicators={
"ma5": 10.45,
"ma20": 10.30,
"volume": 1500000
})
文件日志管理¶
自动文件轮转¶
QKA 支持自动文件轮转,防止日志文件过大:
文件结构示例:
JSON格式日志¶
适合后续分析和处理:
JSON输出示例:
{
"timestamp": "2023-12-01T10:30:00.123456",
"level": "INFO",
"logger": "my_app",
"message": "这将以JSON格式记录",
"module": "main",
"function": "main",
"line": 15,
"extra_field": "value"
}
远程通知¶
微信群通知¶
为重要错误添加微信群通知:
from qka.utils.logger import add_wechat_handler
# 添加微信通知(只通知ERROR级别及以上)
webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
add_wechat_handler(logger, webhook_url, level='ERROR')
# 现在ERROR和CRITICAL级别的日志会发送到微信群
logger.error("这条错误信息会发送到微信群")
logger.critical("这条严重错误也会发送到微信群")
logger.warning("这条警告不会发送到微信群")
自定义通知处理器¶
import requests
from qka.utils.logger import logger
class SlackHandler(logging.Handler):
def __init__(self, webhook_url):
super().__init__()
self.webhook_url = webhook_url
def emit(self, record):
message = self.format(record)
payload = {"text": message}
try:
requests.post(self.webhook_url, json=payload)
except Exception as e:
print(f"发送Slack消息失败: {e}")
# 添加Slack通知
slack_handler = SlackHandler("YOUR_SLACK_WEBHOOK_URL")
slack_handler.setLevel(logging.ERROR)
logger.addHandler(slack_handler)
在策略中使用日志¶
策略日志记录¶
from qka.core.backtest import Strategy
from qka.utils.logger import create_logger
class LoggedStrategy(Strategy):
def __init__(self):
super().__init__()
# 为策略创建专用日志记录器
self.logger = create_logger(f'strategy_{self.name}', colored_console=True)
def on_start(self, broker):
self.logger.info(f"策略启动",
strategy=self.name,
initial_cash=broker.initial_cash,
commission_rate=broker.commission_rate)
def on_bar(self, data, broker, current_date):
for symbol, df in data.items():
if len(df) >= 20:
current_price = df['close'].iloc[-1]
ma20 = df['close'].rolling(20).mean().iloc[-1]
if current_price > ma20 and broker.get_position(symbol) == 0:
self.logger.info(f"买入信号",
symbol=symbol,
price=current_price,
ma20=ma20,
signal_strength=(current_price - ma20) / ma20)
if broker.buy(symbol, 0.3, current_price):
self.logger.info(f"买入成功",
symbol=symbol,
price=current_price,
quantity=broker.get_position(symbol),
cash_remaining=broker.get_cash())
else:
self.logger.warning(f"买入失败",
symbol=symbol,
price=current_price,
reason="资金不足或其他原因")
elif current_price < ma20 and broker.get_position(symbol) > 0:
self.logger.info(f"卖出信号",
symbol=symbol,
price=current_price,
ma20=ma20,
position=broker.get_position(symbol))
if broker.sell(symbol, 1.0, current_price):
self.logger.info(f"卖出成功",
symbol=symbol,
price=current_price,
cash_after=broker.get_cash())
def on_end(self, broker):
final_value = broker.get_total_value({})
self.logger.info(f"策略结束",
strategy=self.name,
final_value=final_value,
return_rate=(final_value - broker.initial_cash) / broker.initial_cash,
total_trades=len(broker.trades))
交易日志分析¶
import json
from datetime import datetime
def analyze_trading_logs(log_file):
"""分析交易日志文件"""
trades = []
with open(log_file, 'r', encoding='utf-8') as f:
for line in f:
if '买入成功' in line or '卖出成功' in line:
# 解析日志行(如果是JSON格式)
try:
log_data = json.loads(line)
trades.append(log_data)
except:
# 普通格式的解析
pass
# 分析交易数据
print(f"总交易次数: {len(trades)}")
return trades
# 使用示例
trades = analyze_trading_logs('logs/strategy_MA_CROSS.log')
性能监控日志¶
函数执行时间记录¶
from qka.utils.tools import timer
from qka.utils.logger import create_logger
logger = create_logger('performance')
@timer
def slow_function():
"""这个装饰器会自动记录执行时间"""
import time
time.sleep(1)
return "完成"
# 手动记录性能
import time
def manual_timing_example():
start_time = time.time()
# 执行一些操作
result = complex_calculation()
end_time = time.time()
logger.info("计算完成",
function="complex_calculation",
execution_time=end_time - start_time,
result_size=len(result))
内存使用监控¶
import psutil
import os
def log_memory_usage():
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
logger.info("内存使用情况",
rss_mb=memory_info.rss / 1024 / 1024, # 物理内存
vms_mb=memory_info.vms / 1024 / 1024, # 虚拟内存
cpu_percent=process.cpu_percent())
# 定期记录内存使用
import threading
import time
def memory_monitor():
while True:
log_memory_usage()
time.sleep(60) # 每分钟记录一次
# 启动内存监控线程
monitor_thread = threading.Thread(target=memory_monitor, daemon=True)
monitor_thread.start()
日志最佳实践¶
1. 日志级别使用¶
# DEBUG - 详细的诊断信息,仅在诊断问题时使用
logger.debug(f"计算中间结果: ma5={ma5}, ma20={ma20}")
# INFO - 一般信息,记录程序正常运行状态
logger.info("策略开始运行", strategy="MA_CROSS")
# WARNING - 警告信息,程序可以继续运行但需要注意
logger.warning("数据缺失", symbol="000001.SZ", missing_days=3)
# ERROR - 错误信息,功能无法正常执行但程序可以继续
logger.error("订单下单失败", symbol="000001.SZ", reason="余额不足")
# CRITICAL - 严重错误,程序可能无法继续运行
logger.critical("数据库连接失败", error="连接超时")
2. 敏感信息处理¶
# ❌ 不要记录敏感信息
logger.info("用户登录", password="123456", api_key="secret_key")
# ✅ 正确的做法
logger.info("用户登录",
user_id="12345",
ip="192.168.1.100",
# 密码和API密钥不记录
login_method="password")
3. 结构化信息¶
# ❌ 字符串拼接,难以解析
logger.info(f"用户{user_id}在{timestamp}买入{symbol}数量{quantity}")
# ✅ 结构化记录,便于后续分析
logger.info("用户下单",
user_id=user_id,
timestamp=timestamp,
action="BUY",
symbol=symbol,
quantity=quantity)
4. 异常记录¶
try:
result = risky_operation()
except Exception as e:
# 记录完整的异常信息
logger.error("操作失败",
operation="risky_operation",
error_type=type(e).__name__,
error_message=str(e),
exc_info=True) # 包含完整的堆栈跟踪
日志分析工具¶
实时日志监控¶
import subprocess
import re
def tail_logs(log_file, pattern=None):
"""实时监控日志文件"""
process = subprocess.Popen(['tail', '-f', log_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
try:
for line in iter(process.stdout.readline, ''):
if pattern is None or re.search(pattern, line):
print(line.strip())
except KeyboardInterrupt:
process.terminate()
# 使用示例
# tail_logs('logs/2023-12-01.log', 'ERROR') # 只显示错误日志
日志统计¶
import re
from collections import defaultdict
def analyze_log_file(log_file):
"""分析日志文件统计信息"""
level_counts = defaultdict(int)
error_types = defaultdict(int)
with open(log_file, 'r', encoding='utf-8') as f:
for line in f:
# 统计日志级别
if match := re.search(r'\[(DEBUG|INFO|WARNING|ERROR|CRITICAL)\]', line):
level_counts[match.group(1)] += 1
# 统计错误类型
if 'ERROR' in line and '失败' in line:
if '下单失败' in line:
error_types['下单失败'] += 1
elif '数据获取失败' in line:
error_types['数据获取失败'] += 1
print("日志级别统计:")
for level, count in level_counts.items():
print(f" {level}: {count}")
print("\n错误类型统计:")
for error_type, count in error_types.items():
print(f" {error_type}: {count}")
# 使用示例
analyze_log_file('logs/2023-12-01.log')
API 参考¶
日志系统的详细API参考请查看 Logger API文档。
主要类和函数¶
- Logger - 增强日志记录器
- ColorFormatter - 彩色日志格式化器
- StructuredLogger - 结构化日志记录器
更多详细信息和使用示例请参考API文档页面。