Source code for qf_lib.containers.dataframe.qf_dataframe

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

from datetime import datetime
from typing import Sized, Sequence, Callable, Union, Mapping, Dict

import numpy as np
import pandas as pd

from qf_lib.common.enums.frequency import Frequency
from qf_lib.containers.dataframe.cast_dataframe import cast_dataframe
from qf_lib.containers.series.cast_series import cast_series
from qf_lib.containers.time_indexed_container import TimeIndexedContainer


class QFDataFrame(pd.DataFrame, TimeIndexedContainer):
    """
    Base class for all data frames (2-D matrix-like objects) used in the project.
    All the columns within the dataframe contain values for the same date range and have the same frequencies.
    All the columns are of the same types (e.g. log-returns/prices).
    """

    @property
    def _constructor_sliced(self):
        # Slicing a QFDataFrame (e.g. selecting a single column) must yield a QFSeries,
        # not a plain pandas Series. Imported lazily to avoid a circular import.
        from qf_lib.containers.series.qf_series import QFSeries
        return QFSeries

    @property
    def _constructor(self):
        # pandas uses this hook to preserve the subclass through dataframe operations.
        return QFDataFrame

    @property
    def num_of_columns(self) -> int:
        """Number of columns in the dataframe."""
        return len(self.columns)

    @property
    def num_of_rows(self) -> int:
        """Number of rows (dates) in the dataframe."""
        return len(self.index)
