Source code for qf_lib.data_providers.bloomberg.bloomberg_data_provider

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.
import warnings

import pandas as pd
from datetime import datetime
from typing import Union, Sequence, Dict, List, Optional

from qf_lib.common.enums.expiration_date_field import ExpirationDateField
from qf_lib.common.enums.frequency import Frequency
from qf_lib.common.enums.price_field import PriceField
from qf_lib.common.enums.security_type import SecurityType
from qf_lib.common.tickers.tickers import BloombergTicker, Ticker
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.logging.qf_parent_logger import qf_logger
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.futures.future_tickers.bloomberg_future_ticker import BloombergFutureTicker
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.data_providers.abstract_price_data_provider import AbstractPriceDataProvider
from qf_lib.data_providers.helpers import normalize_data_array, cast_dataframe_to_proper_type
from qf_lib.data_providers.tickers_universe_provider import TickersUniverseProvider
from qf_lib.settings import Settings

try:
    import blpapi

    from qf_lib.data_providers.bloomberg.futures_data_provider import FuturesDataProvider
    from qf_lib.data_providers.bloomberg.historical_data_provider import HistoricalDataProvider
    from qf_lib.data_providers.bloomberg.reference_data_provider import ReferenceDataProvider
    from qf_lib.data_providers.bloomberg.tabular_data_provider import TabularDataProvider
    from qf_lib.data_providers.bloomberg.exceptions import BloombergError
    from qf_lib.data_providers.bloomberg.bloomberg_names import REF_DATA_SERVICE_URI
    from qf_lib.data_providers.bloomberg.helpers import convert_to_bloomberg_date

    is_blpapi_installed = True
except ImportError:
    is_blpapi_installed = False
    warnings.warn("No Bloomberg API installed. If you would like to use BloombergDataProvider first install the blpapi"
                  " library")


