Module backtrader.feeds.influxfeed
Expand source code
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2023 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import backtrader as bt
import backtrader.feed as feed
from ..utils import date2num
import datetime as dt
# Maps a backtrader TimeFrame to the unit suffix used when building the
# ``GROUP BY time(<multiple><unit>)`` clause of the InfluxDB query.
# NOTE(review): Minutes and Months both map to 'm'; presumably the database
# reads 'm' as minutes, so monthly grouping may not work - confirm.
TIMEFRAMES = {
    bt.TimeFrame.Seconds: 's',
    bt.TimeFrame.Minutes: 'm',
    bt.TimeFrame.Days: 'd',
    bt.TimeFrame.Weeks: 'w',
    bt.TimeFrame.Months: 'm',
    bt.TimeFrame.Years: 'y',
}
class InfluxDB(feed.DataBase):
    """Data feed that reads OHLCV bars from an InfluxDB measurement.

    The measurement named by ``dataname`` is queried once in ``start``; the
    returned points are buffered and handed out one per ``_load`` call.

    Params:

      - ``host``, ``port``, ``username``, ``password``, ``database``:
        connection arguments, passed positionally to the InfluxDB client

      - ``timeframe``: looked up in ``TIMEFRAMES`` to get the unit of the
        ``GROUP BY time(...)`` interval (falls back to ``'d'``); the
        multiple is ``self.p.compression`` (not declared here, presumably
        inherited from the base class) or ``1`` when that is falsy

      - ``startdate``: when set, only points with ``time >= startdate`` are
        requested; otherwise the filter is ``time <= now()``

      - ``high``, ``low``, ``open``, ``close``, ``volume``, ``ointerest``:
        names of the InfluxDB fields that hold each value
    """

    frompackages = (
        ('influxdb', [('InfluxDBClient', 'idbclient')]),
        ('influxdb.exceptions', 'InfluxDBClientError')
    )

    params = (
        ('host', '127.0.0.1'),
        ('port', '8086'),
        ('username', None),
        ('password', None),
        ('database', None),
        ('timeframe', bt.TimeFrame.Days),
        ('startdate', None),
        ('high', 'high_p'),
        ('low', 'low_p'),
        ('open', 'open_p'),
        ('close', 'close_p'),
        ('volume', 'volume'),
        ('ointerest', 'oi'),
    )

    def start(self):
        """Create the client, run the bar query and buffer its points.

        Client errors are reported with ``print`` and leave the feed empty
        (``_load`` then returns ``False`` at once) instead of failing later
        with an unrelated ``NameError``/``AttributeError``.
        """
        super(InfluxDB, self).start()

        # Guarantee _load always finds an iterator, whatever fails below
        self.biter = iter(())

        try:
            self.ndb = idbclient(self.p.host, self.p.port, self.p.username,
                                 self.p.password, self.p.database)
        except InfluxDBClientError as err:
            print('Failed to establish connection to InfluxDB: %s' % err)
            self.ndb = None
            return  # no client, nothing to query

        tf = '{multiple}{timeframe}'.format(
            multiple=(self.p.compression or 1),
            timeframe=TIMEFRAMES.get(self.p.timeframe, 'd'))

        if not self.p.startdate:
            st = '<= now()'
        else:
            st = '>= \'%s\'' % self.p.startdate

        # The query could already consider parameters like fromdate and todate
        # to have the database skip them and not the internal code
        qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
                'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
                'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
                'FROM "{dataname}" '
                'WHERE time {begin} '
                'GROUP BY time({timeframe}) fill(none)').format(
                    open_f=self.p.open, high_f=self.p.high,
                    low_f=self.p.low, close_f=self.p.close,
                    vol_f=self.p.volume, oi_f=self.p.ointerest,
                    timeframe=tf, begin=st, dataname=self.p.dataname)

        try:
            dbars = list(self.ndb.query(qstr).get_points())
        except InfluxDBClientError as err:
            print('InfluxDB query failed: %s' % err)
            dbars = []  # previously left unbound, which raised NameError

        self.biter = iter(dbars)

    def _load(self):
        """Copy the next buffered point into the lines.

        Returns ``False`` when the buffered points are exhausted, ``True``
        after a bar has been stored.
        """
        try:
            bar = next(self.biter)
        except StopIteration:
            return False

        self.l.datetime[0] = date2num(
            dt.datetime.strptime(bar['time'], '%Y-%m-%dT%H:%M:%SZ'))

        self.l.open[0] = bar['open']
        self.l.high[0] = bar['high']
        self.l.low[0] = bar['low']
        self.l.close[0] = bar['close']
        self.l.volume[0] = bar['volume']

        # The query selects "openinterest" but it was never stored. Only
        # store a real value so measurements lacking the field (null in the
        # result) keep behaving as before.
        oi = bar.get('openinterest')
        if oi is not None:
            self.l.openinterest[0] = oi

        return True