[docs] def to_log_returns(self) -> "LogReturnsDataFrame": """ Converts dataframe to the dataframe of logarithmic returns. First date of prices in the returns dataframe won't be present. Returns ------- LogReturnsDataFrame dataframe of log returns """ from qf_lib.containers.dataframe.log_returns_dataframe import LogReturnsDataFrame series_type = self._constructor_sliced dataframe = self.apply(series_type.to_log_returns, axis=0) dataframe = cast_dataframe(dataframe, LogReturnsDataFrame) return dataframe
[docs] def to_simple_returns(self) -> "SimpleReturnsDataFrame": """ Converts dataframe to the dataframe of simple returns. First date of prices in the returns timeseries won't be present. Returns ------- SimpleReturnsDataFrame dataframe of simple returns """ from qf_lib.containers.dataframe.simple_returns_dataframe import SimpleReturnsDataFrame series_type = self._constructor_sliced dataframe = self.apply(series_type.to_simple_returns, axis=0) dataframe = cast_dataframe(dataframe, SimpleReturnsDataFrame) return dataframe
[docs] def to_prices(self, initial_prices: Sequence[float] = None, suggested_initial_date: Union[datetime, int, float] = None, frequency: Frequency = None) -> "PricesDataFrame": """ Converts a dataframe to the dataframe of prices. The dataframe of prices returned will have an extra date at the beginning (in comparison to the returns' dataframe). The difference between the extra date and the rest of the dates can be inferred from the returns' dataframe or can be calculated using the frequency passed as the optional argument. Additional date at the beginning (so called "initial date") is caused by the fact, that return for the first date of prices timeseries cannot be calculated, so it's missing. Thus, during the opposite conversion, extra date at the beginning will be added. Parameters ---------- initial_prices initial price for all timeseries. If no prices are specified, then they will be assumed to be 1. If only one value is passed (instead of a list with values for each column), then the initial price will be the same for each series contained within the dataframe. suggested_initial_date the first date or initial value for the prices series. It won't be necessarily the first date of the price series (e.g. if the method is run on the PricesDataFrame then it won't be used). frequency the frequency of the returns' timeseries. It is used to infer the initial date for the prices series. 
Returns ------- PricesDataFrame dataframe of prices """ initial_prices = self._prepare_value_per_column_list(initial_prices) initial_prices_iter = self._get_iterator_for_pandas(initial_prices) def to_prices_func(series, init_prices_iter=initial_prices_iter, suggested_init_date=suggested_initial_date, freq=frequency): initial_price = next(init_prices_iter) prices_series = series.to_prices(initial_price=initial_price, suggested_initial_date=suggested_init_date, frequency=freq) return prices_series dataframe = self.apply(to_prices_func, axis=0) from qf_lib.containers.dataframe.prices_dataframe import PricesDataFrame dataframe = cast_dataframe(dataframe, PricesDataFrame) return dataframe
[docs] def min_max_normalized(self, original_min_values: Sequence[float] = None, original_max_values: Sequence[float] = None) -> "QFDataFrame": """ Normalizes the data using min-max scaling: it maps all the data to the [0;1] range, so that 0 corresponds to the minimal value in the original series and 1 corresponds to the maximal value. It is also possible to specify values which should correspond to 0 and 1 after applying the normalization. It is useful if the same normalization parameters are used to normalize different data. Parameters ---------- original_min_values values which should correspond to 0 after applying the normalization (one value for each column) original_max_values values which should correspond to 1 after applying the normalization (one value for each column) Returns ------- QFDataFrame dataframe of normalized values """ # assert that user specified either both min and max values or none of them min_values = self._prepare_value_per_column_list(original_min_values) max_values = self._prepare_value_per_column_list(original_max_values) min_values_iter = self._get_iterator_for_pandas(min_values) max_values_iter = self._get_iterator_for_pandas(max_values) def min_max_norm_func(column, min_val_iter=min_values_iter, max_val_iter=max_values_iter): norm_column = column.min_max_normalized(next(min_val_iter), next(max_val_iter)) return norm_column norm_dataframe = self.apply(min_max_norm_func, axis=0) norm_dataframe = cast_dataframe(norm_dataframe, self._constructor) return norm_dataframe
[docs] def exponential_average(self, lambda_coeff: float = 0.94) -> "QFDataFrame": """ Calculates the exponential average of a dataframe. Parameters ---------- lambda_coeff lambda coefficient Returns ------- QFDataFrame smoothed version (exponential average) of the data frame """ lambda_coefficients = self._prepare_value_per_column_list(lambda_coeff) lambda_coefficients_iter = self._get_iterator_for_pandas(lambda_coefficients) def exponential_avg_func(column, lambda_coeff_iter=lambda_coefficients_iter): lambda_coefficient = next(lambda_coeff_iter) smoothed_column = column.exponential_average(lambda_coefficient) return smoothed_column smoothed_df = self.apply(exponential_avg_func, axis=0) smoothed_df = cast_dataframe(smoothed_df, self._constructor) return smoothed_df
[docs] def total_cumulative_return(self) -> "QFSeries": """ Calculates total cumulative return for each column. Returns ------- QFSeries Series containing total cumulative return for each column of the original DataFrame. """ series_type = self._constructor_sliced series = self.apply(series_type.total_cumulative_return, axis=0) series = cast_series(series, series_type) return series
def _prepare_value_per_column_list(self, values): if isinstance(values, Sized): self._assert_is_valid_values_list(values) result_values = values else: result_values = [values] * self.num_of_columns return result_values def _get_iterator_for_pandas(self, result_values): """ Creates iterator suitable to be used with pandas.apply function. As since pandas 1.1.0 apply and applymap on DataFrame evaluates first row/column only once there is no need to iterate over the first element twice in the generator. """ if isinstance(result_values, np.ndarray): result_values = result_values.tolist() return iter(result_values) def _assert_is_valid_values_list(self, values): num_of_values = len(values) if num_of_values != self.num_of_columns: error_msg = "Number of elements in the list must be equal to number of columns " \ "(is: {0}, should be: {1}".format(num_of_values, self.num_of_columns) raise ValueError(error_msg)
[docs] def rolling_window(self, window_size: int, func: Callable[[Union["QFSeries", np.ndarray]], float], step: int = 1, optimised: bool = False) -> "QFDataFrame": """ Looks at a number of windows of size ``window_size`` and transforms the data in those windows based on the specified ``func``. This is performed for each column inside this data frame. The window indices are stepped at a rate specified by ``step``. **Warning**: The ``other`` parameter is only present to keep consistency with QFSeries' rolling_window function, it should always be ``None``. Parameters ---------- window_size The size of the window to look at specified as the number of data points. func The function to call during each iteration. When ``other`` is ``None`` this function should take one ``QFSeries`` and return a value (Usually a number such as a ``float``). Otherwise, this function should take two ``QFSeries`` arguments and return a value. step The amount of data points to step through after each iteration, i.e. how much to move the window by in each iteration. optimised Whether the more efficient pandas algorithm should be used for the rolling window application. Note: This has some limitations: The ``step`` must be 1 and ``func`` will get an ``ndarray`` parameter which only contains values and no index. Returns ------- QFDataFrame data frame containing the transformed data """ if optimised: assert step == 1, "Optimised rolling is only possible with a step of 1." return self.rolling(window=window_size, center=False).apply(func=func) result = QFDataFrame() for col in self: transformed_data = self[col].rolling_window(window_size, func, step=step) result[col] = transformed_data return result
[docs] def rolling_time_window( self, window_length: int, step: int, func: Callable[[Union["QFDataFrame", np.ndarray]], "QFSeries"]) \ -> Union[None, "QFSeries", "QFDataFrame"]: """ Runs a given function on each rolling window in the dataframe. The content of a rolling window is also a QFDataFrame thus the funciton which should be applied should accept a QFDataFrame as an argument. The function may return either a QFSeries (then the output of rolling_time_window will be QFDataFrame) or a scalar value (then the output of rolling_time_window will be QFSeries). The rolling window is moved along the time index (rows). Parameters ---------- window_length number of rows which should be taken into rolling window step number of rows by which rolling window should be moved func function to apply on each rolling window. If it returns a QFSeries then the output of rolling_time_window() will be a QFDataFrame; if it returns a scalar value, the return value of rolling_time_window() will be a QFSeries Returns ------- None, QFSeries, QFDataFrame None (if the result of running the rolling window was empty) or QFSeries (if the function applied returned scalar value for each window) or QFDataFrame (if the function applied returned QFSeries for each window) """ results_dict = dict() # type: Dict[datetime, pd.Series] end_idx = self.num_of_rows while True: start_idx = end_idx - window_length if start_idx < 0: break patch = self.iloc[start_idx:end_idx, :] end_date = self.index[end_idx - 1] results_dict[end_date] = func(patch) end_idx -= step if not results_dict: return None first_element = next(iter(results_dict.values())) # type: "QFSeries" if isinstance(first_element, pd.Series): result = QFDataFrame.from_dict(results_dict, orient='index') result = cast_dataframe(result, QFDataFrame) else: from qf_lib.containers.series.qf_series import QFSeries dates_and_values = [(date, value) for date, value in results_dict.items()] dates, values = zip(*dates_and_values) result = 
QFSeries(index=dates, data=values) result = result.sort_index() return result
[docs] def get_frequency(self) -> Mapping[str, Frequency]: """ Attempts to infer the frequency of each column in this dataframe. The analysis uses pandas' infer_freq, as well as a heuristic to reduce the amount of ``Irregular`` results. See the implementation of the Frequency.infer_freq function for more information. """ result = {} for col in self: series = self[col] if not series.isnull().all(): # Drop NaN rows only when the series has at least one non-NaN value. # This is necessary because the series has been packed with other series which might have a higher # frequency. series = series.dropna(axis=0) result[col] = Frequency.infer_freq(series.index) return result