Source code for neuralib.tools.gspread

from pathlib import Path
from typing import Any, Literal, Union, get_args

import gspread
import numpy as np
import pandas as pd
import polars as pl
from neuralib.typing import DataFrame, PathLike
from neuralib.util.verbose import fprint

__all__ = [
    'SpreadSheetName',
    'WorkPageName',
    'DataIndex',
    #
    'GoogleSpreadSheet',
    'GoogleWorkSheet',
    #
    'upload_dataframe_to_spreadsheet'
]

SpreadSheetName = str
"""spreadsheet name"""

WorkPageName = str
"""workpage name of the spreadsheet"""

DataIndex = Union[None, int, str, slice, list[int], list[str], np.ndarray]
"""data index type"""

VALUE_RENDER_OPT = Literal['FORMATTED_VALUE', 'UNFORMATTED_VALUE', 'FORMULA']
"""value render option for the cell"""


[docs] class GoogleWorkSheet:
[docs] def __init__(self, worksheet: gspread.Worksheet, primary_key: str | tuple[str, ...] = 'Data'): """ :param worksheet: ``gspread.Worksheet`` :param primary_key: Primary key of the worksheet. If str type, it must be one of the column name. If tuple str type, the primary key is join using "_" per row """ self._worksheet = worksheet self._headers = tuple(self._worksheet.row_values(1)) if isinstance(primary_key, str) and primary_key not in self._headers: raise ValueError(f'col not found: {primary_key}') elif isinstance(primary_key, (str, tuple)): self.primary_key: str | tuple[str, ...] = primary_key else: raise TypeError(f'{primary_key}')
[docs] @classmethod def of(cls, name: SpreadSheetName, page: WorkPageName, service_account_path: PathLike, primary_key: str | tuple[str, ...] = 'Data') -> 'GoogleWorkSheet': """ Get a worksheet from spreadsheet :param name: ``SpreadSheetName`` :param page: ``WorkPageName`` :param service_account_path: The path to the service account json file :param primary_key: Primary key of the worksheet. If str type, it must be one of the column name. If tuple str type, the primary key is join using "_" per row :return: ``GoogleWorkSheet`` """ sh = GoogleSpreadSheet(name, service_account_path, primary_key) if page not in sh: raise ValueError(f'{page} not found in spreadsheet: {name}') return sh.get_worksheet(page)
@property def title(self) -> WorkPageName: """``WorkPageName``""" return self._worksheet.title @property def headers(self) -> list[str]: """list of worksheet header""" return list(self._headers) @property def primary_key_list(self) -> list[str]: """list of primary key of the worksheet""" primary = self.primary_key if isinstance(primary, str): return self.values(primary) elif isinstance(primary, tuple): ks = [self.values(p) for p in primary] if len(set(list(map(len, ks)))) != 1: print(set(list(map(len, ks)))) raise RuntimeError(f'primary key cannot join properly due to different len in col: {self.primary_key}') return [ '_'.join([str(it) for it in j]) for j in (list(zip(*ks, strict=True))) ] else: raise TypeError(f'{self.primary_key}')
[docs] def get_range_value(self, a1_range_notation: str) -> list[Any]: """get values from range notation. i.e., `B1:S1` to get the list of content. If get values from 2D, the return order would be first column-wise, then row-wise """ range_values = self._worksheet.range(a1_range_notation) return [it.value for it in range_values]
[docs] def values(self, head: str) -> list[Any]: """get list of value from header""" col = self._col(head) return list(self._worksheet.col_values(col)[1:])
def _row(self, data: DataIndex) -> int | list[int] | np.ndarray | None: """ Get row(s) index :param data: *str: first col value; *int: index :return: """ if data is None: return None if isinstance(data, int): return data + 2 # skip header row + one-base elif isinstance(data, str): return self.primary_key_list.index(data) + 2 elif isinstance(data, list): if len(data) == 0: return [] rows: list[int] = [] for it in data: row = self._row(it) if not isinstance(row, int): raise TypeError() rows.append(row) return rows elif isinstance(data, (slice, np.ndarray)): return np.arange(len(self.primary_key_list))[data] + 2 raise TypeError() def _col(self, head: str) -> int: """Get column index based on header (one-base)""" return self._headers.index(head) + 1 def _rowcol(self, data: DataIndex, head: str) -> tuple[int | list[int] | np.ndarray | None, int]: """Get row in col indices""" return self._row(data), self._col(head) # noinspection PyTypeChecker
[docs] def get_cell(self, data: DataIndex, head: str, value_render_option: VALUE_RENDER_OPT = 'FORMATTED_VALUE'): """ Get data from a cell :param data: ``DataIndex`` :param head: header name :param value_render_option: ``VALUE_RENDER_OPT``: {'FORMATTED_VALUE', 'UNFORMATTED_VALUE', 'FORMULA'} :return: """ from gspread.utils import rowcol_to_a1 if value_render_option not in get_args(VALUE_RENDER_OPT): raise ValueError('') row, col = self._rowcol(data, head) if row is None: return self._worksheet.col_values(col, value_render_option=value_render_option)[1:] # pyright: ignore[reportArgumentType] elif isinstance(row, int): return self._worksheet.cell(row, col, value_render_option=value_render_option).value # pyright: ignore[reportArgumentType] else: # single API call sheet_title = self._worksheet.title ranges = [f"'{sheet_title}'!{rowcol_to_a1(r, col)}" for r in row] batch_result = self._worksheet.spreadsheet.values_batch_get( ranges, params={'valueRenderOption': value_render_option} ) # {'valueRanges': [{'values': [[val]]}, ...]} return [ vr.get('values', [[None]])[0][0] if vr.get('values') else None for vr in batch_result.get('valueRanges', []) ]
[docs] def update_cell(self, data: DataIndex, head: str, value: list[str] | str): """ Update value in a cell :param data: ``DataIndex`` :param head: header name :param value: value to be updated. str type if single field :return: """ from gspread.utils import rowcol_to_a1 row, col = self._rowcol(data, head) if row is None: if len(value) != len(self.primary_key_list): raise ValueError() # Batch read old values first (single API call) old_values = self.values(head) batch_updates = [] for it, v in enumerate(value): old_val = old_values[it] if it < len(old_values) else None batch_updates.append({'range': rowcol_to_a1(it + 2, col), 'values': [[v]]}) fprint(f'UPDATES: {rowcol_to_a1(it + 2, col)} from {old_val} -> {v}', vtype='io') self._worksheet.batch_update(batch_updates) elif isinstance(row, int): if isinstance(value, list): raise TypeError('single cell update requires a scalar value') old_val = self.get_cell(data, head) self._worksheet.update_cell(row, col, value) fprint(f'UPDATES: {rowcol_to_a1(row, col)} from {old_val} -> {value}', vtype='io') else: if len(value) != len(row): raise ValueError() # Batch read old values first (pass original data to get_cell for batch read) old_values = self.get_cell(data, head) if not isinstance(old_values, list): old_values = [old_values] batch_updates = [] for idx, (it, v) in enumerate(zip(row, value, strict=False)): old_val = old_values[idx] if idx < len(old_values) else None batch_updates.append({'range': rowcol_to_a1(it, col), 'values': [[v]]}) fprint(f'UPDATES: {rowcol_to_a1(it, col)} from {old_val}-> {v}', vtype='io') self._worksheet.batch_update(batch_updates)
[docs] def clear(self): """Clears all cells in the worksheet""" self._worksheet.clear()
[docs] def update(self, range_name, values=None, **kwargs): """Sets values in a cell range of the sheet""" self._worksheet.update(range_name, values, **kwargs)
[docs] def to_pandas(self) -> pd.DataFrame: """Worksheet to pandas dataframe""" return pd.DataFrame(self._worksheet.get_all_records())
[docs] def to_polars(self) -> pl.DataFrame: """Worksheet to polar dataframe""" return pl.DataFrame(self._worksheet.get_all_records(), nan_to_null=True)
# =========== # # Spreadsheet # # =========== #
[docs] class GoogleSpreadSheet: """ Gspread module wrapper to access `google spreadsheet` .. seealso:: `<https://docs.gspread.org/en/latest/>`_ .. code-block:: python service_account_path = ... # The path to the service account json file sh = GoogleSpreadSheet('test_sheet', service_account_path) # get a worksheet from the spreadsheet ws = sh[WORK_SHEET_NAME] """
[docs] def __init__(self, name: SpreadSheetName, service_account_path: PathLike, primary_key: str | tuple[str, ...] = 'Data'): """ :param name: name of the spreadsheet :param service_account_path: The path to the service account json file :param primary_key: Primary key of the worksheet. If str type, it must be one of the column name. If tuple str type, the primary key is join using "_" per row """ self._client = gspread.service_account(filename=str(service_account_path)) self._sheet: gspread.Spreadsheet = self._client.open(name) self._worksheets: list[gspread.Worksheet] = self._sheet.worksheets() self._primary_key = primary_key
def __len__(self): """number of worksheet""" return len(self._worksheets) def __contains__(self, item: WorkPageName) -> bool: return self.has_worksheet(item) def __getitem__(self, item: int | WorkPageName) -> GoogleWorkSheet: """get the worksheet. if int type, get worksheet from index. if str type, get worksheet from ``WorkPageName`` """ if isinstance(item, int): return GoogleWorkSheet(self._worksheets[item]) elif isinstance(item, str): try: return self.get_worksheet(item) except ValueError: pass raise KeyError(f'{item} not found') else: raise TypeError(f'invalid item type: {type(item)}') @property def title(self) -> SpreadSheetName: """``SpreadSheetName``""" return self._sheet.title @property def worksheet_list(self) -> list[WorkPageName]: """list of ``WorkPageName``""" return [it.title for it in self._worksheets]
[docs] def has_worksheet(self, page: WorkPageName) -> bool: """If has the worksheet, implement also in ``__contains__()``""" for w in self._worksheets: if w.title == page: return True return False
[docs] def get_worksheet(self, page: WorkPageName) -> GoogleWorkSheet: """Get the worksheet. implement also in ``__get_item__()``""" for w in self._worksheets: if w.title == page: return GoogleWorkSheet(w, self._primary_key) raise ValueError(f'page not found: {page}')
# ========= # # Utilities # # ========= #
[docs] def upload_dataframe_to_spreadsheet(df: DataFrame, gspread_name: SpreadSheetName, worksheet_name: WorkPageName, service_account_path: Path | None, primary_key: str | tuple[str, ...] = 'Data') -> None: """ Upload a dataframe to a gspread worksheet :param df: polars or pandas DataFrame :param gspread_name: spreadsheet name :param worksheet_name: worksheet name under the spreadsheet :param service_account_path: The path to the service account json file. :param primary_key: Primary key of the worksheet. If str type, it must be one of the column name. If tuple str type, the primary key is join using "_" per row """ if service_account_path is None: raise ValueError('service_account_path is required') gs = GoogleSpreadSheet(gspread_name, service_account_path, primary_key) spreadsheet = gs._sheet if isinstance(df, pd.DataFrame): df = pl.from_pandas(df) if worksheet_name not in gs.worksheet_list: spreadsheet.add_worksheet(title=worksheet_name, rows=df.shape[0], cols=len(df.columns)) fprint(f'ADD WORKSHEET: {worksheet_name}') gs = GoogleSpreadSheet(gspread_name, service_account_path, primary_key) # refresh page worksheet = gs[worksheet_name] worksheet.clear() # cast dtype avoid serialization problem from gspread df = df.cast({pl.Datetime: pl.Utf8}) data = [df.columns] + [field for field in df.iter_rows()] worksheet.update(data)