Source code for qf_lib.data_providers.abstract_price_data_provider

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

from abc import abstractmethod, ABCMeta
from datetime import datetime
from typing import Union, Sequence, Dict

from qf_lib.common.enums.expiration_date_field import ExpirationDateField
from qf_lib.common.enums.frequency import Frequency
from qf_lib.common.enums.price_field import PriceField
from qf_lib.common.tickers.tickers import Ticker
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.prices_dataframe import PricesDataFrame
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.futures.future_tickers.future_ticker import FutureTicker
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.prices_series import PricesSeries
from qf_lib.data_providers.data_provider import DataProvider
from qf_lib.data_providers.helpers import normalize_data_array


[docs]class AbstractPriceDataProvider(DataProvider, metaclass=ABCMeta): """ Interface for data providers that supply historical data for various asset classes, including stocks, indices, and futures. This base class is designed for simple data providers, which are linked to a single data source (e.g., Quandl, Bloomberg, Yahoo). It defines the standard structure and methods that any specific data provider implementation must adhere to in order to access and retrieve historical market data. Notes: - When implementing the get_history method (which drivers a large portion of backtesting capabilities) careful consideration must be taken to ensure the data is returned in the expected format depending on the specific input tickers and fields. For Example: - isinstance(tickers, str) and isinstance(fields, str) it is expected to return a PriceSeries object - isinstance(tickers, str) and isinstance(fields, list) it is expected to return a PricesDataframe object - otherwise it is expected to return a QFDataArray object """
[docs] def get_price(self, tickers: Union[Ticker, Sequence[Ticker]], fields: Union[PriceField, Sequence[PriceField]], start_date: datetime, end_date: datetime = None, frequency: Frequency = Frequency.DAILY, **kwargs) -> \ Union[None, PricesSeries, PricesDataFrame, QFDataArray]: end_date = end_date or datetime.now() end_date = end_date + RelativeDelta(second=0, microsecond=0) start_date = self._adjust_start_date(start_date, frequency) got_single_date = self._got_single_date(start_date, end_date, frequency) tickers, got_single_ticker = convert_to_list(tickers, Ticker) fields, got_single_field = convert_to_list(fields, PriceField) fields_str = self._map_field_to_str(fields) container = self.get_history(tickers, fields_str, start_date, end_date, frequency, **kwargs) str_to_field_dict = self.str_to_price_field_map() # Map the specific fields onto the fields given by the str_to_field_dict if isinstance(container, QFDataArray): container = container.assign_coords(fields=[str_to_field_dict[field_str] for field_str in container.fields.values]) normalized_result = normalize_data_array( container, tickers, fields, got_single_date, got_single_ticker, got_single_field, use_prices_types=True ) else: normalized_result = PricesDataFrame(container).rename(columns=str_to_field_dict) if got_single_field: normalized_result = normalized_result.squeeze(axis=1) if got_single_ticker: normalized_result = normalized_result.squeeze(axis=0) return normalized_result
[docs] @abstractmethod def price_field_to_str_map(self) -> Dict[PriceField, str]: """ Method has to be implemented in each data provider in order to be able to use get_price. Returns dictionary containing mapping between PriceField and corresponding string that has to be used by get_history method to get appropriate type of price series. Returns ------- Dict[PriceField, str] mapping between PriceFields and corresponding strings """ raise NotImplementedError()
[docs] @abstractmethod def expiration_date_field_str_map(self, ticker: Ticker = None) -> Dict[ExpirationDateField, str]: """ Method has to be implemented in each data provider in order to be able to use get_futures_chain_tickers. Returns dictionary containing mapping between ExpirationDateField and corresponding string that has to be used by get_futures_chain_tickers method. Parameters ----------- ticker: None, Ticker ticker is optional and might be uses by particular data providers to create appropriate dictionary Returns ------- Dict[ExpirationDateField, str] mapping between ExpirationDateField and corresponding strings """ pass
[docs] def get_futures_chain_tickers(self, tickers: Union[FutureTicker, Sequence[FutureTicker]], expiration_date_fields: Union[ExpirationDateField, Sequence[ExpirationDateField]]) \ -> Dict[FutureTicker, QFDataFrame]: expiration_date_fields, got_single_expiration_date_field = convert_to_list(expiration_date_fields, ExpirationDateField) mapping_dict = self.expiration_date_field_str_map() expiration_date_fields_str = [mapping_dict[field] for field in expiration_date_fields] exp_dates_dict = self._get_futures_chain_dict(tickers, expiration_date_fields_str) for future_ticker, exp_dates in exp_dates_dict.items(): exp_dates = exp_dates.rename(columns=self.str_to_expiration_date_field_map()) for ticker in exp_dates.index: ticker.security_type = future_ticker.security_type ticker.point_value = future_ticker.point_value ticker.set_name(future_ticker.name) if got_single_expiration_date_field: exp_dates = exp_dates.squeeze() exp_dates_dict[future_ticker] = exp_dates return exp_dates_dict
@abstractmethod def _get_futures_chain_dict(self, tickers: Union[FutureTicker, Sequence[FutureTicker]], expiration_date_fields: Union[str, Sequence[str]]) -> Dict[FutureTicker, QFDataFrame]: """ Returns a dictionary, which maps Tickers to QFSeries, consisting of the expiration dates of Future Contracts: Dict[FutureTicker, Union[QFSeries, QFDataFrame]]]. Parameters ---------- tickers: Ticker, Sequence[Ticker] tickers for securities which should be retrieved expiration_date_fields: str, Sequence[str] expiration date fields of securities which should be retrieved. Specific for each data provider, the mapping between strings and corresponding ExpirationDateField enum values should be implemented as str_to_expiration_date_field_map function. """ pass
[docs] def str_to_expiration_date_field_map(self, ticker: Ticker = None) -> Dict[str, ExpirationDateField]: """ Inverse of str_to_expiration_date_field_map. """ field_str_dict = self.expiration_date_field_str_map(ticker) inv_dict = {v: k for k, v in field_str_dict.items()} return inv_dict
[docs] def str_to_price_field_map(self) -> Dict[str, PriceField]: """ Inverse of price_field_to_str_map. """ field_str_dict = self.price_field_to_str_map() inv_dict = {v: k for k, v in field_str_dict.items()} return inv_dict
def _map_field_to_str( self, fields: Union[None, PriceField, Sequence[PriceField]]) \ -> Union[None, str, Sequence[str]]: """ The method maps enum to sting that is recognised by the specific database. Parameters ---------- fields fields of securities which should be retrieved Returns ------- str, Sequence[str] String representation of the field or fields that corresponds the API provided by a specific data provider Depending on the input it returns: None -> None PriceField -> str Sequence[PriceField] -> List[str] """ if fields is None: return None field_str_dict = self.price_field_to_str_map() if self._is_single_price_field(fields): if fields in field_str_dict: return field_str_dict[fields] else: raise LookupError("Field {} is not recognised by the data provider. Available Fields: {}" .format(fields, list(field_str_dict.keys()))) result = [] for field in fields: if field in field_str_dict: result.append(field_str_dict[field]) else: raise LookupError("Field {} is not recognised by the data provider. Available Fields: {}" .format(fields, list(field_str_dict.keys()))) return result @staticmethod def _is_single_price_field(fields: Union[None, PriceField, Sequence[PriceField]]): return fields is not None and isinstance(fields, PriceField) @staticmethod def _is_single_ticker(value): if isinstance(value, Ticker): return True return False def _get_first_ticker(self, tickers): if self._is_single_ticker(tickers): ticker = tickers else: ticker = tickers[0] return ticker