[docs]class BloombergDataProvider(AbstractPriceDataProvider, TickersUniverseProvider): """ Data Provider which provides financial data from Bloomberg. """ def __init__(self, settings: Settings): super().__init__() self.settings = settings self.host = settings.bloomberg.host self.port = settings.bloomberg.port self.logger = qf_logger.getChild(self.__class__.__name__) if is_blpapi_installed: session_options = blpapi.SessionOptions() session_options.setServerHost(self.host) session_options.setServerPort(self.port) session_options.setAutoRestartOnDisconnection(True) self.session = blpapi.Session(session_options) self._historical_data_provider = HistoricalDataProvider(self.session) self._reference_data_provider = ReferenceDataProvider(self.session) self._tabular_data_provider = TabularDataProvider(self.session) self._futures_data_provider = FuturesDataProvider(self.session) else: self.session = None self._historical_data_provider = None self._reference_data_provider = None self._tabular_data_provider = None self._futures_data_provider = None self.logger.warning("Couldn't import the Bloomberg API. Check if the necessary dependencies are installed.") self.connected = False
[docs] def connect(self): """ Connects to Bloomberg data service and holds a connection. Connecting might take about 10-15 seconds """ self.connected = False if not is_blpapi_installed: self.logger.error("Couldn't import the Bloomberg API. Check if the necessary dependencies are installed.") return if not self.session.start(): self.logger.error("Failed to start session with host: " + str(self.host) + " on port: " + str(self.port)) return if not self.session.openService(REF_DATA_SERVICE_URI): self.logger.error("Failed to open service: " + REF_DATA_SERVICE_URI) return self.connected = True
def _get_futures_chain_dict(self, tickers: Union[BloombergFutureTicker, Sequence[BloombergFutureTicker]], expiration_date_fields: Union[str, Sequence[str]]) -> Dict[BloombergFutureTicker, QFDataFrame]: """ Returns tickers of futures contracts, which belong to the same futures contract chain as the provided ticker (tickers), along with their expiration dates. Parameters ---------- tickers: BloombergFutureTicker, Sequence[BloombergFutureTicker] future tickers for which future chains should be retrieved expiration_date_fields: ExpirationDateField, Sequence[ExpirationDateField] field that should be downloaded as the expiration date field Returns ------- Dict[BloombergFutureTicker, QFDataFrame] Dictionary mapping each BloombergFutureTicker to a QFDataFrame, containing specific future contracts tickers (BloombergTickers), indexed by these tickers Raises ------- BloombergError When couldn't get the data from Bloomberg Service """ self._connect_if_needed() self._assert_is_connected() tickers, got_single_ticker = convert_to_list(tickers, BloombergFutureTicker) expiration_date_fields, _ = convert_to_list(expiration_date_fields, str) # Create a dictionary, which is mapping BloombergFutureTickers to lists of tickers related to specific future # contracts belonging to the chain, e.g. it will map Cotton Bloomberg future ticker into: # [BloombergTicker("CTH7 Comdty"), BloombergTicker("CTK7 Comdty"), BloombergTicker("CTN7 Comdty"), # BloombergTicker("CTV7 Comdty"), BloombergTicker("CTZ7 Comdty") ... ] future_ticker_to_chain_tickers_list: Dict[BloombergFutureTicker, List[BloombergTicker]] = \ self._futures_data_provider.get_list_of_tickers_in_the_future_chain(tickers) all_specific_tickers = [ticker for specific_tickers_list in future_ticker_to_chain_tickers_list.values() for ticker in specific_tickers_list] futures_expiration_dates = self.get_current_values(all_specific_tickers, expiration_date_fields).dropna(how="all") def specific_futures_index(future_ticker) -> pd.Index: """ Returns an Index of specific tickers for the given future ticker, which appeared in the futures expiration dates dataframe / series. """ specific_tickers_list = future_ticker_to_chain_tickers_list[future_ticker] return futures_expiration_dates.index.intersection(specific_tickers_list) ticker_to_future_expiration_dates = { future_ticker: futures_expiration_dates.loc[specific_futures_index(future_ticker)] for future_ticker in tickers } return ticker_to_future_expiration_dates
[docs] def get_current_values(self, tickers: Union[BloombergTicker, Sequence[BloombergTicker]], fields: Union[str, Sequence[str]], override_name: str = None, override_value: str = None ) -> Union[None, float, str, QFSeries, QFDataFrame]: """ Gets the current values of fields for given tickers. Parameters ---------- tickers: BloombergTicker, Sequence[BloombergTicker] tickers for securities which should be retrieved fields: str, Sequence[str] fields of securities which should be retrieved Returns ------- QFDataFrame/QFSeries Either QFDataFrame with 2 dimensions: ticker, field or QFSeries with 1 dimensions: ticker of field (depending if many tickers or fields was provided) is returned. Raises ------- BloombergError When couldn't get the data from Bloomberg Service """ self._connect_if_needed() self._assert_is_connected() tickers, got_single_ticker = convert_to_list(tickers, BloombergTicker) fields, got_single_field = convert_to_list(fields, (PriceField, str)) data_frame = self._reference_data_provider.get(tickers, fields, override_name, override_value) # to keep the order of tickers and fields we reindex the data frame data_frame = data_frame.reindex(index=tickers, columns=fields) # squeeze unused dimensions tickers_indices = 0 if got_single_ticker else slice(None) fields_indices = 0 if got_single_field else slice(None) squeezed_result = data_frame.iloc[tickers_indices, fields_indices] casted_result = cast_dataframe_to_proper_type(squeezed_result) if tickers_indices != 0 or fields_indices != 0 \ else squeezed_result return casted_result
[docs] def get_history(self, tickers: Union[BloombergTicker, Sequence[BloombergTicker]], fields: Union[str, Sequence[str]], start_date: datetime, end_date: datetime = None, frequency: Frequency = Frequency.DAILY, currency: str = None, override_name: str = None, override_value: str = None) \ -> Union[QFSeries, QFDataFrame, QFDataArray]: """ Gets historical data from Bloomberg from the (start_date - end_date) time range. In case of frequency, which is higher than daily frequency (intraday data), the data is indexed by the start_date. E.g. Time range: 8:00 - 8:01, frequency: 1 minute - indexed with the 8:00 timestamp Parameters ---------- tickers: Ticker, Sequence[Ticker] tickers for securities which should be retrieved fields: None, str, Sequence[str] fields of securities which should be retrieved. If None, all available fields will be returned (only supported by few DataProviders) start_date: datetime date representing the beginning of historical period from which data should be retrieved end_date: datetime date representing the end of historical period from which data should be retrieved; if no end_date was provided, by default the current date will be used frequency: Frequency frequency of the data currency: str override_name: str override_value: str Returns ------- QFSeries, QFDataFrame, QFDataArray If possible the result will be squeezed, so that instead of returning QFDataArray, data of lower dimensionality will be returned. The results will be either an QFDataArray (with 3 dimensions: date, ticker, field), a QFDataFrame (with 2 dimensions: date, ticker or field; it is also possible to get 2 dimensions ticker and field if single date was provided) or QFSeries (with 1 dimensions: date). If no data is available in the database or a non existing ticker was provided an empty structure (QFSeries, QFDataFrame or QFDataArray) will be returned returned. """ if fields is None: raise ValueError("Fields being None is not supported by {}".format(self.__class__.__name__)) self._connect_if_needed() self._assert_is_connected() end_date = end_date or datetime.now() end_date = end_date + RelativeDelta(second=0, microsecond=0) start_date = self._adjust_start_date(start_date, frequency) got_single_date = self._got_single_date(start_date, end_date, frequency) tickers, got_single_ticker = convert_to_list(tickers, BloombergTicker) fields, got_single_field = convert_to_list(fields, (PriceField, str)) def current_ticker(t: BloombergTicker): return t.get_current_specific_ticker() if isinstance(t, BloombergFutureTicker) else t tickers_mapping = {current_ticker(t): t for t in tickers} data_array = self._historical_data_provider.get( tickers, fields, start_date, end_date, frequency, currency, override_name, override_value) data_array = data_array.assign_coords(tickers=[tickers_mapping.get(t, t) for t in data_array.tickers.values]) normalized_result = normalize_data_array( data_array, tickers, fields, got_single_date, got_single_ticker, got_single_field) return normalized_result
[docs] def supported_ticker_types(self): return {BloombergTicker, BloombergFutureTicker}
[docs] def expiration_date_field_str_map(self, ticker: BloombergTicker = None) -> Dict[ExpirationDateField, str]: expiration_date_field_dict = { ExpirationDateField.FirstNotice: "FUT_NOTICE_FIRST", ExpirationDateField.LastTradeableDate: "LAST_TRADEABLE_DT" } return expiration_date_field_dict
[docs] def price_field_to_str_map(self) -> Dict[PriceField, str]: price_field_dict = { PriceField.Open: 'PX_OPEN', PriceField.High: 'PX_HIGH', PriceField.Low: 'PX_LOW', PriceField.Close: 'PX_LAST', PriceField.Volume: 'PX_VOLUME' } return price_field_dict
[docs] def get_tickers_universe(self, universe_ticker: BloombergTicker, date: Optional[datetime] = None) -> List[BloombergTicker]: date = date or datetime.now() field = 'INDX_MWEIGHT_HIST' ticker_data = self.get_tabular_data(universe_ticker, field, override_names="END_DT", override_values=convert_to_bloomberg_date(date)) return [BloombergTicker(fields['Index Member'] + " Equity", SecurityType.STOCK, 1) for fields in ticker_data]
[docs] def get_unique_tickers(self, universe_ticker: Ticker) -> List[Ticker]: raise ValueError("BloombergDataProvider does not provide historical tickers_universe data")
[docs] def get_tabular_data(self, ticker: BloombergTicker, field: str, override_names: Optional[Union[str, Sequence[str]]] = None, override_values: Optional[Union[str, Sequence[str]]] = None) -> List: """ Provides current tabular data from Bloomberg. Was tested on 'INDX_MEMBERS' and 'MERGERS_AND_ACQUISITIONS' requests. There is no guarantee that all other request will be handled, as returned data structures might vary. Parameters ----------- ticker: BloombergTicker ticker for security that should be retrieved field: str field of security that should be retrieved override_names: str override_values: str Returns ------- List tabular data for the given ticker and field """ if field is None: raise ValueError("Field being None is not supported by {}".format(self.__class__.__name__)) self._connect_if_needed() self._assert_is_connected() if override_names is not None: override_names, _ = convert_to_list(override_names, str) if override_values is not None: override_values, _ = convert_to_list(override_values, str) tickers, got_single_ticker = convert_to_list(ticker, BloombergTicker) fields, got_single_field = convert_to_list(field, (PriceField, str)) tickers_str = [t.as_string() for t in tickers] result = self._tabular_data_provider.get(tickers_str, fields, override_names, override_values) return result
def _connect_if_needed(self): if not self.connected: self.connect() def _assert_is_connected(self): if not self.connected: raise BloombergError("Connection to Bloomberg was not successful.")