Classes
class InfluxDB (*args, **kwargs)
-
Base class for LineXXX instances that hold more than one line
Expand source code
class InfluxDB(feed.DataBase):
    """Data feed that reads OHLCV bars from an InfluxDB measurement.

    The measurement named by ``dataname`` is queried once in ``start``; the
    returned points are buffered and handed out one per ``_load`` call.

    Params:

      - ``host``, ``port``, ``username``, ``password``, ``database``:
        connection arguments, passed positionally to the InfluxDB client

      - ``timeframe``: looked up in ``TIMEFRAMES`` to get the unit of the
        ``GROUP BY time(...)`` interval (falls back to ``'d'``); the
        multiple is ``self.p.compression`` (not declared here, presumably
        inherited from the base class) or ``1`` when that is falsy

      - ``startdate``: when set, only points with ``time >= startdate`` are
        requested; otherwise the filter is ``time <= now()``

      - ``high``, ``low``, ``open``, ``close``, ``volume``, ``ointerest``:
        names of the InfluxDB fields that hold each value
    """

    frompackages = (
        ('influxdb', [('InfluxDBClient', 'idbclient')]),
        ('influxdb.exceptions', 'InfluxDBClientError')
    )

    params = (
        ('host', '127.0.0.1'),
        ('port', '8086'),
        ('username', None),
        ('password', None),
        ('database', None),
        ('timeframe', bt.TimeFrame.Days),
        ('startdate', None),
        ('high', 'high_p'),
        ('low', 'low_p'),
        ('open', 'open_p'),
        ('close', 'close_p'),
        ('volume', 'volume'),
        ('ointerest', 'oi'),
    )

    def start(self):
        """Create the client, run the bar query and buffer its points.

        Client errors are reported with ``print`` and leave the feed empty
        (``_load`` then returns ``False`` at once) instead of failing later
        with an unrelated ``NameError``/``AttributeError``.
        """
        super(InfluxDB, self).start()

        # Guarantee _load always finds an iterator, whatever fails below
        self.biter = iter(())

        try:
            self.ndb = idbclient(self.p.host, self.p.port, self.p.username,
                                 self.p.password, self.p.database)
        except InfluxDBClientError as err:
            print('Failed to establish connection to InfluxDB: %s' % err)
            self.ndb = None
            return  # no client, nothing to query

        tf = '{multiple}{timeframe}'.format(
            multiple=(self.p.compression or 1),
            timeframe=TIMEFRAMES.get(self.p.timeframe, 'd'))

        if not self.p.startdate:
            st = '<= now()'
        else:
            st = '>= \'%s\'' % self.p.startdate

        # The query could already consider parameters like fromdate and todate
        # to have the database skip them and not the internal code
        qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
                'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
                'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
                'FROM "{dataname}" '
                'WHERE time {begin} '
                'GROUP BY time({timeframe}) fill(none)').format(
                    open_f=self.p.open, high_f=self.p.high,
                    low_f=self.p.low, close_f=self.p.close,
                    vol_f=self.p.volume, oi_f=self.p.ointerest,
                    timeframe=tf, begin=st, dataname=self.p.dataname)

        try:
            dbars = list(self.ndb.query(qstr).get_points())
        except InfluxDBClientError as err:
            print('InfluxDB query failed: %s' % err)
            dbars = []  # previously left unbound, which raised NameError

        self.biter = iter(dbars)

    def _load(self):
        """Copy the next buffered point into the lines.

        Returns ``False`` when the buffered points are exhausted, ``True``
        after a bar has been stored.
        """
        try:
            bar = next(self.biter)
        except StopIteration:
            return False

        self.l.datetime[0] = date2num(
            dt.datetime.strptime(bar['time'], '%Y-%m-%dT%H:%M:%SZ'))

        self.l.open[0] = bar['open']
        self.l.high[0] = bar['high']
        self.l.low[0] = bar['low']
        self.l.close[0] = bar['close']
        self.l.volume[0] = bar['volume']

        # The query selects "openinterest" but it was never stored. Only
        # store a real value so measurements lacking the field (null in the
        # result) keep behaving as before.
        oi = bar.get('openinterest')
        if oi is not None:
            self.l.openinterest[0] = oi

        return True
Ancestors
Class variables
var alias
var aliased
var frompackages
var linealias
var packages
var params
var plotinfo
var plotlines
Methods
def start(self)
-
Expand source code
def start(self):
    """Create the client, run the bar query and buffer its points.

    Client errors are reported with ``print`` and leave ``self.biter`` as
    an empty iterator instead of failing later with an unrelated
    ``NameError``/``AttributeError``.
    """
    super(InfluxDB, self).start()

    # Guarantee an iterator exists whatever fails below
    self.biter = iter(())

    try:
        self.ndb = idbclient(self.p.host, self.p.port, self.p.username,
                             self.p.password, self.p.database)
    except InfluxDBClientError as err:
        print('Failed to establish connection to InfluxDB: %s' % err)
        self.ndb = None
        return  # no client, nothing to query

    tf = '{multiple}{timeframe}'.format(
        multiple=(self.p.compression or 1),
        timeframe=TIMEFRAMES.get(self.p.timeframe, 'd'))

    if not self.p.startdate:
        st = '<= now()'
    else:
        st = '>= \'%s\'' % self.p.startdate

    # The query could already consider parameters like fromdate and todate
    # to have the database skip them and not the internal code
    qstr = ('SELECT mean("{open_f}") AS "open", mean("{high_f}") AS "high", '
            'mean("{low_f}") AS "low", mean("{close_f}") AS "close", '
            'mean("{vol_f}") AS "volume", mean("{oi_f}") AS "openinterest" '
            'FROM "{dataname}" '
            'WHERE time {begin} '
            'GROUP BY time({timeframe}) fill(none)').format(
                open_f=self.p.open, high_f=self.p.high,
                low_f=self.p.low, close_f=self.p.close,
                vol_f=self.p.volume, oi_f=self.p.ointerest,
                timeframe=tf, begin=st, dataname=self.p.dataname)

    try:
        dbars = list(self.ndb.query(qstr).get_points())
    except InfluxDBClientError as err:
        print('InfluxDB query failed: %s' % err)
        dbars = []  # previously left unbound, which raised NameError

    self.biter = iter(dbars)
Inherited members