Module backtrader.feed

Expand source code
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2023 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import collections
import datetime
import inspect
import io
import os.path

import backtrader as bt
from backtrader import (date2num, num2date, time2num, TimeFrame, dataseries,
                        metabase)

from backtrader.utils.py3 import with_metaclass, zip, range, string_types
from backtrader.utils import tzparse
from .dataseries import SimpleFilterWrapper
from .resamplerfilter import Resampler, Replayer
from .tradingcal import PandasMarketCalendar


class MetaAbstractDataBase(dataseries.OHLCDateTime.__class__):
    _indcol = dict()

    def __init__(cls, name, bases, dct):
        '''
        Class has already been created ... register subclasses
        '''
        # Initialize the class
        super(MetaAbstractDataBase, cls).__init__(name, bases, dct)

        if not cls.aliased and \
           name != 'DataBase' and not name.startswith('_'):
            cls._indcol[name] = cls

    def dopreinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopreinit(_obj, *args, **kwargs)

        # Find the owner and store it
        _obj._feed = metabase.findowner(_obj, FeedBase)

        _obj.notifs = collections.deque()  # store notifications for cerebro

        _obj._dataname = _obj.p.dataname
        _obj._name = ''
        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopostinit(_obj, *args, **kwargs)

        # Either set by the subclass, via the 'name' parameter, or fall back
        # to the dataname (ticker)
        _obj._name = _obj._name or _obj.p.name
        if not _obj._name and isinstance(_obj.p.dataname, string_types):
            _obj._name = _obj.p.dataname
        _obj._compression = _obj.p.compression
        _obj._timeframe = _obj.p.timeframe

        if isinstance(_obj.p.sessionstart, datetime.datetime):
            _obj.p.sessionstart = _obj.p.sessionstart.time()

        elif _obj.p.sessionstart is None:
            _obj.p.sessionstart = datetime.time.min

        if isinstance(_obj.p.sessionend, datetime.datetime):
            _obj.p.sessionend = _obj.p.sessionend.time()

        elif _obj.p.sessionend is None:
            # 999990 rather than 999999 us to avoid precision rounding errors
            _obj.p.sessionend = datetime.time(23, 59, 59, 999990)

        if isinstance(_obj.p.fromdate, datetime.date):
            # a plain date was given: complete it with the session start so
            # intraday values from the start of that day are not skipped
            if not hasattr(_obj.p.fromdate, 'hour'):
                _obj.p.fromdate = datetime.datetime.combine(
                    _obj.p.fromdate, _obj.p.sessionstart)

        if isinstance(_obj.p.todate, datetime.date):
            # push it to the end of the day, or else intraday
            # values before the end of the day would be gone
            if not hasattr(_obj.p.todate, 'hour'):
                _obj.p.todate = datetime.datetime.combine(
                    _obj.p.todate, _obj.p.sessionend)

        _obj._barstack = collections.deque()  # for filter operations
        _obj._barstash = collections.deque()  # for filter operations

        _obj._filters = list()
        _obj._ffilters = list()
        for fp in _obj.p.filters:
            if inspect.isclass(fp):
                fp = fp(_obj)
                if hasattr(fp, 'last'):
                    _obj._ffilters.append((fp, [], {}))

            _obj._filters.append((fp, [], {}))

        return _obj, args, kwargs


class AbstractDataBase(with_metaclass(MetaAbstractDataBase,
                                      dataseries.OHLCDateTime)):

    params = (
        ('dataname', None),
        ('name', ''),
        ('compression', 1),
        ('timeframe', TimeFrame.Days),
        ('fromdate', None),
        ('todate', None),
        ('sessionstart', None),
        ('sessionend', None),
        ('filters', []),
        ('tz', None),
        ('tzinput', None),
        ('qcheck', 0.0),  # timeout in seconds (float) to check for events
        ('calendar', None),
    )

    (CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED,
     LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN) = range(8)

    _NOTIFNAMES = [
        'CONNECTED', 'DISCONNECTED', 'CONNBROKEN', 'DELAYED',
        'LIVE', 'NOTSUBSCRIBED', 'NOTSUPPORTED_TIMEFRAME', 'UNKNOWN']

    @classmethod
    def _getstatusname(cls, status):
        return cls._NOTIFNAMES[status]

    _compensate = None
    _feed = None
    _store = None

    _clone = False
    _qcheck = 0.0

    _tmoffset = datetime.timedelta()

    # Set to non-zero if resampling/replaying
    resampling = 0
    replaying = 0

    _started = False

    def _start_finish(self):
        # A live feed (for example) may have learnt something about the
        # timezones after the start and that's why the date/time related
        # parameters are converted at this late stage
        # Get the output timezone (if any)
        self._tz = self._gettz()
        # Lines have already been created, set the tz
        self.lines.datetime._settz(self._tz)

        # This should probably also be called from an overridable method
        self._tzinput = bt.utils.date.Localizer(self._gettzinput())

        # Convert user input times to the output timezone (or min/max)
        if self.p.fromdate is None:
            self.fromdate = float('-inf')
        else:
            self.fromdate = self.date2num(self.p.fromdate)

        if self.p.todate is None:
            self.todate = float('inf')
        else:
            self.todate = self.date2num(self.p.todate)

        # FIXME: These two are never used and could be removed
        self.sessionstart = time2num(self.p.sessionstart)
        self.sessionend = time2num(self.p.sessionend)

        self._calendar = cal = self.p.calendar
        if cal is None:
            self._calendar = self._env._tradingcal
        elif isinstance(cal, string_types):
            self._calendar = PandasMarketCalendar(calendar=cal)

        self._started = True

    def _start(self):
        self.start()

        if not self._started:
            self._start_finish()

    def _timeoffset(self):
        return self._tmoffset

    def _getnexteos(self):
        '''Returns the next end of session (eos) using a trading calendar if
        available'''
        if self._clone:
            return self.data._getnexteos()

        if not len(self):
            return datetime.datetime.min, 0.0

        dt = self.lines.datetime[0]
        dtime = num2date(dt)
        if self._calendar is None:
            nexteos = datetime.datetime.combine(dtime, self.p.sessionend)
            nextdteos = self.date2num(nexteos)  # localized -> utc-like
            nexteos = num2date(nextdteos)  # utc
            while dtime > nexteos:
                nexteos += datetime.timedelta(days=1)  # already utc-like

            nextdteos = date2num(nexteos)  # -> utc-like

        else:
            # returns times in utc
            _, nexteos = self._calendar.schedule(dtime, self._tz)
            nextdteos = date2num(nexteos)  # nexteos is already utc

        return nexteos, nextdteos

    def _gettzinput(self):
        '''Can be overridden by classes to return a timezone for input'''
        return tzparse(self.p.tzinput)

    def _gettz(self):
        '''To be overridden by subclasses which may auto-calculate the
        timezone'''
        return tzparse(self.p.tz)

    def date2num(self, dt):
        if self._tz is not None:
            return date2num(self._tz.localize(dt))

        return date2num(dt)

    def num2date(self, dt=None, tz=None, naive=True):
        if dt is None:
            return num2date(self.lines.datetime[0], tz or self._tz, naive)

        return num2date(dt, tz or self._tz, naive)

    def haslivedata(self):
        return False  # must be overridden by those that can

    def do_qcheck(self, onoff, qlapse):
        # if onoff is True the data will wait p.qcheck for incoming live data
        # on its queue.
        qwait = self.p.qcheck if onoff else 0.0
        qwait = max(0.0, qwait - qlapse)
        self._qcheck = qwait

    def islive(self):
        '''If this returns True, ``Cerebro`` will deactivate ``preload`` and
        ``runonce`` because a live data source must be fetched tick by tick (or
        bar by bar)'''
        return False

    def put_notification(self, status, *args, **kwargs):
        '''Add arguments to notification queue'''
        if self._laststatus != status:
            self.notifs.append((status, args, kwargs))
            self._laststatus = status

    def get_notifications(self):
        '''Return the pending "store" notifications'''
        # The background thread could keep on adding notifications. The None
        # mark makes it possible to identify the last one to deliver
        self.notifs.append(None)  # put a mark
        notifs = list()
        while True:
            notif = self.notifs.popleft()
            if notif is None:  # mark is reached
                break
            notifs.append(notif)

        return notifs

    def getfeed(self):
        return self._feed

    def qbuffer(self, savemem=0, replaying=False):
        extrasize = self.resampling or replaying
        for line in self.lines:
            line.qbuffer(savemem=savemem, extrasize=extrasize)

    def start(self):
        self._barstack = collections.deque()
        self._barstash = collections.deque()
        self._laststatus = self.CONNECTED

    def stop(self):
        pass

    def clone(self, **kwargs):
        return DataClone(dataname=self, **kwargs)

    def copyas(self, _dataname, **kwargs):
        d = DataClone(dataname=self, **kwargs)
        d._dataname = _dataname
        d._name = _dataname
        return d

    def setenvironment(self, env):
        '''Keep a reference to the environment'''
        self._env = env

    def getenvironment(self):
        return self._env

    def addfilter_simple(self, f, *args, **kwargs):
        fp = SimpleFilterWrapper(self, f, *args, **kwargs)
        self._filters.append((fp, fp.args, fp.kwargs))

    def addfilter(self, p, *args, **kwargs):
        if inspect.isclass(p):
            pobj = p(self, *args, **kwargs)
            self._filters.append((pobj, [], {}))

            if hasattr(pobj, 'last'):
                self._ffilters.append((pobj, [], {}))

        else:
            self._filters.append((p, args, kwargs))

    def compensate(self, other):
        '''Call it to let the broker know that actions on this asset will
        compensate open positions in another'''

        self._compensate = other

    def _tick_nullify(self):
        # These hold the updating prices in case the new bar is "updated"
        # and the length doesn't change, e.g. when a replay is happening or
        # a real-time data feed is in use and 1-minute bars are being
        # constructed from 5-second updates
        for lalias in self.getlinealiases():
            if lalias != 'datetime':
                setattr(self, 'tick_' + lalias, None)

        self.tick_last = None

    def _tick_fill(self, force=False):
        # If nothing filled the tick_xxx attributes, the bar is the tick
        alias0 = self._getlinealias(0)
        if force or getattr(self, 'tick_' + alias0, None) is None:
            for lalias in self.getlinealiases():
                if lalias != 'datetime':
                    setattr(self, 'tick_' + lalias,
                            getattr(self.lines, lalias)[0])

            self.tick_last = getattr(self.lines, alias0)[0]

    def advance_peek(self):
        if len(self) < self.buflen():
            return self.lines.datetime[1]  # return the future

        return float('inf')  # max date else

    def advance(self, size=1, datamaster=None, ticks=True):
        if ticks:
            self._tick_nullify()

        # This call must be intercepted to support datas with
        # different lengths (timeframes)
        self.lines.advance(size)

        if datamaster is not None:
            if len(self) > self.buflen():
                # if no bar can be delivered, fill with an empty bar
                self.rewind()
                self.lines.forward()
                return

            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                self.lines.rewind()
            else:
                if ticks:
                    self._tick_fill()
        elif len(self) < self.buflen():
            # a resampler may have advanced us past the last point
            if ticks:
                self._tick_fill()

    def next(self, datamaster=None, ticks=True):

        if len(self) >= self.buflen():
            if ticks:
                self._tick_nullify()

            # not preloaded - request next bar
            ret = self.load()
            if not ret:
                # if load cannot produce bars - forward the result
                return ret

            if datamaster is None:
                # bar is there and no master ... return load's result
                if ticks:
                    self._tick_fill()
                return ret
        else:
            self.advance(ticks=ticks)

        # a bar is "loaded" or was preloaded - index has been moved to it
        if datamaster is not None:
            # there is a time reference to check against
            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                # can't deliver new bar, too early, go back
                self.rewind()
                return False
            else:
                if ticks:
                    self._tick_fill()

        else:
            if ticks:
                self._tick_fill()

        # tell the world there is a bar (either the new one or the previous)
        return True

    def preload(self):
        while self.load():
            pass

        self._last()
        self.home()

    def _last(self, datamaster=None):
        # Last chance for filters to deliver something
        ret = 0
        for ff, fargs, fkwargs in self._ffilters:
            ret += ff.last(self, *fargs, **fkwargs)

        doticks = False
        if datamaster is not None and self._barstack:
            doticks = True

        while self._fromstack(forward=True):
            # consume bar(s) produced by "last"s - adding room
            pass

        if doticks:
            self._tick_fill()

        return bool(ret)

    def _check(self, forcedata=None):
        ret = 0
        for ff, fargs, fkwargs in self._filters:
            if not hasattr(ff, 'check'):
                continue
            ff.check(self, _forcedata=forcedata, *fargs, **fkwargs)

    def load(self):
        while True:
            # move data pointer forward for new bar
            self.forward()

            if self._fromstack():  # bar is available
                return True

            if not self._fromstack(stash=True):
                _loadret = self._load()
                if not _loadret:  # no bar: use force to make sure the
                    # pointer is undone in exactbars mode. This covers
                    # especially (but not uniquely) the case in which the
                    # last bar has been seen and a plain backwards would
                    # ruin pointer accounting in the "stop" method of the
                    # strategy
                    self.backwards(force=True)  # undo data pointer

                    # return the actual returned value which may be None to
                    # signal no bar is available, but the data feed is not
                    # done. False means game over
                    return _loadret

            # Get a reference to current loaded time
            dt = self.lines.datetime[0]

            # A bar has been loaded, adapt the time
            if self._tzinput:
                # Input has been converted at face value but it's not UTC in
                # the input stream
                dtime = num2date(dt)  # get it in a naive datetime
                # localize it
                dtime = self._tzinput.localize(dtime)  # pytz compatible-ized
                self.lines.datetime[0] = dt = date2num(dtime)  # keep UTC val

            # Check standard date from/to filters
            if dt < self.fromdate:
                # discard loaded bar and carry on
                self.backwards()
                continue
            if dt > self.todate:
                # discard loaded bar and break out
                self.backwards(force=True)
                break

            # Pass through filters
            retff = False
            for ff, fargs, fkwargs in self._filters:
                # previous filter may have put things onto the stack
                if self._barstack:
                    for i in range(len(self._barstack)):
                        self._fromstack(forward=True)
                        retff = ff(self, *fargs, **fkwargs)
                else:
                    retff = ff(self, *fargs, **fkwargs)

                if retff:  # bar removed from system
                    break  # out of the inner loop

            if retff:  # bar removed from system - loop to get new bar
                continue  # in the greater loop

            # Checks let the bar through ... notify it
            return True

        # Out of the loop ... no more bars or past todate
        return False

    def _load(self):
        return False

    def _add2stack(self, bar, stash=False):
        '''Saves the given bar (list of values) to the stack for later retrieval'''
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

    def _save2stack(self, erase=False, force=False, stash=False):
        '''Saves the current bar to the bar stack for later retrieval

        Parameter ``erase`` determines removal from the data stream
        '''
        bar = [line[0] for line in self.itersize()]
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

        if erase:  # remove bar if requested
            self.backwards(force=force)

    def _updatebar(self, bar, forward=False, ago=0):
        '''Loads the given bar (list of values) onto the lines to form the
        new bar, optionally moving the data pointer forward first
        '''
        if forward:
            self.forward()

        for line, val in zip(self.itersize(), bar):
            line[0 + ago] = val

    def _fromstack(self, forward=False, stash=False):
        '''Load a value from the stack onto the lines to form the new bar

        Returns True if values are present, False otherwise
        '''

        coll = self._barstack if not stash else self._barstash

        if coll:
            if forward:
                self.forward()

            for line, val in zip(self.itersize(), coll.popleft()):
                line[0] = val

            return True

        return False

    def resample(self, **kwargs):
        self.addfilter(Resampler, **kwargs)

    def replay(self, **kwargs):
        self.addfilter(Replayer, **kwargs)


class DataBase(AbstractDataBase):
    pass


class FeedBase(with_metaclass(metabase.MetaParams, object)):
    params = () + DataBase.params._gettuple()

    def __init__(self):
        self.datas = list()

    def start(self):
        for data in self.datas:
            data.start()

    def stop(self):
        for data in self.datas:
            data.stop()

    def getdata(self, dataname, name=None, **kwargs):
        for pname, pvalue in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        data = self._getdata(**kwargs)

        data._name = name

        self.datas.append(data)
        return data

    def _getdata(self, dataname, **kwargs):
        for pname, pvalue in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        return self.DataCls(**kwargs)


class MetaCSVDataBase(DataBase.__class__):
    def dopostinit(cls, _obj, *args, **kwargs):
        # Set the name before calling the base class so it overrides the default
        if not _obj.p.name and not _obj._name:
            _obj._name, _ = os.path.splitext(os.path.basename(_obj.p.dataname))

        _obj, args, kwargs = \
            super(MetaCSVDataBase, cls).dopostinit(_obj, *args, **kwargs)

        return _obj, args, kwargs


class CSVDataBase(with_metaclass(MetaCSVDataBase, DataBase)):
    '''
    Base class for classes implementing CSV DataFeeds

    The class takes care of opening the file, reading the lines and
    tokenizing them.

    Subclasses only need to override:

      - _loadline(tokens)

    The return value of ``_loadline`` (True/False) will be the return value
    of ``_load``, which has been overridden by this base class
    '''

    f = None
    params = (('headers', True), ('separator', ','),)

    def start(self):
        super(CSVDataBase, self).start()

        if self.f is None:
            if hasattr(self.p.dataname, 'readline'):
                self.f = self.p.dataname
            else:
                # Let an exception propagate to let the caller know
                self.f = io.open(self.p.dataname, 'r')

        if self.p.headers:
            self.f.readline()  # skip the headers

        self.separator = self.p.separator

    def stop(self):
        super(CSVDataBase, self).stop()
        if self.f is not None:
            self.f.close()
            self.f = None

    def preload(self):
        while self.load():
            pass

        self._last()
        self.home()

        # preloaded - no need to keep the object around - it breaks
        # multiprocessing in Python 3.x
        self.f.close()
        self.f = None

    def _load(self):
        if self.f is None:
            return False

        # Let an exception propagate to let the caller know
        line = self.f.readline()

        if not line:
            return False

        line = line.rstrip('\n')
        linetokens = line.split(self.separator)
        return self._loadline(linetokens)

    def _getnextline(self):
        if self.f is None:
            return None

        # Let an exception propagate to let the caller know
        line = self.f.readline()

        if not line:
            return None

        line = line.rstrip('\n')
        linetokens = line.split(self.separator)
        return linetokens


class CSVFeedBase(FeedBase):
    params = (('basepath', ''),) + CSVDataBase.params._gettuple()

    def _getdata(self, dataname, **kwargs):
        return self.DataCls(dataname=self.p.basepath + dataname,
                            **self.p._getkwargs())


class DataClone(AbstractDataBase):
    _clone = True

    def __init__(self):
        self.data = self.p.dataname
        self._dataname = self.data._dataname

        # Copy date/session parameters; fromdate/todate are kept as-is since
        # they may carry user overrides passed through clone()/copyas()
        self.p.fromdate = self.p.fromdate
        self.p.todate = self.p.todate
        self.p.sessionstart = self.data.p.sessionstart
        self.p.sessionend = self.data.p.sessionend

        self.p.timeframe = self.data.p.timeframe
        self.p.compression = self.data.p.compression

    def _start(self):
        # redefine to copy data bits from guest data
        self.start()

        # Copy tz infos
        self._tz = self.data._tz
        self.lines.datetime._settz(self._tz)

        self._calendar = self.data._calendar

        # input has already been converted by guest data
        self._tzinput = None  # no need to convert further

        # Copy dates/session infos
        self.fromdate = self.data.fromdate
        self.todate = self.data.todate

        # FIXME: if removed from guest, remove here too
        self.sessionstart = self.data.sessionstart
        self.sessionend = self.data.sessionend

    def start(self):
        super(DataClone, self).start()
        self._dlen = 0
        self._preloading = False

    def preload(self):
        self._preloading = True
        super(DataClone, self).preload()
        self.data.home()  # preloading data was pushed forward
        self._preloading = False

    def _load(self):
        # assumption: the data is in the system
        # simply copy the lines
        if self._preloading:
            # data is preloaded and we are preloading too: move forward
            # until a full bar is there or the data source is exhausted
            self.data.advance()
            if len(self.data) > self.data.buflen():
                return False

            for line, dline in zip(self.lines, self.data.lines):
                line[0] = dline[0]

            return True

        # Not preloading
        if not (len(self.data) > self._dlen):
            # Data not beyond last seen bar
            return False

        self._dlen += 1

        for line, dline in zip(self.lines, self.data.lines):
            line[0] = dline[0]

        return True

    def advance(self, size=1, datamaster=None, ticks=True):
        self._dlen += size
        super(DataClone, self).advance(size, datamaster, ticks=ticks)

Classes

class AbstractDataBase (*args, **kwargs)

Base class for LineXXX instances that hold more than one line
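
Configuration happens through the params tuple visible in the source below (dataname, fromdate, todate, timeframe, compression, session times, tz, ...). A minimal usage sketch, assuming a CSV file read through the stock bt.feeds.GenericCSVData subclass (the file name is hypothetical):

import datetime
import backtrader as bt

data = bt.feeds.GenericCSVData(
    dataname='orcl.csv',                     # hypothetical file
    fromdate=datetime.datetime(2020, 1, 1),  # converted via date2num at start
    todate=datetime.datetime(2020, 12, 31),
    timeframe=bt.TimeFrame.Days,
    compression=1,
    sessionstart=datetime.time(9, 30),
    sessionend=datetime.time(16, 0),
)

cerebro = bt.Cerebro()
cerebro.adddata(data)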

Expand source code
class AbstractDataBase(with_metaclass(MetaAbstractDataBase,
                                      dataseries.OHLCDateTime)):

    params = (
        ('dataname', None),
        ('name', ''),
        ('compression', 1),
        ('timeframe', TimeFrame.Days),
        ('fromdate', None),
        ('todate', None),
        ('sessionstart', None),
        ('sessionend', None),
        ('filters', []),
        ('tz', None),
        ('tzinput', None),
        ('qcheck', 0.0),  # timeout in seconds (float) to check for events
        ('calendar', None),
    )

    (CONNECTED, DISCONNECTED, CONNBROKEN, DELAYED,
     LIVE, NOTSUBSCRIBED, NOTSUPPORTED_TF, UNKNOWN) = range(8)

    _NOTIFNAMES = [
        'CONNECTED', 'DISCONNECTED', 'CONNBROKEN', 'DELAYED',
        'LIVE', 'NOTSUBSCRIBED', 'NOTSUPPORTED_TIMEFRAME', 'UNKNOWN']

    @classmethod
    def _getstatusname(cls, status):
        return cls._NOTIFNAMES[status]

    _compensate = None
    _feed = None
    _store = None

    _clone = False
    _qcheck = 0.0

    _tmoffset = datetime.timedelta()

    # Set to non-zero if resampling/replaying
    resampling = 0
    replaying = 0

    _started = False

    def _start_finish(self):
        # A live feed (for example) may have learnt something about the
        # timezones after the start and that's why the date/time related
        # parameters are converted at this late stage
        # Get the output timezone (if any)
        self._tz = self._gettz()
        # Lines have already been created, set the tz
        self.lines.datetime._settz(self._tz)

        # This should probably also be called from an overridable method
        self._tzinput = bt.utils.date.Localizer(self._gettzinput())

        # Convert user input times to the output timezone (or min/max)
        if self.p.fromdate is None:
            self.fromdate = float('-inf')
        else:
            self.fromdate = self.date2num(self.p.fromdate)

        if self.p.todate is None:
            self.todate = float('inf')
        else:
            self.todate = self.date2num(self.p.todate)

        # FIXME: These two are never used and could be removed
        self.sessionstart = time2num(self.p.sessionstart)
        self.sessionend = time2num(self.p.sessionend)

        self._calendar = cal = self.p.calendar
        if cal is None:
            self._calendar = self._env._tradingcal
        elif isinstance(cal, string_types):
            self._calendar = PandasMarketCalendar(calendar=cal)

        self._started = True

    def _start(self):
        self.start()

        if not self._started:
            self._start_finish()

    def _timeoffset(self):
        return self._tmoffset

    def _getnexteos(self):
        '''Returns the next end of session (eos) using a trading calendar if
        available'''
        if self._clone:
            return self.data._getnexteos()

        if not len(self):
            return datetime.datetime.min, 0.0

        dt = self.lines.datetime[0]
        dtime = num2date(dt)
        if self._calendar is None:
            nexteos = datetime.datetime.combine(dtime, self.p.sessionend)
            nextdteos = self.date2num(nexteos)  # localized -> utc-like
            nexteos = num2date(nextdteos)  # utc
            while dtime > nexteos:
                nexteos += datetime.timedelta(days=1)  # already utc-like

            nextdteos = date2num(nexteos)  # -> utc-like

        else:
            # returns times in utc
            _, nexteos = self._calendar.schedule(dtime, self._tz)
            nextdteos = date2num(nexteos)  # nexteos is already utc

        return nexteos, nextdteos

    def _gettzinput(self):
        '''Can be overridden by classes to return a timezone for input'''
        return tzparse(self.p.tzinput)

    def _gettz(self):
        '''To be overridden by subclasses which may auto-calculate the
        timezone'''
        return tzparse(self.p.tz)

    def date2num(self, dt):
        if self._tz is not None:
            return date2num(self._tz.localize(dt))

        return date2num(dt)

    def num2date(self, dt=None, tz=None, naive=True):
        if dt is None:
            return num2date(self.lines.datetime[0], tz or self._tz, naive)

        return num2date(dt, tz or self._tz, naive)

    def haslivedata(self):
        return False  # must be overridden by those that can

    def do_qcheck(self, onoff, qlapse):
        # if onoff is True the data will wait p.qcheck for incoming live data
        # on its queue.
        qwait = self.p.qcheck if onoff else 0.0
        qwait = max(0.0, qwait - qlapse)
        self._qcheck = qwait

    def islive(self):
        '''If this returns True, ``Cerebro`` will deactivate ``preload`` and
        ``runonce`` because a live data source must be fetched tick by tick (or
        bar by bar)'''
        return False

    def put_notification(self, status, *args, **kwargs):
        '''Add arguments to notification queue'''
        if self._laststatus != status:
            self.notifs.append((status, args, kwargs))
            self._laststatus = status

    def get_notifications(self):
        '''Return the pending "store" notifications'''
        # The background thread could keep on adding notifications. The None
        # mark makes it possible to identify the last one to deliver
        self.notifs.append(None)  # put a mark
        notifs = list()
        while True:
            notif = self.notifs.popleft()
            if notif is None:  # mark is reached
                break
            notifs.append(notif)

        return notifs

    def getfeed(self):
        return self._feed

    def qbuffer(self, savemem=0, replaying=False):
        extrasize = self.resampling or replaying
        for line in self.lines:
            line.qbuffer(savemem=savemem, extrasize=extrasize)

    def start(self):
        self._barstack = collections.deque()
        self._barstash = collections.deque()
        self._laststatus = self.CONNECTED

    def stop(self):
        pass

    def clone(self, **kwargs):
        return DataClone(dataname=self, **kwargs)

    def copyas(self, _dataname, **kwargs):
        d = DataClone(dataname=self, **kwargs)
        d._dataname = _dataname
        d._name = _dataname
        return d

    def setenvironment(self, env):
        '''Keep a reference to the environment'''
        self._env = env

    def getenvironment(self):
        return self._env

    def addfilter_simple(self, f, *args, **kwargs):
        fp = SimpleFilterWrapper(self, f, *args, **kwargs)
        self._filters.append((fp, fp.args, fp.kwargs))

    def addfilter(self, p, *args, **kwargs):
        if inspect.isclass(p):
            pobj = p(self, *args, **kwargs)
            self._filters.append((pobj, [], {}))

            if hasattr(pobj, 'last'):
                self._ffilters.append((pobj, [], {}))

        else:
            self._filters.append((p, args, kwargs))

    def compensate(self, other):
        '''Call it to let the broker know that actions on this asset will
        compensate open positions in another'''

        self._compensate = other

    def _tick_nullify(self):
        # These hold the updating prices in case the new bar is "updated"
        # and the length doesn't change, e.g. when a replay is happening or
        # a real-time data feed is in use and 1-minute bars are being
        # constructed from 5-second updates
        for lalias in self.getlinealiases():
            if lalias != 'datetime':
                setattr(self, 'tick_' + lalias, None)

        self.tick_last = None

    def _tick_fill(self, force=False):
        # If nothing filled the tick_xxx attributes, the bar is the tick
        alias0 = self._getlinealias(0)
        if force or getattr(self, 'tick_' + alias0, None) is None:
            for lalias in self.getlinealiases():
                if lalias != 'datetime':
                    setattr(self, 'tick_' + lalias,
                            getattr(self.lines, lalias)[0])

            self.tick_last = getattr(self.lines, alias0)[0]

    def advance_peek(self):
        if len(self) < self.buflen():
            return self.lines.datetime[1]  # return the future

        return float('inf')  # max date else

    def advance(self, size=1, datamaster=None, ticks=True):
        if ticks:
            self._tick_nullify()

        # This call must be intercepted to support datas with
        # different lengths (timeframes)
        self.lines.advance(size)

        if datamaster is not None:
            if len(self) > self.buflen():
                # if no bar can be delivered, fill with an empty bar
                self.rewind()
                self.lines.forward()
                return

            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                self.lines.rewind()
            else:
                if ticks:
                    self._tick_fill()
        elif len(self) < self.buflen():
            # a resampler may have advanced us past the last point
            if ticks:
                self._tick_fill()

    def next(self, datamaster=None, ticks=True):

        if len(self) >= self.buflen():
            if ticks:
                self._tick_nullify()

            # not preloaded - request next bar
            ret = self.load()
            if not ret:
                # if load cannot produce bars - forward the result
                return ret

            if datamaster is None:
                # bar is there and no master ... return load's result
                if ticks:
                    self._tick_fill()
                return ret
        else:
            self.advance(ticks=ticks)

        # a bar is "loaded" or was preloaded - index has been moved to it
        if datamaster is not None:
            # there is a time reference to check against
            if self.lines.datetime[0] > datamaster.lines.datetime[0]:
                # can't deliver new bar, too early, go back
                self.rewind()
                return False
            else:
                if ticks:
                    self._tick_fill()

        else:
            if ticks:
                self._tick_fill()

        # tell the world there is a bar (either the new one or the previous)
        return True

    def preload(self):
        while self.load():
            pass

        self._last()
        self.home()

    def _last(self, datamaster=None):
        # Last chance for filters to deliver something
        ret = 0
        for ff, fargs, fkwargs in self._ffilters:
            ret += ff.last(self, *fargs, **fkwargs)

        doticks = False
        if datamaster is not None and self._barstack:
            doticks = True

        while self._fromstack(forward=True):
            # consume bar(s) produced by "last"s - adding room
            pass

        if doticks:
            self._tick_fill()

        return bool(ret)

    def _check(self, forcedata=None):
        ret = 0
        for ff, fargs, fkwargs in self._filters:
            if not hasattr(ff, 'check'):
                continue
            ff.check(self, _forcedata=forcedata, *fargs, **fkwargs)

    def load(self):
        while True:
            # move data pointer forward for new bar
            self.forward()

            if self._fromstack():  # bar is available
                return True

            if not self._fromstack(stash=True):
                _loadret = self._load()
                if not _loadret:  # no bar: use force to make sure the
                    # pointer is undone in exactbars mode. This covers
                    # especially (but not uniquely) the case in which the
                    # last bar has been seen and a plain backwards would
                    # ruin pointer accounting in the "stop" method of the
                    # strategy
                    self.backwards(force=True)  # undo data pointer

                    # return the actual returned value which may be None to
                    # signal no bar is available, but the data feed is not
                    # done. False means game over
                    return _loadret

            # Get a reference to current loaded time
            dt = self.lines.datetime[0]

            # A bar has been loaded, adapt the time
            if self._tzinput:
                # Input has been converted at face value but it's not UTC in
                # the input stream
                dtime = num2date(dt)  # get it in a naive datetime
                # localize it
                dtime = self._tzinput.localize(dtime)  # pytz compatible-ized
                self.lines.datetime[0] = dt = date2num(dtime)  # keep UTC val

            # Check standard date from/to filters
            if dt < self.fromdate:
                # discard loaded bar and carry on
                self.backwards()
                continue
            if dt > self.todate:
                # discard loaded bar and break out
                self.backwards(force=True)
                break

            # Pass through filters
            retff = False
            for ff, fargs, fkwargs in self._filters:
                # previous filter may have put things onto the stack
                if self._barstack:
                    for i in range(len(self._barstack)):
                        self._fromstack(forward=True)
                        retff = ff(self, *fargs, **fkwargs)
                else:
                    retff = ff(self, *fargs, **fkwargs)

                if retff:  # bar removed from system
                    break  # out of the inner loop

            if retff:  # bar removed from system - loop to get new bar
                continue  # in the greater loop

            # Checks let the bar through ... notify it
            return True

        # Out of the loop ... no more bars or past todate
        return False

    def _load(self):
        return False

    def _add2stack(self, bar, stash=False):
        '''Saves the given bar (list of values) to the stack for later retrieval'''
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

    def _save2stack(self, erase=False, force=False, stash=False):
        '''Saves the current bar to the bar stack for later retrieval

        Parameter ``erase`` determines removal from the data stream
        '''
        bar = [line[0] for line in self.itersize()]
        if not stash:
            self._barstack.append(bar)
        else:
            self._barstash.append(bar)

        if erase:  # remove bar if requested
            self.backwards(force=force)

    def _updatebar(self, bar, forward=False, ago=0):
        '''Loads the given bar (list of values) onto the lines to form the
        new bar, optionally moving the data pointer forward first
        '''
        if forward:
            self.forward()

        for line, val in zip(self.itersize(), bar):
            line[0 + ago] = val

    def _fromstack(self, forward=False, stash=False):
        '''Load a value from the stack onto the lines to form the new bar

        Returns True if values are present, False otherwise
        '''

        coll = self._barstack if not stash else self._barstash

        if coll:
            if forward:
                self.forward()

            for line, val in zip(self.itersize(), coll.popleft()):
                line[0] = val

            return True

        return False

    def resample(self, **kwargs):
        self.addfilter(Resampler, **kwargs)

    def replay(self, **kwargs):
        self.addfilter(Replayer, **kwargs)

Class variables

var CONNBROKEN
var CONNECTED
var DELAYED
var DISCONNECTED
var LIVE
var NOTSUBSCRIBED
var NOTSUPPORTED_TF
var UNKNOWN
var alias
var aliased
var frompackages
var linealias
var packages
var params
var plotinfo
var plotlines
var replaying
var resampling

Methods

def addfilter(self, p, *args, **kwargs)
Expand source code
def addfilter(self, p, *args, **kwargs):
    if inspect.isclass(p):
        pobj = p(self, *args, **kwargs)
        self._filters.append((pobj, [], {}))

        if hasattr(pobj, 'last'):
            self._ffilters.append((pobj, [], {}))

    else:
        self._filters.append((p, args, kwargs))
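
For illustration, a filter in the shape addfilter expects: a class whose instances are called with the data each time a bar is loaded. The class and its weekend rule are hypothetical:

import backtrader as bt

class WeekendFilter(object):
    '''Hypothetical filter: drop Saturday/Sunday bars from the stream'''
    def __init__(self, data):
        pass  # filters may pre-compute things here

    def __call__(self, data):
        if data.datetime.date(0).isoweekday() <= 5:
            return False  # weekday: keep the bar in the stream

        data.backwards()  # remove the just-loaded bar
        return True  # True tells load() the bar was removed

data = bt.feeds.GenericCSVData(dataname='daily.csv')  # hypothetical file
data.addfilter(WeekendFilter)
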
def addfilter_simple(self, f, *args, **kwargs)
Expand source code
def addfilter_simple(self, f, *args, **kwargs):
    fp = SimpleFilterWrapper(self, f, *args, **kwargs)
    self._filters.append((fp, fp.args, fp.kwargs))
def advance(self, size=1, datamaster=None, ticks=True)
Expand source code
def advance(self, size=1, datamaster=None, ticks=True):
    if ticks:
        self._tick_nullify()

    # This call must be intercepted to support datas with
    # different lengths (timeframes)
    self.lines.advance(size)

    if datamaster is not None:
        if len(self) > self.buflen():
            # if no bar can be delivered, fill with an empty bar
            self.rewind()
            self.lines.forward()
            return

        if self.lines.datetime[0] > datamaster.lines.datetime[0]:
            self.lines.rewind()
        else:
            if ticks:
                self._tick_fill()
    elif len(self) < self.buflen():
        # a resampler may have advanced us past the last point
        if ticks:
            self._tick_fill()
def advance_peek(self)
Expand source code
def advance_peek(self):
    if len(self) < self.buflen():
        return self.lines.datetime[1]  # return the future

    return float('inf')  # max date else
def clone(self, **kwargs)
Expand source code
def clone(self, **kwargs):
    return DataClone(dataname=self, **kwargs)
def compensate(self, other)

Call it to let the broker know that actions on this asset will compensate open positions in another

Expand source code
def compensate(self, other):
    '''Call it to let the broker know that actions on this asset will
    compensate open positions in another'''

    self._compensate = other
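
A usage sketch (file names hypothetical): actions on the second feed compensate positions opened on the first, as in a future/spot pair:

import backtrader as bt

cerebro = bt.Cerebro()
data0 = bt.feeds.GenericCSVData(dataname='spot.csv')    # hypothetical
data1 = bt.feeds.GenericCSVData(dataname='future.csv')  # hypothetical
data1.compensate(data0)  # trades on data1 offset positions on data0
cerebro.adddata(data0)
cerebro.adddata(data1)
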
def copyas(self, _dataname, **kwargs)
Expand source code
def copyas(self, _dataname, **kwargs):
    d = DataClone(dataname=self, **kwargs)
    d._dataname = _dataname
    d._name = _dataname
    return d
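
This allows registering the same stream under a different name, e.g. to add it twice to Cerebro; a sketch (file name hypothetical):

import backtrader as bt

cerebro = bt.Cerebro()
data_a = bt.feeds.GenericCSVData(dataname='orcl.csv', name='ORCL')
data_b = data_a.copyas('ORCL-2')  # same bars, different _name/_dataname
cerebro.adddata(data_a)
cerebro.adddata(data_b)
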
def date2num(self, dt)
Expand source code
def date2num(self, dt):
    if self._tz is not None:
        return date2num(self._tz.localize(dt))

    return date2num(dt)
def do_qcheck(self, onoff, qlapse)
Expand source code
def do_qcheck(self, onoff, qlapse):
    # if onoff is True the data will wait p.qcheck for incoming live data
    # on its queue.
    qwait = self.p.qcheck if onoff else 0.0
    qwait = max(0.0, qwait - qlapse)
    self._qcheck = qwait
def get_notifications(self)

Return the pending "store" notifications

Expand source code
def get_notifications(self):
    '''Return the pending "store" notifications'''
    # The background thread could keep on adding notifications. The None
    # mark makes it possible to identify the last one to deliver
    self.notifs.append(None)  # put a mark
    notifs = list()
    while True:
        notif = self.notifs.popleft()
        if notif is None:  # mark is reached
            break
        notifs.append(notif)

    return notifs
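
These notifications are collected by Cerebro and forwarded to strategies; the usual place to consume them is Strategy.notify_data, as in this sketch:

import backtrader as bt

class MyStrategy(bt.Strategy):
    def notify_data(self, data, status, *args, **kwargs):
        # status is one of the constants above (CONNECTED, LIVE, ...)
        print('Data status:', data._getstatusname(status))
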
def getenvironment(self)
Expand source code
def getenvironment(self):
    return self._env
def getfeed(self)
Expand source code
def getfeed(self):
    return self._feed
def haslivedata(self)
Expand source code
def haslivedata(self):
    return False  # must be overridden by those that can
def islive(self)

If this returns True, Cerebro will deactivate preload and runonce because a live data source must be fetched tick by tick (or bar by bar)

Expand source code
def islive(self):
    '''If this returns True, ``Cerebro`` will deactivate ``preload`` and
    ``runonce`` because a live data source must be fetched tick by tick (or
    bar by bar)'''
    return False
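
A live feed typically overrides islive together with haslivedata and _load, and reports status changes through put_notification. A minimal sketch; the internal queue and the bar tuple layout are assumptions:

import backtrader as bt

class MyLiveFeed(bt.feed.DataBase):
    def start(self):
        super(MyLiveFeed, self).start()
        self._queue = []  # hypothetical: filled by a background thread

    def islive(self):
        return True  # makes Cerebro disable preload and runonce

    def haslivedata(self):
        return bool(self._queue)

    def _load(self):
        if not self._queue:
            return None  # no bar available yet, but the feed is not done

        self.put_notification(self.LIVE)
        # hypothetical tuple: (date2num'ed datetime, o, h, l, c, volume)
        dtnum, o, h, l, c, v = self._queue.pop(0)
        self.lines.datetime[0] = dtnum
        self.lines.open[0], self.lines.high[0] = o, h
        self.lines.low[0], self.lines.close[0] = l, c
        self.lines.volume[0] = v
        self.lines.openinterest[0] = 0.0
        return True
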
def load(self)
Expand source code
def load(self):
    while True:
        # move data pointer forward for new bar
        self.forward()

        if self._fromstack():  # bar is available
            return True

        if not self._fromstack(stash=True):
            _loadret = self._load()
            if not _loadret:  # no bar: use force to make sure the
                # pointer is undone in exactbars mode. This covers
                # especially (but not uniquely) the case in which the
                # last bar has been seen and a plain backwards would
                # ruin pointer accounting in the "stop" method of the
                # strategy
                self.backwards(force=True)  # undo data pointer

                # return the actual returned value which may be None to
                # signal no bar is available, but the data feed is not
                # done. False means game over
                return _loadret

        # Get a reference to current loaded time
        dt = self.lines.datetime[0]

        # A bar has been loaded, adapt the time
        if self._tzinput:
            # Input has been converted at face value but it's not UTC in
            # the input stream
            dtime = num2date(dt)  # get it in a naive datetime
            # localize it
            dtime = self._tzinput.localize(dtime)  # pytz compatible-ized
            self.lines.datetime[0] = dt = date2num(dtime)  # keep UTC val

        # Check standard date from/to filters
        if dt < self.fromdate:
            # discard loaded bar and carry on
            self.backwards()
            continue
        if dt > self.todate:
            # discard loaded bar and break out
            self.backwards(force=True)
            break

        # Pass through filters
        retff = False
        for ff, fargs, fkwargs in self._filters:
            # previous filter may have put things onto the stack
            if self._barstack:
                for i in range(len(self._barstack)):
                    self._fromstack(forward=True)
                    retff = ff(self, *fargs, **fkwargs)
            else:
                retff = ff(self, *fargs, **fkwargs)

            if retff:  # bar removed from system
                break  # out of the inner loop

        if retff:  # bar removed from system - loop to get new bar
            continue  # in the greater loop

        # Checks let the bar through ... notify it
        return True

    # Out of the loop ... no more bars or past todate
    return False
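
The loop above delegates actual bar production to _load, which subclasses override: True means a bar was placed on the lines, None means "nothing yet" (live feeds), False means no more data. A sketch of a minimal historical feed; the ListData class and its tuple layout are hypothetical:

import datetime
import backtrader as bt

class ListData(bt.feed.DataBase):
    '''Serves (datetime, open, high, low, close, volume) tuples passed
    as ``dataname``'''

    def start(self):
        super(ListData, self).start()
        self._rows = iter(self.p.dataname)

    def _load(self):
        try:
            dt, o, h, l, c, v = next(self._rows)
        except StopIteration:
            return False  # game over: no more bars

        self.lines.datetime[0] = bt.date2num(dt)
        self.lines.open[0], self.lines.high[0] = o, h
        self.lines.low[0], self.lines.close[0] = l, c
        self.lines.volume[0] = v
        self.lines.openinterest[0] = 0.0
        return True

rows = [(datetime.datetime(2023, 1, 2), 10.0, 11.0, 9.5, 10.5, 1000)]
cerebro = bt.Cerebro()
cerebro.adddata(ListData(dataname=rows, timeframe=bt.TimeFrame.Days))
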
def num2date(self, dt=None, tz=None, naive=True)
Expand source code
def num2date(self, dt=None, tz=None, naive=True):
    if dt is None:
        return num2date(self.lines.datetime[0], tz or self._tz, naive)

    return num2date(dt, tz or self._tz, naive)
def preload(self)
Expand source code
def preload(self):
    while self.load():
        pass

    self._last()
    self.home()
def put_notification(self, status, *args, **kwargs)

Add arguments to notification queue

Expand source code
def put_notification(self, status, *args, **kwargs):
    '''Add arguments to notification queue'''
    if self._laststatus != status:
        self.notifs.append((status, args, kwargs))
        self._laststatus = status
def replay(self, **kwargs)
Expand source code
def replay(self, **kwargs):
    self.addfilter(Replayer, **kwargs)
def resample(self, **kwargs)
Expand source code
def resample(self, **kwargs):
    self.addfilter(Resampler, **kwargs)
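
Both helpers simply attach the corresponding filter to the feed. A usage sketch (file name hypothetical; cerebro.resampledata and cerebro.replaydata are the higher-level equivalents):

import backtrader as bt

data = bt.feeds.GenericCSVData(dataname='daily.csv')  # hypothetical file
data.resample(timeframe=bt.TimeFrame.Weeks, compression=1)

cerebro = bt.Cerebro()
cerebro.adddata(data)
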
def setenvironment(self, env)

Keep a reference to the environment

Expand source code
def setenvironment(self, env):
    '''Keep a reference to the environment'''
    self._env = env
def start(self)
Expand source code
def start(self):
    self._barstack = collections.deque()
    self._barstash = collections.deque()
    self._laststatus = self.CONNECTED
def stop(self)
Expand source code
def stop(self):
    pass

class CSVDataBase (*args, **kwargs)

Base class for classes implementing CSV DataFeeds

The class takes care of opening the file, reading the lines and tokenizing them.

Subclasses only need to override:

  • _loadline(tokens)

The return value of _loadline (True/False) will be the return value of _load, which has been overridden by this base class
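
For illustration, a minimal subclass; the column order and date format are assumptions about the file being parsed:

import datetime
import backtrader as bt

class MyCSVData(bt.feed.CSVDataBase):
    def _loadline(self, linetokens):
        # assumed layout: date, open, high, low, close, volume
        dt = datetime.datetime.strptime(linetokens[0], '%Y-%m-%d')
        self.lines.datetime[0] = bt.date2num(dt)
        self.lines.open[0] = float(linetokens[1])
        self.lines.high[0] = float(linetokens[2])
        self.lines.low[0] = float(linetokens[3])
        self.lines.close[0] = float(linetokens[4])
        self.lines.volume[0] = float(linetokens[5])
        self.lines.openinterest[0] = 0.0
        return True  # becomes the return value of _load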

Expand source code
class CSVDataBase(with_metaclass(MetaCSVDataBase, DataBase)):
    '''
    Base class for classes implementing CSV DataFeeds

    The class takes care of opening the file, reading the lines and
    tokenizing them.

    Subclasses only need to override:

      - _loadline(tokens)

    The return value of ``_loadline`` (True/False) will be the return value
    of ``_load``, which has been overridden by this base class
    '''

    f = None
    params = (('headers', True), ('separator', ','),)

    def start(self):
        super(CSVDataBase, self).start()

        if self.f is None:
            if hasattr(self.p.dataname, 'readline'):
                self.f = self.p.dataname
            else:
                # Let an exception propagate to let the caller know
                self.f = io.open(self.p.dataname, 'r')

        if self.p.headers:
            self.f.readline()  # skip the headers

        self.separator = self.p.separator

    def stop(self):
        super(CSVDataBase, self).stop()
        if self.f is not None:
            self.f.close()
            self.f = None

    def preload(self):
        while self.load():
            pass

        self._last()
        self.home()

        # preloaded - no need to keep the object around - it breaks
        # multiprocessing in Python 3.x
        self.f.close()
        self.f = None

    def _load(self):
        if self.f is None:
            return False

        # Let an exception propagate to let the caller know
        line = self.f.readline()

        if not line:
            return False

        line = line.rstrip('\n')
        linetokens = line.split(self.separator)
        return self._loadline(linetokens)

    def _getnextline(self):
        if self.f is None:
            return None

        # Let an exception propagate to let the caller know
        line = self.f.readline()

        if not line:
            return None

        line = line.rstrip('\n')
        linetokens = line.split(self.separator)
        return linetokens

Class variables

var alias
var aliased
var f
var frompackages
var linealias
var packages
var params
var plotinfo
var plotlines

Methods

def preload(self)
Expand source code
def preload(self):
    while self.load():
        pass

    self._last()
    self.home()

    # preloaded - no need to keep the object around - it breaks
    # multiprocessing in Python 3.x
    self.f.close()
    self.f = None
def start(self)
Expand source code
def start(self):
    super(CSVDataBase, self).start()

    if self.f is None:
        if hasattr(self.p.dataname, 'readline'):
            self.f = self.p.dataname
        else:
            # Let an exception propagate to let the caller know
            self.f = io.open(self.p.dataname, 'r')

    if self.p.headers:
        self.f.readline()  # skip the headers

    self.separator = self.p.separator
def stop(self)
Expand source code
def stop(self):
    super(CSVDataBase, self).stop()
    if self.f is not None:
        self.f.close()
        self.f = None

class CSVFeedBase
Expand source code
class CSVFeedBase(FeedBase):
    params = (('basepath', ''),) + CSVDataBase.params._gettuple()

    def _getdata(self, dataname, **kwargs):
        return self.DataCls(dataname=self.p.basepath + dataname,
                            **self.p._getkwargs())
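
A feed couples a basepath with the DataCls it instantiates in _getdata. A sketch using the stock GenericCSVData (directory and file name hypothetical):

import backtrader as bt

class MyCSVFeed(bt.feed.CSVFeedBase):
    DataCls = bt.feeds.GenericCSVData  # class instantiated by _getdata

feed = MyCSVFeed(basepath='/data/')           # hypothetical directory
data = feed.getdata('orcl.csv', name='ORCL')  # feed params are forwarded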

Class variables

var frompackages
var packages
var params
class DataBase (*args, **kwargs)

Base class for LineXXX instances that hold more than one line

Expand source code
class DataBase(AbstractDataBase):
    pass

Class variables

var alias
var aliased
var frompackages
var linealias
var packages
var params
var plotinfo
var plotlines

class DataClone

Base class for LineXXX instances that hold more than one line

Expand source code
class DataClone(AbstractDataBase):
    _clone = True

    def __init__(self):
        self.data = self.p.dataname
        self._dataname = self.data._dataname

        # Copy date/session parameters; fromdate/todate are kept as-is since
        # they may carry user overrides passed through clone()/copyas()
        self.p.fromdate = self.p.fromdate
        self.p.todate = self.p.todate
        self.p.sessionstart = self.data.p.sessionstart
        self.p.sessionend = self.data.p.sessionend

        self.p.timeframe = self.data.p.timeframe
        self.p.compression = self.data.p.compression

    def _start(self):
        # redefine to copy data bits from guest data
        self.start()

        # Copy tz infos
        self._tz = self.data._tz
        self.lines.datetime._settz(self._tz)

        self._calendar = self.data._calendar

        # input has already been converted by guest data
        self._tzinput = None  # no need to convert further

        # Copy dates/session infos
        self.fromdate = self.data.fromdate
        self.todate = self.data.todate

        # FIXME: if removed from guest, remove here too
        self.sessionstart = self.data.sessionstart
        self.sessionend = self.data.sessionend

    def start(self):
        super(DataClone, self).start()
        self._dlen = 0
        self._preloading = False

    def preload(self):
        self._preloading = True
        super(DataClone, self).preload()
        self.data.home()  # preloading data was pushed forward
        self._preloading = False

    def _load(self):
        # assumption: the data is in the system
        # simply copy the lines
        if self._preloading:
            # data is preloaded and we are preloading too: move forward
            # until a full bar is there or the data source is exhausted
            self.data.advance()
            if len(self.data) > self.data.buflen():
                return False

            for line, dline in zip(self.lines, self.data.lines):
                line[0] = dline[0]

            return True

        # Not preloading
        if not (len(self.data) > self._dlen):
            # Data not beyond last seen bar
            return False

        self._dlen += 1

        for line, dline in zip(self.lines, self.data.lines):
            line[0] = dline[0]

        return True

    def advance(self, size=1, datamaster=None, ticks=True):
        self._dlen += size
        super(DataClone, self).advance(size, datamaster, ticks=ticks)

Class variables

var alias
var aliased
var frompackages
var linealias
var packages
var params
var plotinfo
var plotlines

Methods

def advance(self, size=1, datamaster=None, ticks=True)
Expand source code
def advance(self, size=1, datamaster=None, ticks=True):
    self._dlen += size
    super(DataClone, self).advance(size, datamaster, ticks=ticks)
def preload(self)
Expand source code
def preload(self):
    self._preloading = True
    super(DataClone, self).preload()
    self.data.home()  # preloading data was pushed forward
    self._preloading = False
def start(self)
Expand source code
def start(self):
    super(DataClone, self).start()
    self._dlen = 0
    self._preloading = False

Inherited members

class FeedBase
Expand source code
class FeedBase(with_metaclass(metabase.MetaParams, object)):
    params = () + DataBase.params._gettuple()

    def __init__(self):
        self.datas = list()

    def start(self):
        for data in self.datas:
            data.start()

    def stop(self):
        for data in self.datas:
            data.stop()

    def getdata(self, dataname, name=None, **kwargs):
        for pname, pvalue in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        data = self._getdata(**kwargs)

        data._name = name

        self.datas.append(data)
        return data

    def _getdata(self, dataname, **kwargs):
        for pname, pvalue in self.p._getitems():
            kwargs.setdefault(pname, getattr(self.p, pname))

        kwargs['dataname'] = dataname
        return self.DataCls(**kwargs)

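FeedBase itself binds no data class; a working feed sets DataCls so that _getdata can instantiate it with the pooled feed parameters plus per-call overrides. A sketch, assuming the bundled BacktraderCSVData class (file names are illustrative):

import backtrader as bt
from backtrader.feed import FeedBase


class MyFeed(FeedBase):
    DataCls = bt.feeds.BacktraderCSVData  # assumed existing data class


feed = MyFeed()
d0 = feed.getdata('2006-day-001.txt', name='d0')
d1 = feed.getdata('2006-day-002.txt', name='d1',
                  timeframe=bt.TimeFrame.Days)
# feed.start() / feed.stop() will start and stop both feeds together
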
Subclasses

Class variables

var frompackages
var packages
var params

Methods

def getdata(self, dataname, name=None, **kwargs)
Expand source code
def getdata(self, dataname, name=None, **kwargs):
    for pname, pvalue in self.p._getitems():
        kwargs.setdefault(pname, getattr(self.p, pname))

    kwargs['dataname'] = dataname
    data = self._getdata(**kwargs)

    data._name = name

    self.datas.append(data)
    return data
def start(self)
Expand source code
def start(self):
    for data in self.datas:
        data.start()
def stop(self)
Expand source code
def stop(self):
    for data in self.datas:
        data.stop()
class MetaAbstractDataBase (name, bases, dct)

Dirty job manager for a LineSeries

  • During __new__ (class creation), it reads the "lines", "plotinfo" and "plotlines" class variable definitions and turns them into Classes of type Lines or AutoClassInfo (plotinfo/plotlines)

  • During instance creation, the lines/plotinfo/plotlines classes are substituted in the instance with instances of the aforementioned classes, and aliases are added for the "lines" held in the "lines" instance

    Additionally, any remaining kwargs are matched against the args in plotinfo and, if present, are set there and removed from kwargs

    Remember that this Metaclass has MetaParams (from metabase) as a root class, and therefore "params" defined for the class have already been removed from kwargs at an earlier stage

Class has already been created … register subclasses

Expand source code
class MetaAbstractDataBase(dataseries.OHLCDateTime.__class__):
    _indcol = dict()

    def __init__(cls, name, bases, dct):
        '''
        Class has already been created ... register subclasses
        '''
        # Initialize the class
        super(MetaAbstractDataBase, cls).__init__(name, bases, dct)

        if not cls.aliased and \
           name != 'DataBase' and not name.startswith('_'):
            cls._indcol[name] = cls

    def dopreinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopreinit(_obj, *args, **kwargs)

        # Find the owner and store it
        _obj._feed = metabase.findowner(_obj, FeedBase)

        _obj.notifs = collections.deque()  # store notifications for cerebro

        _obj._dataname = _obj.p.dataname
        _obj._name = ''
        return _obj, args, kwargs

    def dopostinit(cls, _obj, *args, **kwargs):
        _obj, args, kwargs = \
            super(MetaAbstractDataBase, cls).dopostinit(_obj, *args, **kwargs)

        # Either set by subclass or the parameter or use the dataname (ticker)
        _obj._name = _obj._name or _obj.p.name
        if not _obj._name and isinstance(_obj.p.dataname, string_types):
            _obj._name = _obj.p.dataname
        _obj._compression = _obj.p.compression
        _obj._timeframe = _obj.p.timeframe

        if isinstance(_obj.p.sessionstart, datetime.datetime):
            _obj.p.sessionstart = _obj.p.sessionstart.time()

        elif _obj.p.sessionstart is None:
            _obj.p.sessionstart = datetime.time.min

        if isinstance(_obj.p.sessionend, datetime.datetime):
            _obj.p.sessionend = _obj.p.sessionend.time()

        elif _obj.p.sessionend is None:
            # use 999990 (not 999999) microseconds to avoid precision
            # rounding errors
            _obj.p.sessionend = datetime.time(23, 59, 59, 999990)

        if isinstance(_obj.p.fromdate, datetime.date):
            # push it to the start of the session, or else intraday
            # values from that first day would be skipped
            if not hasattr(_obj.p.fromdate, 'hour'):
                _obj.p.fromdate = datetime.datetime.combine(
                    _obj.p.fromdate, _obj.p.sessionstart)

        if isinstance(_obj.p.todate, datetime.date):
            # push it to the end of the day, or else intraday
            # values before the end of the day would be gone
            if not hasattr(_obj.p.todate, 'hour'):
                _obj.p.todate = datetime.datetime.combine(
                    _obj.p.todate, _obj.p.sessionend)

        _obj._barstack = collections.deque()  # for filter operations
        _obj._barstash = collections.deque()  # for filter operations

        _obj._filters = list()
        _obj._ffilters = list()
        for fp in _obj.p.filters:
            if inspect.isclass(fp):
                fp = fp(_obj)
                if hasattr(fp, 'last'):
                    _obj._ffilters.append((fp, [], {}))

            _obj._filters.append((fp, [], {}))

        return _obj, args, kwargs

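Two behaviours above are easy to miss: a fromdate/todate given as a plain date is expanded with the session time (a date has no 'hour' attribute, a datetime does), and filter classes in params.filters are instantiated with the data before the check for a last() method, which marks a filter for a final call once the source is exhausted. A short sketch; the no-op filter is purely illustrative and follows the calling convention of the bundled filters (e.g. SessionFilter):

import datetime

# date -> datetime at the session boundary, as done for p.fromdate
fromdate = datetime.date(2023, 1, 2)
if not hasattr(fromdate, 'hour'):  # True for a date, False for a datetime
    fromdate = datetime.datetime.combine(fromdate, datetime.time.min)
print(fromdate)  # 2023-01-02 00:00:00


class NoopFilter(object):
    '''Hypothetical filter: passed as a class in params.filters.

    It is instantiated as NoopFilter(data); because it defines last(),
    it is registered in _ffilters in addition to _filters.
    '''

    def __init__(self, data):
        pass

    def __call__(self, data):
        return False  # bar untouched -> let the data loop process it

    def last(self, data):
        return False  # nothing left to deliver when the feed ends
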
Ancestors

Subclasses

Methods

def dopostinit(cls, _obj, *args, **kwargs)
Expand source code
def dopostinit(cls, _obj, *args, **kwargs):
    _obj, args, kwargs = \
        super(MetaAbstractDataBase, cls).dopostinit(_obj, *args, **kwargs)

    # Either set by subclass or the parameter or use the dataname (ticker)
    _obj._name = _obj._name or _obj.p.name
    if not _obj._name and isinstance(_obj.p.dataname, string_types):
        _obj._name = _obj.p.dataname
    _obj._compression = _obj.p.compression
    _obj._timeframe = _obj.p.timeframe

    if isinstance(_obj.p.sessionstart, datetime.datetime):
        _obj.p.sessionstart = _obj.p.sessionstart.time()

    elif _obj.p.sessionstart is None:
        _obj.p.sessionstart = datetime.time.min

    if isinstance(_obj.p.sessionend, datetime.datetime):
        _obj.p.sessionend = _obj.p.sessionend.time()

    elif _obj.p.sessionend is None:
        # use 999990 (not 999999) microseconds to avoid precision
        # rounding errors
        _obj.p.sessionend = datetime.time(23, 59, 59, 999990)

    if isinstance(_obj.p.fromdate, datetime.date):
        # push it to the start of the session, or else intraday
        # values from that first day would be skipped
        if not hasattr(_obj.p.fromdate, 'hour'):
            _obj.p.fromdate = datetime.datetime.combine(
                _obj.p.fromdate, _obj.p.sessionstart)

    if isinstance(_obj.p.todate, datetime.date):
        # push it to the end of the day, or else intraday
        # values before the end of the day would be gone
        if not hasattr(_obj.p.todate, 'hour'):
            _obj.p.todate = datetime.datetime.combine(
                _obj.p.todate, _obj.p.sessionend)

    _obj._barstack = collections.deque()  # for filter operations
    _obj._barstash = collections.deque()  # for filter operations

    _obj._filters = list()
    _obj._ffilters = list()
    for fp in _obj.p.filters:
        if inspect.isclass(fp):
            fp = fp(_obj)
            if hasattr(fp, 'last'):
                _obj._ffilters.append((fp, [], {}))

        _obj._filters.append((fp, [], {}))

    return _obj, args, kwargs
def dopreinit(cls, _obj, *args, **kwargs)
Expand source code
def dopreinit(cls, _obj, *args, **kwargs):
    _obj, args, kwargs = \
        super(MetaAbstractDataBase, cls).dopreinit(_obj, *args, **kwargs)

    # Find the owner and store it
    _obj._feed = metabase.findowner(_obj, FeedBase)

    _obj.notifs = collections.deque()  # store notifications for cerebro

    _obj._dataname = _obj.p.dataname
    _obj._name = ''
    return _obj, args, kwargs

Inherited members

class MetaCSVDataBase (name, bases, dct)

Dirty job manager for a LineSeries

  • During __new__ (class creation), it reads the "lines", "plotinfo" and "plotlines" class variable definitions and turns them into Classes of type Lines or AutoClassInfo (plotinfo/plotlines)

  • During instance creation, the lines/plotinfo/plotlines classes are substituted in the instance with instances of the aforementioned classes, and aliases are added for the "lines" held in the "lines" instance

    Additionally, any remaining kwargs are matched against the args in plotinfo and, if present, are set there and removed from kwargs

    Remember that this Metaclass has MetaParams (from metabase) as a root class, and therefore "params" defined for the class have already been removed from kwargs at an earlier stage

Class has already been created … register subclasses

Expand source code
class MetaCSVDataBase(DataBase.__class__):
    def dopostinit(cls, _obj, *args, **kwargs):
        # Set the name before calling the base class so it overrides the
        # default
        if not _obj.p.name and not _obj._name:
            _obj._name, _ = os.path.splitext(os.path.basename(_obj.p.dataname))

        _obj, args, kwargs = \
            super(MetaCSVDataBase, cls).dopostinit(_obj, *args, **kwargs)

        return _obj, args, kwargs

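The effect is simply that an unnamed CSV feed names itself after its file. For example:

import os.path

dataname = 'datas/2006-day-001.txt'  # illustrative path
name, _ = os.path.splitext(os.path.basename(dataname))
print(name)  # '2006-day-001'
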
Ancestors

Methods

def dopostinit(cls, _obj, *args, **kwargs)
Expand source code
def dopostinit(cls, _obj, *args, **kwargs):
    # Set the name before calling the base class so it overrides the
    # default
    if not _obj.p.name and not _obj._name:
        _obj._name, _ = os.path.splitext(os.path.basename(_obj.p.dataname))

    _obj, args, kwargs = \
        super(MetaCSVDataBase, cls).dopostinit(_obj, *args, **kwargs)

    return _obj, args, kwargs

Inherited members