Source code for qf_lib.data_providers.alpaca_py.alpaca_data_provider

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.
import itertools
import warnings
from datetime import datetime
from typing import Set, Type, Union, Sequence, Dict, Optional, List

from pandas import MultiIndex, concat, DataFrame

from qf_lib.common.enums.frequency import Frequency
from qf_lib.common.enums.price_field import PriceField
from qf_lib.common.enums.security_type import SecurityType
from qf_lib.common.tickers.tickers import Ticker, AlpacaTicker
from qf_lib.common.utils.dateutils.relative_delta import RelativeDelta
from qf_lib.common.utils.dateutils.timer import Timer
from qf_lib.common.utils.miscellaneous.to_list_conversion import convert_to_list
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.qf_data_array import QFDataArray
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.data_providers.abstract_price_data_provider import AbstractPriceDataProvider
from qf_lib.data_providers.helpers import normalize_data_array
from qf_lib.data_providers.alpaca_py.utilities import AlpacaDatesException

try:
    from alpaca.data import StockHistoricalDataClient, StockBarsRequest, TimeFrame, CryptoHistoricalDataClient, \
        CryptoBarsRequest

    is_alpaca_installed = True
except ImportError:
    is_alpaca_installed = False


[docs]class AlpacaDataProvider(AbstractPriceDataProvider): _security_type_to_request = {} _security_type_to_function = { SecurityType.STOCK: 'get_stock_bars', SecurityType.CRYPTO: 'get_crypto_bars', } security_type_to_client = {} def __init__(self, timer: Optional[Timer] = None, api_key: Optional[str] = None, secret_key: Optional[str] = None, oauth_token: Optional[str] = None, use_basic_auth: bool = False): """ Data provider using alpaca-py library to provide historical data for stocks and cryptocurrencies. Crypto data does not require authentication. Providing API keys will increase tbe rate limit. Parameters ----------- timer: Timer Might be either SettableTimer or RealTimer depending on the use case. If no parameter is passed, a default RealTimer object will be used. api_key: str Alpaca API key. Defaults to None. secret_key: str Alpaca API secret key. Defaults to None. oauth_token: str The oauth token if authenticating via OAuth. Defaults to None. use_basic_auth: bool If true, API requests will use basic authorization headers. """ super().__init__(timer) if not is_alpaca_installed: warnings.warn(f"alpaca-py ist not installed. If you would like to use {self.__class__.__name__} first" f" install the alpaca-py library.") exit(1) params = { "api_key": api_key, "secret_key": secret_key, "oauth_token": oauth_token, "use_basic_auth": use_basic_auth } self._security_type_to_request = { SecurityType.STOCK: StockBarsRequest, SecurityType.CRYPTO: CryptoBarsRequest } self.security_type_to_client[SecurityType.CRYPTO] = CryptoHistoricalDataClient(**params) try: self.security_type_to_client[SecurityType.STOCK] = StockHistoricalDataClient(**params) except Exception as e: warnings.warn(f"Stock Historical data will be unavailable due to the following error: {e}")
[docs] def price_field_to_str_map(self, *args) -> Dict[PriceField, str]: return { PriceField.Open: 'open', PriceField.High: 'high', PriceField.Low: 'low', PriceField.Close: 'close', PriceField.Volume: 'volume' }
[docs] def get_history(self, tickers: Union[Ticker, Sequence[Ticker]], fields: Union[None, str, Sequence[str]], start_date: datetime, end_date: datetime = None, frequency: Frequency = None, look_ahead_bias: bool = False, **kwargs) -> Union[QFSeries, QFDataFrame, QFDataArray]: """ Gets historical attributes (fields) of different securities (tickers). Parameters ---------- tickers: YFinanceTicker, Sequence[YFinanceTicker] tickers for securities which should be retrieved fields: None, str, Sequence[str] fields of securities which should be retrieved. start_date: datetime date representing the beginning of historical period from which data should be retrieved end_date: datetime date representing the end of historical period from which data should be retrieved; if no end_date was provided, by default the current date will be used. frequency: Frequency frequency of the data. This data provider supports Monthly, Weekly, Daily frequencies along with intraday frequencies at the following intervals: 60 and 1 minute. It is important to highlight that in order to match the behaviour of other data providers, in case of intraday frequency, the end_date bar is not included in the output. look_ahead_bias: bool if set to False, the look-ahead bias will be taken care of to make sure no future data is returned Returns ------- QFSeries, QFDataFrame, QFDataArray, float, str If possible the result will be squeezed, so that instead of returning a QFDataArray, data of lower dimensionality will be returned. The results will be either a QFDataArray (with 3 dimensions: date, ticker, field), a QFDataFrame (with 2 dimensions: date, ticker or field; it is also possible to get 2 dimensions ticker and field if single date was provided), a QFSeries (with 1 dimensions: date) or a float / str (in case if a single ticker, field and date were provided). If no data is available in the database or a non existing ticker was provided an empty structure (nan, QFSeries, QFDataFrame or QFDataArray) will be returned. """ frequency = frequency or self.frequency or Frequency.DAILY original_end_date = (end_date or self.timer.now()) + RelativeDelta(second=0, microsecond=0) end_date = original_end_date + RelativeDelta(hour=23, minute=59) if frequency <= Frequency.DAILY \ else original_end_date - frequency.time_delta() end_date = end_date if look_ahead_bias else self.get_end_date_without_look_ahead(end_date, frequency) start_date = self._adjust_start_date(start_date, frequency) got_single_date = self._got_single_date(start_date, original_end_date, frequency) tickers, got_single_ticker = convert_to_list(tickers, AlpacaTicker) fields, got_single_field = convert_to_list(fields, (PriceField, str)) _dfs = [] _tickers = sorted(tickers, key=lambda t: t.security_type) for sec_type, tickers_group in itertools.groupby(_tickers, lambda t: t.security_type): _dfs.append(self._request_data(sec_type, tickers_group, fields, start_date, end_date, frequency)) df = concat(_dfs, axis=1, ignore_index=False) df = df.reindex(columns=MultiIndex.from_product([[t.as_string() for t in tickers], fields], )) data_array = QFDataArray.create(df.index, tickers, fields, df.values.reshape(len(df.index), len(tickers), len(fields))) return normalize_data_array( data_array, tickers, fields, got_single_date, got_single_ticker, got_single_field, use_prices_types=False )
[docs] def supported_ticker_types(self) -> Set[Type[Ticker]]: return {AlpacaTicker}
@staticmethod def _frequency_to_timeframe(freq: Frequency): frequencies_mapping = { Frequency.MIN_1: TimeFrame.Minute, Frequency.MIN_60: TimeFrame.Hour, Frequency.DAILY: TimeFrame.Day, Frequency.WEEKLY: TimeFrame.Week, Frequency.MONTHLY: TimeFrame.Month, } try: return frequencies_mapping[freq] except KeyError: raise ValueError(f"Frequency must be one of the supported frequencies: {frequencies_mapping.keys()}.") \ from None def _request_data(self, sec_type: SecurityType, tickers: Sequence[AlpacaTicker], fields: List[str], start_date: datetime, end_date: datetime, frequency: Frequency): # Sort tickers based on the SecurityType tickers_str = [t.as_string() for t in tickers] try: # In case of intraday data, Alpaca returns a single bar when start date equals to end date. In order to # match the behaviour of other data providers if frequency > Frequency.DAILY and start_date > end_date: raise AlpacaDatesException() client = self.security_type_to_client[sec_type] request = self._security_type_to_request[sec_type] function = self._security_type_to_function[sec_type] df = getattr(client, function)(request( symbol_or_symbols=tickers_str, # check if duplicates should be removed timeframe=self._frequency_to_timeframe(frequency), start=start_date, end=end_date )).df.reindex(columns=fields) df = df.unstack(level=0) df.columns = df.columns.swaplevel(0, 1) if not df.empty: df.index = df.index.tz_convert(None).values if frequency > Frequency.DAILY else \ [d + RelativeDelta(hour=0, minute=0, second=0, microsecond=0) for d in df.index.tz_convert(None)] except (KeyError, AlpacaDatesException): df = DataFrame([], columns=fields) except Exception as e: self.logger.error(f"No data could be returned for the given parameters due to the following exception: {e}") df = DataFrame([], columns=fields) return df