Source code for qf_lib.documents_utils.excel.excel_importer

#     Copyright 2016-present CERN – European Organization for Nuclear Research
#
#     Licensed under the Apache License, Version 2.0 (the "License");
#     you may not use this file except in compliance with the License.
#     You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#     Unless required by applicable law or agreed to in writing, software
#     distributed under the License is distributed on an "AS IS" BASIS,
#     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#     See the License for the specific language governing permissions and
#     limitations under the License.

import datetime
import io
from itertools import islice
from os.path import exists
from typing import Union

import numpy as np
from openpyxl import load_workbook

from qf_lib.common.utils.logging.qf_parent_logger import qf_logger
from qf_lib.containers.dataframe.qf_dataframe import QFDataFrame
from qf_lib.containers.series.qf_series import QFSeries
from qf_lib.documents_utils.excel.helpers import get_bounding_box


[docs]class ExcelImporter: """ Class used for importing Series and DataFrames from the Excel files. """ def __init__(self): self.logger = qf_logger.getChild(self.__class__.__name__)
[docs] def import_cell( self, file_path: str, cell_address: str, sheet_name: str = None) -> Union[int, float, str]: """ Imports a container of given type (e.g. Series/DataFrame) from the Excel file of a given name. Parameters ---------- file_path path to the file containing the data to be imported cell_address address of the cell that you want to get (e.g. 'A1') sheet_name the name of the sheet from which the container should be imported. If no name is given, the active worksheet is used. Returns ------- container object containing the imported value """ self.logger.info("Started importing data from {}".format(file_path)) work_book = self._get_work_book(file_path) work_sheet = self._get_work_sheet(work_book, sheet_name) result = work_sheet[cell_address] work_book.close() return result.value
[docs] def import_container( self, file_path: str, starting_cell: str, ending_cell: str, container_type: type = None, sheet_name: str = None, include_index: bool = True, include_column_names: bool = False) \ -> Union[QFSeries, QFDataFrame]: """ Imports a container of given type (e.g. Series/DataFrame) from the Excel file of a given name. Parameters ---------- file_path: path to the file containing the data to be imported starting_cell top left corner of the imported container (e.g. A1) ending_cell bottom right corner of the imported container (e.g. B10) container_type type of the container to import. If none is given, then it is inferred from the bounding box (Series, if there is a single column, DataFrame for multiple columns). Other custom series and dataframe types that extend the Series and DataFrame types can also be used, this includes QFSeries and QFDataFrame. sheet_name the name of the sheet from which the container should be imported. If no name is given, the active worksheet is used. include_index if True than it is assumed that index is placed in the first column while values are starting from the 2nd column include_column_names determines whether the first row in the specified container contains the column names. Returns ------- container object containing the imported data """ self.logger.info("Started importing data from {}".format(file_path)) start_time = datetime.datetime.now() work_book = self._get_work_book(file_path) work_sheet = self._get_work_sheet(work_book, sheet_name) bounding_box = get_bounding_box(starting_cell, ending_cell) nr_of_non_index_columns = bounding_box.ending_column - bounding_box.starting_column + 1 if include_index: nr_of_non_index_columns -= 1 # one column for index, thus -1 if container_type is None: container_type = self._infer_container_type(nr_of_non_index_columns) if not self._is_correct_containers_type(container_type, nr_of_non_index_columns): raise ValueError("Incorrect container's type") container = self._load_container(work_sheet, container_type, bounding_box, include_index, include_column_names) end_time = datetime.datetime.now() execution_time = end_time - start_time self.logger.info("Ended importing data from {} after {}".format(file_path, execution_time)) work_book.close() return container.squeeze()
def _get_work_book(self, file_path): assert exists(file_path) with open(file_path, "rb") as f: in_memory_file = io.BytesIO(f.read()) work_book = load_workbook(in_memory_file, read_only=True, data_only=True) return work_book def _get_work_sheet(self, work_book, sheet_name): if sheet_name is None: work_sheet = work_book.active else: work_sheet = work_book[sheet_name] return work_sheet def _infer_container_type(self, nr_of_non_index_columns): if nr_of_non_index_columns <= 0: raise ValueError("Ending column must have higher index than starting column and if the include_index==True," "then there must be at least two columns in the bounding box") container_type = None if nr_of_non_index_columns > 1: container_type = QFDataFrame elif nr_of_non_index_columns == 1: container_type = QFSeries return container_type def _is_correct_containers_type(self, container_type, nr_of_non_index_columns): if nr_of_non_index_columns > 1: correct_container_type = issubclass(container_type, QFDataFrame) else: correct_container_type = issubclass(container_type, QFSeries) return correct_container_type def _load_container(self, work_sheet, container_type, bounding_box, include_index, include_column_names): container = None if issubclass(container_type, QFSeries): container = self._load_series(work_sheet, container_type, bounding_box, include_index, include_column_names) elif issubclass(container_type, QFDataFrame): container = self._load_dataframe( work_sheet, container_type, bounding_box, include_index, include_column_names) return container def _load_series(self, work_sheet, container_type, bounding_box, include_index, include_column_names): starting_column = bounding_box.starting_column starting_row = bounding_box.starting_row ending_row = bounding_box.ending_row if include_column_names: # Ignore the column names in this case. starting_row += 1 index = None values_column = starting_column if include_index: index = self._load_column(work_sheet, starting_row, ending_row, starting_column) values_column += 1 values = self._load_column(work_sheet, starting_row, ending_row, values_column) return container_type(data=values, index=index) def _load_dataframe(self, work_sheet, container_type, bounding_box, include_index, include_column_names): starting_column = bounding_box.starting_column starting_row = bounding_box.starting_row ending_column = bounding_box.ending_column ending_row = bounding_box.ending_row # Read the column names at the top of the dataframe. column_names = None if include_column_names: column_names = self._load_row(work_sheet, starting_row) column_names = column_names[starting_column - 1:ending_column] starting_row += 1 index = None if include_index: index = self._load_column(work_sheet, starting_row, ending_row, starting_column) if column_names is not None: column_names = column_names[1:] starting_column += 1 rows_values = [] for row in islice(work_sheet.rows, starting_row - 1, ending_row): row_values = [cell.value for cell in islice(row, starting_column - 1, ending_column)] rows_values.append(row_values) values = np.array(rows_values) return container_type(index=index, data=values, columns=column_names) def _load_column(self, work_sheet, starting_row, ending_row, column_nr): values = [] row_nr = 1 for row in work_sheet.rows: if starting_row <= row_nr <= ending_row: col_nr = 1 for cell in row: if col_nr == column_nr: values.append(cell.value) col_nr += 1 row_nr += 1 return values def _load_row(self, work_sheet, row_index): for i, row in enumerate(work_sheet.rows): if i == row_index - 1: return [cell.value for cell in row] return None