Source code for qf_lib.data_providers.bloomberg_dl.bloomberg_dl_data_provider

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

import hashlib
import json
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Union, Sequence, Dict, List, Optional, Tuple
from urllib.parse import urljoin

import pandas as pd
from pandas import DataFrame, read_json

from qf_lib.common.enums.expiration_date_field import ExpirationDateField
from qf_lib.common.enums.frequency import Frequency
from qf_lib.common.enums.price_field import PriceField
from qf_lib.common.enums.security_type import SecurityType
from qf_lib.common.tickers.tickers import BloombergTicker
from qf_lib.common.utils.dateutils.date_to_string import date_to_str
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.dateutils.timer import Timer
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.futures.future_tickers.bloomberg_future_ticker import BloombergFutureTicker
from qf_lib.containers.futures.future_tickers.future_ticker import FutureTicker
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.data_providers.abstract_price_data_provider import AbstractPriceDataProvider
from qf_lib.data_providers.bloomberg.exceptions import BloombergError
from qf_lib.data_providers.bloomberg_dl.utils.bloomberg_dl_parser import BloombergDLParser
from qf_lib.data_providers.bloomberg_dl.utils.bloomberg_dl_session import BloombergDLSession
from qf_lib.data_providers.bloomberg_dl.utils.sse_client import SSEClient
from qf_lib.data_providers.futures_data_provider import FuturesDataProvider
from qf_lib.data_providers.helpers import normalize_data_array, cast_dataframe_to_proper_type
from qf_lib.data_providers.tickers_universe_provider import TickersUniverseProvider
from qf_lib.settings import Settings
from qf_lib.starting_dir import get_starting_dir_abs_path


