Source code for qf_lib.common.utils.factorization.factors_identification.elastic_net_factors_identifier

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

import numpy as np
import pandas as pd
from sklearn.linear_model import ElasticNet
from sklearn.linear_model import ElasticNetCV

from qf_lib.common.utils.factorization.factors_identification.factors_identifier import FactorsIdentifier
from qf_lib.common.utils.logging.qf_parent_logger import qf_logger
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.plotting.charts.line_chart import LineChart
from qf_lib.plotting.decorators.data_element_decorator import DataElementDecorator
from qf_lib.plotting.decorators.line_decorators import VerticalLineDecorator
from qf_lib.plotting.decorators.title_decorator import TitleDecorator


[docs]class ElasticNetFactorsIdentifier(FactorsIdentifier): """ Class used for identifying factors in the model with Elastic Net method (with Cross-validation). Parameters ---------- max_number_of_regressors maximal number of regressors that can be included in the model epsilon if abs(coefficient) is smaller than epsilon it is considered to be zero, thus won't be included in the model l1_ratio value between [0,1] the higher the simpler and more sensitive model is to collinear factors number_of_alphas number of different lambda values tested is_intercept True if intercept should be included in the model, False otherwise graphic_debug default False; If True, then some additional debug info will be plotted (used when tuning the ElasticNetFactorsIdentifier's parameters) """ NUMBER_OF_FOLDS = 10 """ number of folds in the k-fold cross-validation. """ MIN_NUM_OF_1SE_REGRESSORS = 2 """ minimal number of regressors taken for the alpha_1se (max. alpha for which the MSE is within 1 std. from the min. MSE). If number of regressors is smaller, then coefficients for min. MSE are taken. """ def __init__(self, max_number_of_regressors: int = 10, epsilon: float = 0.05, l1_ratio: float = 1, number_of_alphas: int = 75, is_intercept: bool = True, graphic_debug: bool = False): self.logger = qf_logger.getChild(self.__class__.__name__) self.max_number_of_regressors = max_number_of_regressors self.epsilon = epsilon self.l1_ratio = l1_ratio self.number_of_alphas = number_of_alphas self.is_intercept = is_intercept self.graphic_debug = graphic_debug self.coeffs_chart = None self.mse_chart = None
[docs] def select_best_factors(self, regressors_df: QFDataFrame, analysed_tms: QFSeries) -> QFDataFrame: """ Returns the dataframe which is the subset of the original regressors_df but only contains rows for dates common for it and analysed_tms and only contains columns for coefficients which should be included in the model. Factors are identified using Elastic Net method with Cross-Validation (for calculating the MSE). Parameters ---------- regressors_df dataframe containing data for regressors (e.g. daily log-returns) analysed_tms timeseries of analysed data (data which should be modeled with regressors, e.g. daily log-returns) Returns ------- QFDataFrame Subset of the original regressors_df. Only contains rows corresponding to dates common for it and analysed_tms. Only contains columns corresponding to coefficients which should be included in the model """ self.logger.debug("Model selection using Elastic Net in progress...") alphas, index_1se, enet_optimizer = self._optimize_using_enet(analysed_tms, regressors_df) # vector of vectors of coefficients coeffs_path = self._get_coeffs_path(alphas, analysed_tms, regressors_df) # by default take the solution for 1SE from the best fit coefficients_vector = coeffs_path[index_1se] mean_square_errors = np.mean(enet_optimizer.mse_path_, axis=1) solutions_idx = index_1se index_min_se = np.argmin(mean_square_errors) # type: int num_of_nonzero_coeffs = self._number_of_non_zero_coefficients(coefficients_vector) if num_of_nonzero_coeffs <= self.MIN_NUM_OF_1SE_REGRESSORS: solutions_idx = index_min_se coefficients_vector = coeffs_path[solutions_idx] if self._number_of_non_zero_coefficients(coefficients_vector) <= self.max_number_of_regressors: self.logger.info("Solution within one std from minimum MSE found") else: self.logger.warning('No solution within one std from minimum MSE') coefficients_vector, solutions_idx = self._simplify_model(coeffs_path, index_1se) included_coefficients_idx = self._indices_of_non_zero_coefficients(coefficients_vector) selected_regressors_df = regressors_df.iloc[:, np.sort(included_coefficients_idx)] if self.graphic_debug: self.coeffs_chart, self.mse_chart = self._plot_graphic_debug_info( alphas, coeffs_path, mean_square_errors, solutions_idx, index_min_se) return selected_regressors_df
def _simplify_model(self, coeffs_path, index_1se): # find fit with fewer regressors than MAX_NUMBER_OF_REGRESSORS solutions_idx = index_1se - 1 coefficients_vector = coeffs_path[solutions_idx] while self._number_of_non_zero_coefficients(coefficients_vector) > self.max_number_of_regressors \ and solutions_idx != 0: solutions_idx -= 1 coefficients_vector = coeffs_path[solutions_idx] return coefficients_vector, solutions_idx def _optimize_using_enet(self, analysed_tms, regressors_df): enet_optimizer = ElasticNetCV(l1_ratio=self.l1_ratio, n_alphas=self.number_of_alphas, cv=self.NUMBER_OF_FOLDS, fit_intercept=self.is_intercept) enet_optimizer.fit(regressors_df, analysed_tms) self.logger.debug('Finished Elastic Net analysis') alphas = enet_optimizer.alphas_ mean_square_errors = np.mean(enet_optimizer.mse_path_, axis=1) index_1se = self._get_arg_alpha_1se(alphas, mean_square_errors) return alphas, index_1se, enet_optimizer def _plot_graphic_debug_info(self, alphas, coeffs_path, mean_square_errors, solutions_idx, index_min_se): min_se_solution = alphas[index_min_se] # alpha for a solution with minimal mean error chosen_solution = alphas[solutions_idx] # alpha for chosen solution # Elastic Net (using Cross Validation) coeffs_chart = self._create_coeffs_debug_chart(alphas, chosen_solution, coeffs_path, min_se_solution) # Cross-validated avg. MSE of Elastic Net fit mse_chart = self._create_mse_debug_chart(alphas, chosen_solution, mean_square_errors, min_se_solution) return coeffs_chart, mse_chart def _create_coeffs_debug_chart(self, alphas, chosen_solution, coeffs_path, min_se_solution): coeffs_chart = LineChart() coefficients_paths = QFDataFrame(data=coeffs_path, index=pd.Index(alphas)) for _, path in coefficients_paths.items(): coeffs_chart.add_decorator(DataElementDecorator(path)) coeffs_chart.add_decorator(TitleDecorator("Elastic Net (using Cross Validation)")) coeffs_chart.add_decorator(VerticalLineDecorator(x=min_se_solution, linestyle='-.')) coeffs_chart.add_decorator(VerticalLineDecorator(x=chosen_solution, linestyle='-')) coeffs_chart.plot() coeffs_chart.axes.invert_xaxis() return coeffs_chart def _create_mse_debug_chart(self, alphas, chosen_solution, mean_square_errors, min_se_solution): mse_chart = LineChart() mean_square_errors_paths = QFDataFrame(data=mean_square_errors, index=pd.Index(alphas)) mse_chart.add_decorator(TitleDecorator("Cross-validated avg. MSE of Elastic Net fit")) for _, path in mean_square_errors_paths.items(): mse_chart.add_decorator(DataElementDecorator(path)) mse_chart.add_decorator(VerticalLineDecorator(x=min_se_solution, linestyle='-.')) mse_chart.add_decorator(VerticalLineDecorator(x=chosen_solution, linestyle='-')) mse_chart.plot() mse_chart.axes.invert_xaxis() return mse_chart def _get_coeffs_path(self, alphas, common_analysed_tms, common_regressors_df): coeffs_path = [] for alpha in alphas: model = ElasticNet(alpha=alpha, l1_ratio=self.l1_ratio) model.fit(common_regressors_df, common_analysed_tms) coeffs_path.append(model.coef_) return coeffs_path def _get_arg_alpha_1se(self, alphas, mean_square_errors): mse_std = np.std(mean_square_errors) min_mse = np.min(mean_square_errors) max_mse_limit = min_mse + mse_std # find largest alpha lower then the given limit alphas_within_the_limit = alphas[mean_square_errors < max_mse_limit] alpha_1se = np.max(alphas_within_the_limit) # np.argmax finds the index of first True occurrence in bool array (in this case) index_of_alpha_1se = np.argmax(alphas == alpha_1se) # type: int return index_of_alpha_1se def _number_of_non_zero_coefficients(self, coefficients_vector): coefficients_greater_than_epsilon = np.abs(coefficients_vector) > self.epsilon # type: np.ndarray return np.sum(coefficients_greater_than_epsilon) def _indices_of_non_zero_coefficients(self, coefficients_vector): coefficients_greater_than_epsilon = np.abs(coefficients_vector) > self.epsilon # type: np.ndarray return np.flatnonzero(coefficients_greater_than_epsilon) # gives the indices of all True values