Source code for qf_lib.data_providers.preset_data_provider

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

from datetime import datetime
from typing import Union, Sequence, Any, Set, Type, Dict, FrozenSet, Optional, Tuple
import pandas as pd
from numpy import nan, ceil
from pandas.tseries.frequencies import to_offset

from qf_lib.common.enums.expiration_date_field import ExpirationDateField
from qf_lib.common.enums.frequency import Frequency
from qf_lib.common.enums.price_field import PriceField
from qf_lib.common.tickers.tickers import Ticker
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.prices_dataframe import PricesDataFrame
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.dimension_names import DATES, FIELDS, TICKERS
from qf_lib.containers.futures.future_tickers.future_ticker import FutureTicker
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.prices_series import PricesSeries
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.data_providers.helpers import normalize_data_array
from qf_lib.data_providers.data_provider import DataProvider


class PresetDataProvider(DataProvider):
    """
    Wrapper on QFDataArray which makes it a DataProvider.

    Parameters
    ----------
    data
        data to be wrapped, indexed by date, (specific) tickers and fields
    start_date
        beginning of the cached period (not necessarily the first date in the `data`)
    end_date
        end of the cached period (not necessarily the last date in the `data`)
    frequency
        frequency of the data
    exp_dates
        dictionary mapping FutureTickers to QFDataFrames of contract expiration dates, belonging to the
        given future ticker family
    """

    def __init__(self, data: QFDataArray, start_date: datetime, end_date: datetime, frequency: Frequency,
                 exp_dates: Dict[FutureTicker, QFDataFrame] = None):
        super().__init__()
        self._data_bundle = data
        self._frequency = frequency
        self._exp_dates = exp_dates
        self._tickers_cached_set = frozenset(data.tickers.values)
        self._future_tickers_cached_set = frozenset(exp_dates.keys()) if exp_dates is not None else None
        self._fields_cached_set = frozenset(data.fields.values)
        self._start_date = start_date
        self._end_date = end_date

        self._ticker_types = {type(ticker) for ticker in data.tickers.values}

    @property
    def data_bundle(self) -> QFDataArray:
        return self._data_bundle

    @property
    def frequency(self) -> Frequency:
        return self._frequency

    @property
    def exp_dates(self) -> Dict[FutureTicker, QFDataFrame]:
        return self._exp_dates

    @property
    def cached_tickers(self) -> FrozenSet[Ticker]:
        return self._tickers_cached_set

    @property
    def cached_future_tickers(self) -> FrozenSet[FutureTicker]:
        return self._future_tickers_cached_set

    @property
    def cached_fields(self) -> FrozenSet[Union[str, PriceField]]:
        return self._fields_cached_set

    @property
    def start_date(self) -> datetime:
        return self._start_date

    @property
    def end_date(self) -> datetime:
        return self._end_date
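    # A minimal construction sketch (illustrative only; `ticker` is hypothetical, and
    # QFDataArray.create / PriceField.ohlcv are assumed from elsewhere in qf_lib):
    #
    #     dates = pd.date_range("2020-01-01", "2020-01-31", freq="D")
    #     data = QFDataArray.create(dates, [ticker], PriceField.ohlcv())
    #     provider = PresetDataProvider(data, dates[0], dates[-1], Frequency.DAILY)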
    def supported_ticker_types(self) -> Set[Type[Ticker]]:
        return self._ticker_types
    def get_price(self, tickers: Union[Ticker, Sequence[Ticker]], fields: Union[PriceField, Sequence[PriceField]],
                  start_date: datetime, end_date: datetime = None, frequency: Frequency = Frequency.DAILY) -> \
            Union[None, PricesSeries, PricesDataFrame, QFDataArray]:
        # The requested data frequency should be at most equal to the frequency of the initially loaded data
        # (in case of downsampling the data may be aggregated, but no upsampling is supported).
        assert frequency <= self._frequency, "The passed data frequency should be at most equal to the frequency " \
                                             "of the initially loaded data"
        # Aggregating intraday data to daily or lower frequency is supported, but may differ from native daily data
        if frequency < self._frequency and frequency <= Frequency.DAILY:
            self.logger.warning("Aggregating intraday data to daily or lower frequency is based on the time of "
                                "the underlying intraday data and might not be identical to getting daily data "
                                "from the data provider.")

        start_date = self._adjust_start_date(start_date, frequency)
        end_date = self._adjust_end_date(end_date)

        tickers, specific_tickers, tickers_mapping, got_single_ticker = self._tickers_mapping(tickers)
        fields, got_single_field = convert_to_list(fields, PriceField)
        got_single_date = self._got_single_date(start_date, end_date, frequency)

        self._check_if_cached_data_available(specific_tickers, fields, start_date, end_date)
        data_array = self._data_bundle.loc[start_date:end_date, specific_tickers, fields]

        # Data aggregation (allowed only for intraday data and only if more than 1 data point is found)
        if frequency < self._frequency and len(data_array[DATES]) > 0:
            data_array = self._aggregate_intraday_data(data_array, start_date, end_date, fields, frequency)

        normalized_result = normalize_data_array(
            data_array, specific_tickers, fields, got_single_date, got_single_ticker, got_single_field,
            use_prices_types=True
        )
        normalized_result = self._map_normalized_result(normalized_result, tickers_mapping, tickers)

        return normalized_result
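    # Example (illustrative; `provider`, `ticker`, `start_date` and `end_date` are hypothetical):
    # with a bundle preloaded at 1-minute frequency, daily prices can be requested and will be
    # aggregated from the intraday bars:
    #
    #     daily = provider.get_price(ticker, PriceField.ohlcv(), start_date, end_date,
    #                                frequency=Frequency.DAILY)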
    def historical_price(self, tickers: Union[Ticker, Sequence[Ticker]],
                         fields: Union[PriceField, Sequence[PriceField]],
                         nr_of_bars: int, end_date: Optional[datetime] = None, frequency: Frequency = None) -> \
            Union[PricesSeries, PricesDataFrame, QFDataArray]:
        assert frequency >= Frequency.DAILY, "Frequency lower than daily is not supported by the Data Provider"
        assert nr_of_bars > 0, "Number of data samples should be a positive integer"
        end_date = datetime.now() if end_date is None else end_date

        tickers, specific_tickers, tickers_mapping, got_single_ticker = self._tickers_mapping(tickers)
        fields, got_single_field = convert_to_list(fields, PriceField)
        got_single_date = nr_of_bars == 1

        data_bundle = self._data_bundle
        # Data aggregation (allowed only for intraday data and only if more than 1 data point is found)
        if frequency < self._frequency and len(data_bundle[DATES]) > 0:
            data_bundle = self._aggregate_intraday_data_helper(data_bundle, frequency, fields)

        if frequency == Frequency.DAILY:
            start_date = self._compute_start_date(nr_of_bars, end_date, frequency)
        else:
            start_date = end_date - RelativeDelta(days=14)  # to optimize running time for intraday data

        data_array = data_bundle.loc[start_date:end_date, specific_tickers, fields].dropna(DATES, how='all')
        data_array = data_array.isel(dates=slice(-nr_of_bars, None))

        # If fewer bars than requested were found, widen the window and retry
        missing_bars = nr_of_bars - data_array.shape[0]
        if missing_bars > 0:
            start_date = self._compute_start_date(missing_bars, start_date, frequency)
            data_array = data_bundle.loc[start_date:end_date, specific_tickers, fields].dropna(DATES, how='all')
            data_array = data_array.isel(dates=slice(-nr_of_bars, None))

        normalized_result = normalize_data_array(
            data_array, specific_tickers, fields, got_single_date, got_single_ticker, got_single_field,
            use_prices_types=True
        )
        normalized_result = self._map_normalized_result(normalized_result, tickers_mapping, tickers)

        num_of_dates_available = normalized_result.shape[0]
        if num_of_dates_available < nr_of_bars:
            if isinstance(tickers, Ticker):
                tickers_as_strings = tickers.as_string()
            else:
                tickers_as_strings = ", ".join(ticker.as_string() for ticker in tickers)
            raise ValueError(f"Not enough data points for \ntickers: {tickers_as_strings} \ndate: {end_date}."
                             f"\n{nr_of_bars} data points requested, \n{num_of_dates_available} data points "
                             f"available.")

        return normalized_result
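    # Example (illustrative; `provider` and `ticker` are hypothetical): fetch the 20 most recent
    # daily Close bars, letting the provider widen the window if some dates are missing:
    #
    #     bars = provider.historical_price(ticker, PriceField.Close, nr_of_bars=20,
    #                                      frequency=Frequency.DAILY)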
    def get_last_available_price(self, tickers: Union[Ticker, Sequence[Ticker]], frequency: Frequency,
                                 end_time: Optional[datetime] = None) -> Union[float, PricesSeries]:
        end_time = datetime.now() if end_time is None else end_time
        end_time += RelativeDelta(second=0, microsecond=0)  # truncate to full minutes
        assert frequency >= Frequency.DAILY, "Frequency lower than daily is not supported by the " \
                                             "get_last_available_price function"

        tickers, specific_tickers, tickers_mapping, got_single_ticker = self._tickers_mapping(tickers)
        if not tickers:
            return nan if got_single_ticker else PricesSeries()

        start_time = end_time - RelativeDelta(days=7)  # 7 days to know if an asset disappears

        data_array = self._data_bundle.loc[start_time:end_time, specific_tickers,
                                           [PriceField.Open, PriceField.Close]]

        # Data aggregation (allowed only for intraday data and only if more than 1 data point is found)
        if frequency < self._frequency and len(data_array[DATES]) > 0:
            data_array = self._aggregate_intraday_data_helper(data_array, frequency,
                                                              [PriceField.Open, PriceField.Close])

        # Return the Close price of the latest bar if it is available for all the tickers
        last_close = data_array.isel(dates=slice(-1, None)).loc[:, :, [PriceField.Close]]
        if not last_close.isnull().any():
            normalized_result = normalize_data_array(last_close, specific_tickers, [PriceField.Close],
                                                     got_single_date=True, got_single_ticker=got_single_ticker,
                                                     got_single_field=True, use_prices_types=True)
            normalized_result = self._map_normalized_result(normalized_result, tickers_mapping, tickers)
            return normalized_result

        open_data_array = data_array.loc[:, :, [PriceField.Open]]
        open_prices = normalize_data_array(open_data_array, specific_tickers, [PriceField.Open],
                                           got_single_date=False, got_single_ticker=False, got_single_field=True,
                                           use_prices_types=True)
        close_data_array = data_array.loc[:, :, [PriceField.Close]]
        close_prices = normalize_data_array(close_data_array, specific_tickers, [PriceField.Close],
                                            got_single_date=False, got_single_ticker=False, got_single_field=True,
                                            use_prices_types=True)

        first_indices = [df.first_valid_index().to_pydatetime() for df in [open_prices, close_prices]
                         if df.first_valid_index() is not None]
        if not first_indices:
            return nan if got_single_ticker else PricesSeries(index=tickers)
        start_date = min(first_indices)

        latest_available_prices_series = self._get_valid_latest_available_prices(start_date, specific_tickers,
                                                                                 open_prices, close_prices)
        latest_available_prices_series = self._map_normalized_result(latest_available_prices_series,
                                                                     tickers_mapping, tickers)
        return latest_available_prices_series.iloc[0] if got_single_ticker else latest_available_prices_series
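    # Example (illustrative; `provider` and `ticker` are hypothetical): the most recent available
    # price, taken from the latest Close bar or, if that is missing, the latest Open:
    #
    #     last_price = provider.get_last_available_price(ticker, Frequency.DAILY)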
    def _tickers_mapping(self, tickers: Union[Ticker, Sequence[Ticker]]) -> \
            Tuple[Sequence[Ticker], Sequence[Ticker], Dict, bool]:
        """
        In order to be able to return data for FutureTickers, create a mapping between the tickers and their
        corresponding specific tickers (for non-FutureTickers it is an identity mapping).
        """
        tickers, got_single_ticker = convert_to_list(tickers, Ticker)
        tickers_mapping = {(t.get_current_specific_ticker() if isinstance(t, FutureTicker) else t): t
                           for t in tickers}
        specific_tickers = list(tickers_mapping.keys())
        return tickers, specific_tickers, tickers_mapping, got_single_ticker

    def _check_if_cached_data_available(self, tickers, fields, start_date, end_date):
        uncached_tickers = set(tickers) - self._tickers_cached_set
        if uncached_tickers:
            tickers_str = [t.as_string() for t in uncached_tickers]
            raise ValueError("Tickers: {} are not available in the Data Bundle".format(tickers_str))

        # Fields which were requested but are not cached
        uncached_fields = set(fields) - self._fields_cached_set
        if uncached_fields:
            raise ValueError("Fields: {} are not available in the Data Bundle".format(uncached_fields))

        def remove_time_part(date: datetime):
            return datetime(date.year, date.month, date.day)

        start_date_not_included = start_date < self._start_date if self._frequency > Frequency.DAILY else \
            remove_time_part(start_date) < remove_time_part(self._start_date)
        if start_date_not_included:
            raise ValueError("Requested start date {} is before the data bundle start date {}".format(
                start_date, self._start_date))

        end_date_not_included = end_date > self._end_date if self._frequency > Frequency.DAILY else \
            remove_time_part(end_date) > remove_time_part(self._end_date)
        if end_date_not_included:
            raise ValueError("Requested end date {} is after the data bundle end date {}".format(
                end_date, self._end_date))
    def get_history(self, tickers: Union[Ticker, Sequence[Ticker]],
                    fields: Union[Any, Sequence[Any]],
                    start_date: datetime, end_date: datetime = None, frequency: Frequency = Frequency.DAILY,
                    **kwargs) -> Union[QFSeries, QFDataFrame, QFDataArray]:
        # Verify whether the passed frequency parameter is correct and can be used with the preset data
        assert frequency == self._frequency, "Currently, get_history does not support data resampling"

        start_date = self._adjust_start_date(start_date, frequency)
        end_date = self._adjust_end_date(end_date)

        # In order to be able to return data for FutureTickers, create a mapping between the tickers and their
        # corresponding specific tickers (for non-FutureTickers it is an identity mapping)
        tickers, got_single_ticker = convert_to_list(tickers, Ticker)
        tickers_mapping = {
            (t.get_current_specific_ticker() if isinstance(t, FutureTicker) else t): t for t in tickers
        }
        specific_tickers = list(tickers_mapping.keys())

        fields, got_single_field = convert_to_list(fields, type(fields) if not isinstance(fields, Sequence) else str)
        got_single_date = self._got_single_date(start_date, end_date, frequency)

        self._check_if_cached_data_available(specific_tickers, fields, start_date, end_date)
        data_array = self._data_bundle.loc[start_date:end_date, specific_tickers, fields]

        normalized_result = normalize_data_array(data_array, specific_tickers, fields, got_single_date,
                                                 got_single_ticker, got_single_field, use_prices_types=False)
        normalized_result = self._map_normalized_result(normalized_result, tickers_mapping, tickers)

        return normalized_result
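    # Example (illustrative; `provider`, `ticker`, the date range and the field name are hypothetical):
    # unlike get_price, get_history accepts arbitrary string fields and returns plain QF containers:
    #
    #     history = provider.get_history(ticker, "PX_VOLUME", start_date, end_date,
    #                                    frequency=provider.frequency)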
    def get_futures_chain_tickers(self, tickers: Union[FutureTicker, Sequence[FutureTicker]],
                                  expiration_date_fields: Union[ExpirationDateField,
                                                                Sequence[ExpirationDateField]]) \
            -> Dict[FutureTicker, Union[QFSeries, QFDataFrame]]:
        tickers, got_single_ticker = convert_to_list(tickers, Ticker)

        # Check if the futures tickers are in the exp_dates keys
        uncached_future_tickers = set(tickers) - set(self._exp_dates.keys())
        if uncached_future_tickers:
            tickers_str = [t.name for t in uncached_future_tickers]
            raise ValueError("Tickers: {} are not available in the Data Bundle".format(tickers_str))

        future_chain_tickers = {
            ticker: self._exp_dates[ticker] for ticker in tickers
        }

        return future_chain_tickers
    def _map_normalized_result(self, normalized_result, tickers_mapping, tickers):
        # Map the specific tickers back onto the tickers given by the tickers_mapping dictionary
        if isinstance(normalized_result, QFDataArray):
            normalized_result = normalized_result.assign_coords(
                tickers=[tickers_mapping[t] for t in normalized_result.tickers.values])
        elif isinstance(normalized_result, PricesDataFrame):
            normalized_result = normalized_result.rename(columns=tickers_mapping)
        elif isinstance(normalized_result, PricesSeries):
            # The name of a PricesSeries can only contain strings
            if len(tickers) == 1:
                ticker = tickers[0]
                normalized_result = normalized_result.rename(ticker.name)
            else:
                normalized_result = normalized_result.rename(tickers_mapping)

        return normalized_result

    def _aggregate_intraday_data_helper(self, data_array, frequency: Frequency, fields: Sequence[PriceField]):
        # Compute how many of the newest bars are needed to build one aggregated bar at the target frequency
        expected_nr_of_bars = int(ceil(
            to_offset(frequency.to_pandas_freq()) / to_offset(self._frequency.to_pandas_freq())))
        data_array = data_array.isel(dates=slice(-expected_nr_of_bars, None))
        start_date = data_array[DATES][0].values
        end_date = data_array[DATES][-1].values
        data_array = self._aggregate_intraday_data(data_array, start_date, end_date, fields, frequency)
        return data_array

    def _aggregate_intraday_data(self, data_array, start_date: datetime, end_date: datetime, fields,
                                 frequency: Frequency):
        """
        Aggregates the intraday data array for the given date range and returns a new data array with the data
        resampled to the given frequency.
        """
        prices_list = []
        for field in fields:
            prices = data_array.loc[start_date:end_date, :, field].to_series().groupby(
                [pd.Grouper(freq=frequency.to_pandas_freq(), level=0, label="left", origin="start_day"),
                 pd.Grouper(level=1)])
            if field is PriceField.Open:
                prices = prices.first()
            elif field is PriceField.Close:
                prices = prices.last()
            elif field is PriceField.Low:
                prices = prices.min()
            elif field is PriceField.High:
                prices = prices.max()
            elif field is PriceField.Volume:
                prices = prices.sum()
            else:
                raise NotImplementedError(f"Unknown price field passed to the PresetDataProvider: {field}")

            prices = pd.concat({field: prices}, names=[FIELDS])
            prices_list.append(prices)

        data_array = QFDataArray.from_xr_data_array(
            pd.concat(prices_list).reorder_levels([DATES, TICKERS, FIELDS]).to_xarray())
        return data_array

    @staticmethod
    def _adjust_end_date(end_date: Optional[datetime]) -> datetime:
        end_date = end_date or datetime.now()
        return end_date + RelativeDelta(second=0, microsecond=0)  # truncate seconds and microseconds
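# A standalone sketch of the Grouper-based OHLC aggregation used in _aggregate_intraday_data,
# written against plain pandas (the minute bars below are hypothetical):
#
#     minute_bars = pd.DataFrame({"open": [1.0, 1.1, 1.2], "close": [1.1, 1.2, 1.3]},
#                                index=pd.date_range("2020-01-01 09:30", periods=3, freq="T"))
#     hourly = minute_bars.groupby(pd.Grouper(freq="H", label="left", origin="start_day")) \
#                         .agg({"open": "first", "close": "last"})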