[docs] class BloombergDLDataProvider(AbstractPriceDataProvider, TickersUniverseProvider, FuturesDataProvider): """ Data Provider which provides financial data from Bloomberg Data License REST API. The settings file requires the following variables: - bbg_dl.client_id - bbg_dl.client_secret Other optional settings parameters: - bbg_dl.user (parameter to link the Data License to a Bloomberg Anywhere or Bloomberg Professional account; the User value can be obtained by running IAM <GO> in the Bloomberg terminal) - bbg_dl.sn (parameter to link the Data License to a Bloomberg Professional account; the S/N value can be obtained by running IAM <GO> in the Bloomberg terminal) Parameters ---------- settings: Settings Settings object containing bbg_dl credentials and output_directory. reply_timeout: int Maximum time in minutes to wait for an SSE delivery notification. timer: Timer, optional Timer instance used for end-date calculations. save_to_disk: bool If True, raw JSON responses are persisted under the output_directory. push_notification: bool If True (default), SSE Event Stream for DL Platform Content Endpoint Notifications is used to notify about the delivered content. If False, the endpoints are polled regularly every 30 seconds. check_existing_first: bool If True, before POSTing a new request, first polls Bloomberg to see if data for this request already exists (using a deterministic request name from the payload hash). If found, fetches it via GET; otherwise creates a new request. """ HOST = "https://api.bloomberg.com" TIMEZONE = timezone.utc _IDENTIFIER_TYPE_MAP = { "ticker": "TICKER", "cusip": "CUSIP", "buid": "BB_UNIQUE", "bbgid": "BB_GLOBAL", "isin": "ISIN", "wpk": "WPK", "sedol1": "SEDOL", "common": "COMMON_NUMBER", "cins": "CINS", "cats": "CATS", } def __init__(self, settings: Settings, reply_timeout: int = 5, timer: Optional[Timer] = None, save_to_disk: bool = False, push_notification: bool = True, check_existing_first: bool = False): super().__init__(timer=timer) self.parser = BloombergDLParser() self.reply_timeout: RelativeDelta = RelativeDelta(minutes=reply_timeout) self.save_to_disk = save_to_disk self.downloads_path = self._prepare_downloads_path(settings) if save_to_disk else None self._terminal_identity_user = self._get_settings_attribute(settings.bbg_dl, "user") self._terminal_identity_sn = self._get_settings_attribute(settings.bbg_dl, "sn") self._push_notification = push_notification self._check_existing_first = check_existing_first try: self.session = BloombergDLSession( client_id=settings.bbg_dl.client_id, client_secret=settings.bbg_dl.client_secret, ) catalog_id = self._discover_catalog_id() self.account_url = urljoin(self.HOST, f'/eap/catalogs/{catalog_id}/') self.logger.debug(f"Bloomberg DL REST API connection established - catalog: {catalog_id}") except Exception as exc: raise BloombergError(f"Failed to connect to Bloomberg DL REST API: {exc}") from exc @staticmethod def _prepare_downloads_path(settings: Settings): """Create and return the directory used to persist raw JSON responses.""" try: output_folder = "dl_responses" downloads_path: Optional[Path] = ( Path(get_starting_dir_abs_path()) / settings.output_directory / output_folder ) downloads_path.mkdir(parents=True, exist_ok=True) return downloads_path except Exception as e: raise BloombergError(f"Error while creating downloads path: {e}")
[docs] def get_history(self, tickers: Union[BloombergTicker, Sequence[BloombergTicker]], fields: Union[str, Sequence[str]], start_date: datetime, end_date: Optional[datetime] = None, frequency: Frequency = Frequency.DAILY, currency: Optional[str] = None, pricing_source: Optional[str] = "BGN", look_ahead_bias: bool = False, overrides: Optional[Dict[str, str]] = None, **kwargs) -> Union[QFSeries, QFDataFrame, QFDataArray]: """ Gets historical data from Bloomberg DL REST API from the (start_date - end_date) time range. Parameters ---------- tickers: BloombergTicker or Sequence[BloombergTicker] Tickers for securities which should be retrieved. fields: str or Sequence[str] Fields of securities which should be retrieved. start_date: datetime Date representing the beginning of historical period from which data should be retrieved. end_date: datetime, optional Date representing the end of historical period from which data should be retrieved; if not provided, the current date will be used. frequency: Frequency Frequency of the data. currency: str, optional Currency which should be used to obtain the historical data (by default local currency is used). pricing_source: str, optional Pricing source applied to all financial instruments in the request universe. By default equals to 'BGN'. look_ahead_bias: bool If set to False, the look-ahead bias will be taken care of to make sure no future data is returned. overrides: dict, optional A dictionary of field overrides, e.g. {'INCLUDE_EXPIRED_CONTRACTS': 'Y'}. Returns ------- QFSeries or QFDataFrame or QFDataArray If possible the result will be squeezed so that instead of returning QFDataArray, data of lower dimensionality will be returned. Raises ------ BloombergError When unexpected response from Bloomberg DL REST API occurred. """ if frequency not in self.supported_frequencies(): raise NotImplementedError(f"The provided frequency: {frequency} is not supported by the " f"{self.__class__.__name__}. To review the list of supported frequencies, please " f"consult the output of supported_frequencies() function.") original_end_date = (end_date or self.timer.now()) + RelativeDelta(second=0, microsecond=0) end_date = original_end_date if look_ahead_bias else self.get_end_date_without_look_ahead(original_end_date, frequency) start_date = self._adjust_start_date(start_date, frequency) got_single_date = self._got_single_date(start_date, original_end_date, frequency) fields, got_single_field = convert_to_list(fields, str) field_overrides = self._build_field_overrides(overrides) tickers, got_single_ticker = convert_to_list(tickers, BloombergTicker) actual_tickers = [self._current_ticker(t) for t in tickers] id_value_to_ticker = self._build_id_value_mapping(tickers, actual_tickers) tickers_str = [t.as_string() for t in actual_tickers] # Prepare the request payload request_payload = self._generate_request_payload(tickers_str, fields, True, pricing_source, field_overrides) request_payload['runtimeOptions'] = { '@type': 'HistoryRuntimeOptions', 'period': str(frequency).lower(), 'dateRange': { '@type': 'IntervalDateRange', 'startDate': date_to_str(start_date), 'endDate': date_to_str(end_date), }, } if currency: request_payload['runtimeOptions']['historyPriceCurrency'] = currency data_array = self._submit_and_download(request_payload) if data_array is None: data_array = QFDataArray.create(dates=[], tickers=tickers, fields=fields) data_array = data_array.reindex(tickers=id_value_to_ticker.index).assign_coords( tickers=id_value_to_ticker.values).dropna(how="all", dim="tickers") normalized_result = normalize_data_array(data_array, tickers, fields, got_single_date, got_single_ticker, got_single_field) return normalized_result
[docs] def get_current_values(self, tickers: Union[BloombergTicker, Sequence[BloombergTicker]], fields: Union[str, Sequence[str]], overrides: Optional[Dict[str, str]] = None, pricing_source: Optional[str] = "BGN") -> \ Union[None, float, str, list, QFSeries, QFDataFrame]: """ Gets from the Bloomberg DL REST API the current values of fields for given tickers. Parameters ---------- tickers: BloombergTicker or Sequence[BloombergTicker] Tickers for securities which should be retrieved. fields: str or Sequence[str] Fields of securities which should be retrieved. overrides: dict, optional A dictionary where each key is a field mnemonic and the value is the override string. Example: {'INCLUDE_EXPIRED_CONTRACTS': 'Y'}. pricing_source: str, optional Pricing source applied to all financial instruments in the request universe. By default equals to 'BGN'. Returns ------- float or QFSeries or QFDataFrame Either QFDataFrame with 2 dimensions: ticker, field or QFSeries with 1 dimension: ticker or field (depending on whether many tickers or fields were provided). Raises ------ BloombergError When unexpected response from Bloomberg DL REST API occurred. """ fields, got_single_field = convert_to_list(fields, str) field_overrides = self._build_field_overrides(overrides) tickers, got_single_ticker = convert_to_list(tickers, BloombergTicker) actual_tickers = [self._current_ticker(t) for t in tickers] tickers_str = [t.as_string() for t in actual_tickers] id_value_to_ticker = self._build_id_value_mapping(tickers, actual_tickers) request_payload = self._generate_request_payload(tickers_str, fields, False, pricing_source, field_overrides) data_frame = self._submit_and_download(request_payload) if data_frame is None: data_frame = QFDataFrame(columns=fields) data_frame.index = data_frame.index.map(id_value_to_ticker) data_frame = data_frame.reindex(index=tickers, columns=fields) # squeeze unused dimensions tickers_indices = 0 if got_single_ticker else slice(None) fields_indices = 0 if got_single_field else slice(None) squeezed_result = data_frame.iloc[tickers_indices, fields_indices] return cast_dataframe_to_proper_type(squeezed_result) if tickers_indices != 0 or fields_indices != 0 \ else squeezed_result
[docs] def get_tickers_universe(self, universe_ticker: BloombergTicker, date: Optional[datetime] = None, display_figi: bool = False) -> List[BloombergTicker]: """ Returns a list of all members of an index. Bloomberg Data License supports only fetching constituents for the current date and it will not return any data for indices with more than 20,000 members. Parameters ---------- universe_ticker: BloombergTicker Ticker that describes a specific universe, which members will be returned. date: datetime, optional Date for which current universe members' tickers will be returned. display_figi: bool If True, return Financial Instrument Global Identifiers (FIGI) instead of standard tickers. """ members_and_weights = self._get_index_members_and_weights(universe_ticker, date, display_figi) return members_and_weights.index.tolist()
[docs] def get_tickers_universe_with_weights(self, universe_ticker: BloombergTicker, date: Optional[datetime] = None, display_figi: bool = False) -> QFSeries: """ Returns a series of all members of an index with their weights. It will not return any data for indices with more than 20,000 members. Parameters ---------- universe_ticker: BloombergTicker Ticker that describes a specific universe, which members will be returned. date: datetime, optional Date for which current universe members' tickers will be returned. display_figi: bool If True, return Financial Instrument Global Identifiers (FIGI) instead of standard tickers. Returns ------- QFSeries A series of the weights of all BloombergTickers within the requested index, indexed by those tickers. """ return self._get_index_members_and_weights(universe_ticker, date, display_figi)
[docs] def get_unique_tickers(self, universe_ticker: BloombergTicker) -> List[BloombergTicker]: raise NotImplementedError(f"{self.__class__.__name__} does not provide historical tickers_universe data")
[docs] def price_field_to_str_map(self) -> Dict[PriceField, str]: return { PriceField.Open: 'PX_OPEN', PriceField.High: 'PX_HIGH', PriceField.Low: 'PX_LOW', PriceField.Close: 'PX_LAST', PriceField.Volume: 'PX_VOLUME' }
[docs] def expiration_date_field_str_map(self, ticker: BloombergTicker = None) -> Dict[ExpirationDateField, str]: return { ExpirationDateField.FirstNotice: "FUT_NOTICE_FIRST", ExpirationDateField.LastTradeableDate: "LAST_TRADEABLE_DT" }
[docs] def supported_ticker_types(self): return {BloombergTicker, BloombergFutureTicker}
@staticmethod def supported_frequencies(): return {Frequency.DAILY, Frequency.WEEKLY, Frequency.MONTHLY, Frequency.QUARTERLY, Frequency.YEARLY} @staticmethod def _current_ticker(t: BloombergTicker): """Resolve a FutureTicker to its current specific ticker, or return the ticker as-is.""" return t.get_current_specific_ticker() if isinstance(t, BloombergFutureTicker) else t def _generate_request_payload(self, tickers_str: Sequence[str], fields: Sequence[str], historical: bool, pricing_source: Optional[str], field_overrides): """Build the JSON payload for a HistoryRequest or DataRequest submission.""" tickers_str = sorted(tickers_str) fields = sorted(fields) contains = [self._build_identifier_dict(t, field_overrides) for t in tickers_str] contains = [c for c in contains if c is not None] if len(contains) == 0: raise ValueError("No valid tickers were provided. Please refer to the logs and adjust your data request.") request_payload = { '@type': 'HistoryRequest' if historical else 'DataRequest', 'universe': { '@type': 'Universe', 'contains': contains, }, 'fieldList': { '@type': 'HistoryFieldList' if historical else 'DataFieldList', 'contains': [{'mnemonic': f} for f in fields], }, 'trigger': {'@type': 'SubmitTrigger'}, 'formatting': { '@type': 'MediaType', 'outputMediaType': "application/json" } } if pricing_source: request_payload['pricingSourceOptions'] = { '@type': 'HistoryPricingSourceOptions' if historical else 'DataPricingSourceOptions', 'prefer': {'mnemonic': pricing_source}, } self._set_terminal_identity(request_payload) return request_payload def _get_futures_chain_dict(self, tickers: Union[BloombergFutureTicker, Sequence[BloombergFutureTicker]], expiration_date_fields: Union[str, Sequence[str]], universe_creation_time: datetime = None) -> Dict[FutureTicker, QFDataFrame]: """ Returns tickers of futures contracts, which belong to the same futures contract chain as the provided ticker, along with their expiration dates. Parameters ---------- tickers: BloombergFutureTicker or Sequence[BloombergFutureTicker] Future tickers for which future chains should be retrieved. expiration_date_fields: str or Sequence[str] Field(s) that should be downloaded as the expiration date field. universe_creation_time: datetime, optional Unused; kept for interface compatibility. Returns ------- Dict[BloombergFutureTicker, QFDataFrame] Dictionary mapping each BloombergFutureTicker to a QFDataFrame containing specific future contract tickers (BloombergTickers), indexed by those tickers. """ tickers, _ = convert_to_list(tickers, BloombergFutureTicker) expiration_date_fields, _ = convert_to_list(expiration_date_fields, str) active_ticker_string_to_future_ticker = { future_ticker.get_active_ticker(): future_ticker for future_ticker in tickers } active_bbg_tickers = list(active_ticker_string_to_future_ticker.keys()) field_overrides = {"CHAIN_DATE": datetime.now().strftime('%Y%m%d'), 'INCLUDE_EXPIRED_CONTRACTS': 'Y'} active_ticker_to_chain_tickers_list = self.get_current_values(active_bbg_tickers, "FUT_CHAIN", overrides=field_overrides).to_dict() future_ticker_to_chain_tickers_list = { active_ticker_string_to_future_ticker[active_ticker]: chain for active_ticker, chain in active_ticker_to_chain_tickers_list.items() } future_ticker_to_chain_tickers_list = { future_ticker: [BloombergTicker(s['SECURITY_DES'], future_ticker.security_type, future_ticker.point_value) for s in specific_tickers_strings_list] for future_ticker, specific_tickers_strings_list in future_ticker_to_chain_tickers_list.items() } all_spec_tickers = [ticker for specific_tickers_list in future_ticker_to_chain_tickers_list.values() for ticker in specific_tickers_list] futures_expiration_dates = self.get_current_values(all_spec_tickers, expiration_date_fields).dropna(how="all") def specific_futures_index(future_ticker) -> pd.Index: """ Returns an Index of specific tickers for the given future ticker, which appeared in the futures expiration dates dataframe / series. """ specific_tickers_list = future_ticker_to_chain_tickers_list[future_ticker] return futures_expiration_dates.index.intersection(specific_tickers_list) ticker_to_future_expiration_dates = { future_ticker: futures_expiration_dates.loc[specific_futures_index(future_ticker)] for future_ticker in tickers } return ticker_to_future_expiration_dates def _get_index_members_and_weights(self, universe_ticker: BloombergTicker, date: Optional[datetime] = None, display_figi: bool = False) -> QFSeries: """Fetch paginated index constituents and their weights as a QFSeries.""" date = date or datetime.now() if date.date() != datetime.today().date(): raise ValueError(f"{self.__class__.__name__} does not provide historical tickers universe data") field = 'INDEX_MEMBERS_WEIGHTS' MAX_PAGE_NUMBER = 7 MAX_MEMBERS_PER_PAGE = 3000 universe = [] def str_to_bbg_ticker(identifier: str, figi: bool): ticker_str = f"/bbgid/{identifier}" if figi else f"{identifier} Equity" return BloombergTicker(ticker_str, SecurityType.STOCK, 1) def cast_weight_to_float(weight: str): try: return float(weight) except (ValueError, TypeError): return float("nan") for page_no in range(1, MAX_PAGE_NUMBER + 1): ticker_data = self.get_current_values(universe_ticker, field, overrides={ "DISPLAY_ID_BB_GLOBAL_OVERRIDE": "Y" if display_figi else "N", "PAGE_NUMBER_OVERRIDE": str(page_no) }) df = QFDataFrame(ticker_data) if len(ticker_data) > 0 else QFDataFrame( columns=["TICKER_AND_EXCH_CODE", "PERCENT_WEIGHT"]) df = df.set_index("TICKER_AND_EXCH_CODE").map(cast_weight_to_float) df.index = df.index.map(lambda s: str_to_bbg_ticker(s, display_figi)) universe.append(df) if len(df) < MAX_MEMBERS_PER_PAGE: break universe_series = pd.concat(universe).iloc[:, 0] return universe_series @staticmethod def _payload_hash(payload: dict) -> str: """Deterministic hash of request payload.""" return hashlib.md5(json.dumps(payload, sort_keys=True).encode('utf-8')).hexdigest() def _try_fetch_from_bloomberg(self, request_name: str): """ Try to fetch existing data from Bloomberg by request name prefix. Fetches content/responses and fieldList. Returns parsed data if both succeed else None. """ try: poll_url = urljoin(self.account_url, "content/responses/") response = self.session.get(poll_url, params={'prefix': request_name}) response_contains = response.json()['contains'] output = next(i for i in response_contains if i.get('metadata', {}).get('DL_REQUEST_ID') and 'key' in i) request_id = output['metadata']['DL_REQUEST_ID'] fields_url = urljoin(self.account_url, f'requests/{request_id}/fieldList') fields_response = self.session.get(fields_url) field_mapping = { field['mnemonic']: field['type'] for field in fields_response.json()['contains'] } return self._extract_response(output, request_id, field_mapping) except Exception as e: self.logger.debug(f"Failed to fetch existing data for request '{request_name}': {e}") return None def _submit_and_download(self, request_payload: dict): """Submit a request to the Bloomberg DL REST API, wait for an SSE delivery notification and download the result. Returns None if the request fails or times out.""" request_name = self._payload_hash(request_payload) request_payload["name"] = request_name if self._check_existing_first: result = self._try_fetch_from_bloomberg(request_name) if result is not None: self.logger.info(f"Retrieved data from existing request '{request_name}' (no new request).") return result sse_client = None try: if self._push_notification: # Open SSE connection before submitting the request sse_url = urljoin(self.HOST, '/eap/notifications/content/responses') sse_client = SSEClient(sse_url, self.session) requests_url = urljoin(self.account_url, 'requests/') response = self.session.post(requests_url, json=request_payload, raise_on_error=False) resp_json = response.json() # Handle HTTP errors from request submission status_code = getattr(response, 'status_code', None) if status_code is not None and status_code >= 400: self._log_request_submission_error(status_code, resp_json) return None server_request_id = resp_json["request"].get("identifier") # Obtain the {field: fields_group} mapping from Bloomberg, used to infer fields' types fields_url = urljoin(self.account_url, f'requests/{server_request_id}/fieldList') fields_response = self.session.get(fields_url) field_mapping = { field['mnemonic']: field['type'] for field in fields_response.json().get('contains', []) } expiration = datetime.now(self.TIMEZONE) + self.reply_timeout if self._push_notification: result = self._wait_for_sse_delivery(sse_client, server_request_id, field_mapping, expiration) else: result = self._wait_for_polling_delivery(request_name, server_request_id, field_mapping, expiration) return result except Exception as e: self.logger.warning(f"Bloomberg DL request submission failed: {e}. No data obtained.") return None finally: # Ensure cleanup always happens, even if an exception is raised if sse_client: sse_client.disconnect() def _wait_for_sse_delivery(self, sse_client, server_request_id, field_mapping, expiration): """Wait for Bloomberg data via SSE Push Notifications.""" while datetime.now(self.TIMEZONE) < expiration: event = sse_client.read_event() if event.is_heartbeat(): self.logger.debug('Received heartbeat event, keep waiting for events') continue self.logger.info(f'Received reply delivery notification event: {event}') output = json.loads(event.data) if not self._output_matches_request(output, server_request_id): self.logger.debug("Some other delivery occurred - continue waiting") continue sse_client.disconnect() return self._extract_response(output, server_request_id, field_mapping) self.logger.warning(f"SSE response for '{server_request_id}' timed out after {self.reply_timeout}.") return None def _wait_for_polling_delivery(self, request_name, server_request_id, field_mapping, expiration): """Wait for Bloomberg data via traditional REST API polling.""" poll_url = urljoin(self.account_url, "content/responses/") while datetime.now(self.TIMEZONE) < expiration: response = self.session.get(poll_url, params={ 'prefix': request_name, 'requestIdentifier': server_request_id, }) response_contains = response.json().get('contains', []) if not response_contains: self.logger.debug('Received empty data, polling in 30 seconds again') time.sleep(30) continue output = response_contains[0] if not self._output_matches_request(output, server_request_id): self.logger.debug("Some other delivery occurred - continue waiting") time.sleep(30) continue self.logger.info('Received delivery via polling') return self._extract_response(output, server_request_id, field_mapping) self.logger.warning(f"Polling response for '{server_request_id}' timed out after {self.reply_timeout}.") return None def _extract_response(self, output: dict, server_request_id: str, field_mapping: dict): output_key = output['key'] output_url = urljoin(self.HOST, f"{self.account_url}content/responses/{output_key}") self.logger.info(f"Response ready for '{server_request_id}' - downloading {output_key}") df = self._download_and_parse(output_url, server_request_id) request_type = output.get('metadata', {}).get('DL_REQUEST_TYPE', None) if request_type == 'HistoryRequest': return self.parser.get_history(df, field_mapping) elif request_type == 'DataRequest': return self.parser.get_current_values(df, field_mapping) else: raise NotImplementedError( "Currently only HistoryRequest and DataRequest are requests types " "are supported by the Bloomberg DL Data Provider.") def _log_request_submission_error(self, status_code: int, resp_json: dict): """Log a structured warning when a request submission returns an HTTP error.""" title = resp_json.get('title', resp_json.get('error', 'Unknown Error')) description = resp_json.get('description', resp_json.get('error_description', '')) errors = resp_json.get('errors', []) msg = f"Bloomberg DL request submission failed with HTTP {status_code} ({title}): {description}" if errors: msg += f" Errors: {errors}" msg += " No response data can be obtained for this request." self.logger.warning(msg) @staticmethod def _output_matches_request(output: dict, server_request_id: str) -> bool: metadata = output.get('metadata', {}) if server_request_id == metadata.get('DL_REQUEST_ID', ''): return True return False def _download_and_parse(self, output_url: str, request_id: str) -> DataFrame: """Download the response payload from output_url and parse it into a DataFrame.""" with self.session.get(output_url, stream=True) as response: raw_bytes = response.content if self.save_to_disk and self.downloads_path is not None: file_path = self.downloads_path / f"{request_id}.json" file_path.write_bytes(raw_bytes) self.logger.debug(f"Response saved to {file_path}") return read_json(raw_bytes.decode("utf-8")) def _build_identifier_dict(self, ticker_str: str, field_overrides: Optional[List[Dict]] = None) -> Optional[Dict]: """Build an Identifier dict for the request universe, or None if the ticker cannot be parsed.""" id_type, id_value = self._parse_identifier(ticker_str) if id_type is None: return None identifier = { '@type': 'Identifier', 'identifierType': id_type, 'identifierValue': id_value, } if field_overrides: identifier['fieldOverrides'] = field_overrides return identifier @staticmethod def _build_field_overrides(overrides: Optional[Dict[str, str]]) -> Optional[List[Dict]]: if not overrides: return None return [ {'@type': 'FieldOverride', 'mnemonic': mnemonic, 'override': value} for mnemonic, value in overrides.items() ] def _parse_identifier(self, identifier: str) -> Tuple[Optional[str], Optional[str]]: identifier = f"/ticker/{identifier}" if not identifier.startswith("/") else identifier match = re.match(r"^/(\w+)/(.+)", identifier) if not match: self.logger.error( f"Detected incorrect identifier: {identifier}. It will be removed from the data request.\n" f"In order to provide an identifier, which is not a ticker, please use " f"'/id_type/identifier' format, with id_type being one of the following: " f"{list(self._IDENTIFIER_TYPE_MAP.values())}") return None, None id_type_key, id_value = match.groups() mapped = self._IDENTIFIER_TYPE_MAP.get(id_type_key.lower()) if mapped is None: self.logger.error(f"Unknown identifier type '{id_type_key}' in '{identifier}'. " f"Valid types: {list(self._IDENTIFIER_TYPE_MAP.values())}") return None, None return mapped, id_value def _build_id_value_mapping(self, tickers: Sequence[BloombergTicker], actual_tickers: Sequence[BloombergTicker]) -> QFSeries: mapping = {} for original, actual in zip(tickers, actual_tickers): ticker_str = actual.as_string() _, id_value = self._parse_identifier(ticker_str) if id_value: mapping[id_value] = original mapping[ticker_str] = original return QFSeries([d for d in mapping.values()], index=[d for d in mapping.keys()]) def _set_terminal_identity(self, payload: dict): user = self._parse_terminal_identity_user(self._terminal_identity_user) sn = self._parse_terminal_identity_sn(self._terminal_identity_sn) if sn and user is not None: payload["terminalIdentity"] = { "@type": "BlpTerminalIdentity", "userNumber": user, "serialNumber": sn[0], "workStation": sn[1], } elif user is not None: payload["terminalIdentity"] = { "@type": "BbaTerminalIdentity", "userNumber": user, } def _discover_catalog_id(self) -> str: """Discover the scheduled catalog identifier from the /eap/catalogs/ endpoint.""" catalogs_url = urljoin(self.HOST, '/eap/catalogs/') response = self.session.get(catalogs_url) catalogs = response.json().get('contains', []) for catalog in catalogs: if catalog.get('subscriptionType') == 'scheduled': return catalog['identifier'] self.logger.error('Scheduled catalog not in %r', response.json().get('contains', [])) raise BloombergError('Scheduled catalog not found') def _get_settings_attribute(self, settings_obj, attribute: str): try: return getattr(settings_obj, attribute) except AttributeError: self.logger.warning(f"No {attribute} value found in the settings file, default (empty) value will be" f" used instead.") return None @staticmethod def _parse_terminal_identity_user(raw_value): if raw_value is None: return None try: return int(raw_value) except (ValueError, TypeError): raise ValueError(f"Invalid terminal user '{raw_value}'. " f"Obtain the User value by running IAM <GO> in Bloomberg terminal.") @staticmethod def _parse_terminal_identity_sn(raw_value): if raw_value is None: return None try: return [int(i) for i in str(raw_value).split("-")] except (ValueError, TypeError): raise ValueError(f"Invalid terminal S/N '{raw_value}'. " f"Obtain the S/N value by running IAM <GO> in Bloomberg terminal.")