Source code for qf_lib.data_providers.helpers

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.
import warnings
from datetime import datetime
from typing import Union, Dict, Sequence, Any
import pandas as pd
from pandas import DatetimeIndex
from xarray import DataArray

from qf_lib.common.tickers.tickers import Ticker
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.cast_dataframe import cast_dataframe
from qf_lib.containers.dataframe.prices_dataframe import PricesDataFrame
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.dimension_names import DATES, TICKERS, FIELDS
from qf_lib.containers.futures.future_tickers.future_ticker import FutureTicker
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.cast_series import cast_series
from qf_lib.containers.series.prices_series import PricesSeries
from qf_lib.containers.series.qf_series import QFSeries

[docs]def normalize_data_array( data_array, tickers, fields, got_single_date, got_single_ticker, got_single_field, use_prices_types=False) \ -> Union[QFSeries, QFDataFrame, QFDataArray, PricesSeries, PricesDataFrame]: """ Post-processes the result of some DataProviders so that it satisfies the format of a result expected from DataProviders. Expected format rules should cover the following: - proper return type (QFSeries/PricesSeries, QFDataFrame/PricesDataFrame, QFDataArray), - proper shape of the result (squeezed dimensions for which a single non-list value was provided, e.g. "OPEN"), - dimensions: TICKERS and FIELDS contain all required labels and the labels are in required order. Parameters ---------- data_array data_array to be normalized tickers list of tickers requested by the caller fields list of fields requested by the caller got_single_date True if a single (scalar value) date was requested (start_date==end_date); False otherwise got_single_ticker True if a single (scalar value) ticker was requested (e.g. "MSFT US Equity"); False otherwise got_single_field True if a single (scalar value) field was requested (e.g. "OPEN"); False otherwise use_prices_types if True then proper return types are: PricesSeries, PricesDataFrame or QFDataArray; otherwise return types are: QFSeries, QFDataFrame or QFDataArray Returns -------- QFSeries, QFDataFrame, QFDataArray, PricesSeries, PricesDataFrame """ # to keep the order of tickers and fields we reindex the data_array if data_array.tickers.values.tolist() != tickers: data_array = data_array.reindex(tickers=tickers) if data_array.fields.values.tolist() != fields: data_array = data_array.reindex(fields=fields) data_array = data_array.dropna(DATES, how='all') squeezed_and_casted_result = squeeze_data_array_and_cast_to_proper_type(data_array, got_single_date, got_single_ticker, got_single_field, use_prices_types) return squeezed_and_casted_result
def squeeze_data_array_and_cast_to_proper_type(original_data_array: QFDataArray, got_single_date: bool, got_single_ticker: bool, got_single_field: bool, use_prices_types: bool): if isinstance(original_data_array, DataArray) and not isinstance(original_data_array, QFDataArray): warnings.warn("data_array to be normalized should be a QFDataFrame instance. " "Transforming data_array to QFDataArray. Please check types in the future.") original_data_array = QFDataArray.from_xr_data_array(original_data_array) dimensions_to_squeeze = [] if got_single_date: dimensions_to_squeeze.append(DATES) if got_single_ticker: dimensions_to_squeeze.append(TICKERS) if got_single_field: dimensions_to_squeeze.append(FIELDS) container = original_data_array if dimensions_to_squeeze: if original_data_array.size == 0: # empty container = QFDataFrame(index=original_data_array[TICKERS].values, columns=original_data_array[FIELDS].values) = TICKERS = FIELDS if use_prices_types: container = PricesDataFrame(container) if got_single_field: container = container.squeeze(axis=1) = original_data_array[FIELDS].values[0] if got_single_ticker: container = container.squeeze(axis=0) if not got_single_date: dates = DatetimeIndex([], name=DATES) if got_single_ticker and got_single_field: container = QFSeries(index=DatetimeIndex([], name=DATES)) if use_prices_types: container = PricesSeries(container) if not got_single_ticker or not got_single_field: container = container.to_frame().T.reindex(dates) = DATES else: container = original_data_array.squeeze(dimensions_to_squeeze) if len(dimensions_to_squeeze) < 3: if got_single_ticker: ticker = original_data_array.tickers[0].item() = ticker.as_string() elif got_single_field: = original_data_array.fields[0].item() if isinstance(container, QFDataArray): container = cast_data_array_to_proper_type(container, use_prices_types) return container def cast_data_array_to_proper_type(result: QFDataArray, use_prices_types=False): if use_prices_types: series_type = PricesSeries data_frame_type = PricesDataFrame else: series_type = QFSeries data_frame_type = QFDataFrame num_of_dimensions = len(result.shape) if num_of_dimensions == 0: casted_result = result.item() elif num_of_dimensions == 1: casted_result = cast_series(result.to_pandas(), series_type) = elif num_of_dimensions == 2: casted_result = cast_dataframe(result.to_pandas(), data_frame_type) else: casted_result = result return casted_result def cast_dataframe_to_proper_type(result): num_of_dimensions = len(result.axes) if num_of_dimensions == 1: casted_result = cast_series(result, QFSeries) elif num_of_dimensions == 2: casted_result = cast_dataframe(result, QFDataFrame) else: casted_result = result return casted_result
[docs]def tickers_dict_to_data_array(tickers_data_dict: Dict[Ticker, QFDataFrame], requested_tickers: Union[Ticker, Sequence[Ticker]], requested_fields: Union[Any, Sequence[Any]]) -> QFDataArray: """ Converts a dictionary mapping tickers to DateFrame onto a QFDataArray, by applying a filter on the tickers and fields that are needed Parameters ---------- tickers_data_dict: Dict[Ticker, QFDataFrame] Ticker -> QFDataFrame[dates, fields] requested_tickers: Sequence[Ticker] Filter the data dict based on a list of tickers requested_fields Filter the data dict based on a list of fields Returns ------- QFDataArray """ # return empty xr.DataArray if there is no data to be converted requested_tickers, _ = convert_to_list(requested_tickers, Ticker) if not isinstance(requested_fields, Sequence) or isinstance(requested_fields, str): requested_fields, _ = convert_to_list(requested_fields, type(requested_fields)) if not tickers_data_dict: return QFDataArray.create(dates=[], tickers=requested_tickers, fields=requested_fields) tickers = [] data_arrays = [] for ticker, df in tickers_data_dict.items(): = DATES if df.empty: # if there is no data for a given ticker, skip it (proper column will be added afterwards anyway) continue data_array = df.to_xarray() data_array = data_array.to_array(dim=FIELDS, name=ticker) data_array = data_array.transpose(DATES, FIELDS) tickers.append(ticker) data_arrays.append(data_array) if not data_arrays: return QFDataArray.create(dates=[], tickers=requested_tickers, fields=requested_fields) tickers_index = pd.Index(tickers, name=TICKERS) result = QFDataArray.concat(data_arrays, dim=tickers_index) result = result.reindex(tickers=requested_tickers, fields=requested_fields) # the DataArray gets a name after the first ticker in the tickers_data_dict.keys() which is incorrect; # it should have no name = None return result
def get_fields_from_tickers_data_dict(tickers_data_dict): fields = set() for dates_fields_df in tickers_data_dict.values(): fields.update(dates_fields_df.columns.values) fields = list(fields) return fields
[docs]def chain_tickers_within_range(future_ticker: FutureTicker, exp_dates: QFDataFrame, start_date: datetime, end_date: datetime): """ Returns only these tickers belonging to the chain of a given FutureTicker, which were valid only for the given time frame. As it is possible to select the contracts to be traded for a given future ticker (e.g. for Bloomberg future tickers we could specify only to trade "M" contracts), the end date is computed as the original end date + 1 year x contract number to trade. E.g. if we specify that we only want to trade "M" contracts and we always want to trade the front M contract, we add 1 year x 1. If instead of the front M, we would like to trade the second next M contract, we add 2 years to the end date etc. """ exp_dates = exp_dates[exp_dates >= start_date].dropna() exp_dates = exp_dates[exp_dates <= end_date + RelativeDelta(years=future_ticker.N)].dropna() return exp_dates.index.tolist()