from pathlib import Path
from typing import Any, Literal, Union, get_args
import gspread
import numpy as np
import pandas as pd
import polars as pl
from neuralib.typing import DataFrame, PathLike
from neuralib.util.verbose import fprint
__all__ = [
'SpreadSheetName',
'WorkPageName',
'DataIndex',
#
'GoogleSpreadSheet',
'GoogleWorkSheet',
#
'upload_dataframe_to_spreadsheet'
]
SpreadSheetName = str
"""spreadsheet name"""
WorkPageName = str
"""workpage name of the spreadsheet"""
DataIndex = Union[None, int, str, slice, list[int], list[str], np.ndarray]
"""data index type"""
VALUE_RENDER_OPT = Literal['FORMATTED_VALUE', 'UNFORMATTED_VALUE', 'FORMULA']
"""value render option for the cell"""
[docs]
class GoogleWorkSheet:
[docs]
def __init__(self, worksheet: gspread.Worksheet,
primary_key: str | tuple[str, ...] = 'Data'):
"""
:param worksheet: ``gspread.Worksheet``
:param primary_key: Primary key of the worksheet.
If str type, it must be one of the column name.
If tuple str type, the primary key is join using "_" per row
"""
self._worksheet = worksheet
self._headers = tuple(self._worksheet.row_values(1))
if isinstance(primary_key, str) and primary_key not in self._headers:
raise ValueError(f'col not found: {primary_key}')
elif isinstance(primary_key, (str, tuple)):
self.primary_key: str | tuple[str, ...] = primary_key
else:
raise TypeError(f'{primary_key}')
[docs]
@classmethod
def of(cls,
name: SpreadSheetName,
page: WorkPageName,
service_account_path: PathLike,
primary_key: str | tuple[str, ...] = 'Data') -> 'GoogleWorkSheet':
"""
Get a worksheet from spreadsheet
:param name: ``SpreadSheetName``
:param page: ``WorkPageName``
:param service_account_path: The path to the service account json file
:param primary_key: Primary key of the worksheet.
If str type, it must be one of the column name.
If tuple str type, the primary key is join using "_" per row
:return: ``GoogleWorkSheet``
"""
sh = GoogleSpreadSheet(name, service_account_path, primary_key)
if page not in sh:
raise ValueError(f'{page} not found in spreadsheet: {name}')
return sh.get_worksheet(page)
@property
def title(self) -> WorkPageName:
"""``WorkPageName``"""
return self._worksheet.title
@property
def headers(self) -> list[str]:
"""list of worksheet header"""
return list(self._headers)
@property
def primary_key_list(self) -> list[str]:
"""list of primary key of the worksheet"""
primary = self.primary_key
if isinstance(primary, str):
return self.values(primary)
elif isinstance(primary, tuple):
ks = [self.values(p) for p in primary]
if len(set(list(map(len, ks)))) != 1:
print(set(list(map(len, ks))))
raise RuntimeError(f'primary key cannot join properly due to different len in col: {self.primary_key}')
return [
'_'.join([str(it) for it in j])
for j in (list(zip(*ks, strict=True)))
]
else:
raise TypeError(f'{self.primary_key}')
[docs]
def get_range_value(self, a1_range_notation: str) -> list[Any]:
"""get values from range notation. i.e., `B1:S1` to get the list of content.
If get values from 2D, the return order would be first column-wise, then row-wise
"""
range_values = self._worksheet.range(a1_range_notation)
return [it.value for it in range_values]
[docs]
def values(self, head: str) -> list[Any]:
"""get list of value from header"""
col = self._col(head)
return list(self._worksheet.col_values(col)[1:])
def _row(self, data: DataIndex) -> int | list[int] | np.ndarray | None:
"""
Get row(s) index
:param data: *str: first col value; *int: index
:return:
"""
if data is None:
return None
if isinstance(data, int):
return data + 2 # skip header row + one-base
elif isinstance(data, str):
return self.primary_key_list.index(data) + 2
elif isinstance(data, list):
if len(data) == 0:
return []
rows: list[int] = []
for it in data:
row = self._row(it)
if not isinstance(row, int):
raise TypeError()
rows.append(row)
return rows
elif isinstance(data, (slice, np.ndarray)):
return np.arange(len(self.primary_key_list))[data] + 2
raise TypeError()
def _col(self, head: str) -> int:
"""Get column index based on header (one-base)"""
return self._headers.index(head) + 1
def _rowcol(self, data: DataIndex, head: str) -> tuple[int | list[int] | np.ndarray | None, int]:
"""Get row in col indices"""
return self._row(data), self._col(head)
# noinspection PyTypeChecker
[docs]
def get_cell(self, data: DataIndex,
head: str,
value_render_option: VALUE_RENDER_OPT = 'FORMATTED_VALUE'):
"""
Get data from a cell
:param data: ``DataIndex``
:param head: header name
:param value_render_option: ``VALUE_RENDER_OPT``: {'FORMATTED_VALUE', 'UNFORMATTED_VALUE', 'FORMULA'}
:return:
"""
from gspread.utils import rowcol_to_a1
if value_render_option not in get_args(VALUE_RENDER_OPT):
raise ValueError('')
row, col = self._rowcol(data, head)
if row is None:
return self._worksheet.col_values(col, value_render_option=value_render_option)[1:] # pyright: ignore[reportArgumentType]
elif isinstance(row, int):
return self._worksheet.cell(row, col, value_render_option=value_render_option).value # pyright: ignore[reportArgumentType]
else:
# single API call
sheet_title = self._worksheet.title
ranges = [f"'{sheet_title}'!{rowcol_to_a1(r, col)}" for r in row]
batch_result = self._worksheet.spreadsheet.values_batch_get(
ranges,
params={'valueRenderOption': value_render_option}
)
# {'valueRanges': [{'values': [[val]]}, ...]}
return [
vr.get('values', [[None]])[0][0] if vr.get('values') else None
for vr in batch_result.get('valueRanges', [])
]
[docs]
def update_cell(self, data: DataIndex,
head: str,
value: list[str] | str):
"""
Update value in a cell
:param data: ``DataIndex``
:param head: header name
:param value: value to be updated. str type if single field
:return:
"""
from gspread.utils import rowcol_to_a1
row, col = self._rowcol(data, head)
if row is None:
if len(value) != len(self.primary_key_list):
raise ValueError()
# Batch read old values first (single API call)
old_values = self.values(head)
batch_updates = []
for it, v in enumerate(value):
old_val = old_values[it] if it < len(old_values) else None
batch_updates.append({'range': rowcol_to_a1(it + 2, col), 'values': [[v]]})
fprint(f'UPDATES: {rowcol_to_a1(it + 2, col)} from {old_val} -> {v}', vtype='io')
self._worksheet.batch_update(batch_updates)
elif isinstance(row, int):
if isinstance(value, list):
raise TypeError('single cell update requires a scalar value')
old_val = self.get_cell(data, head)
self._worksheet.update_cell(row, col, value)
fprint(f'UPDATES: {rowcol_to_a1(row, col)} from {old_val} -> {value}', vtype='io')
else:
if len(value) != len(row):
raise ValueError()
# Batch read old values first (pass original data to get_cell for batch read)
old_values = self.get_cell(data, head)
if not isinstance(old_values, list):
old_values = [old_values]
batch_updates = []
for idx, (it, v) in enumerate(zip(row, value, strict=False)):
old_val = old_values[idx] if idx < len(old_values) else None
batch_updates.append({'range': rowcol_to_a1(it, col), 'values': [[v]]})
fprint(f'UPDATES: {rowcol_to_a1(it, col)} from {old_val}-> {v}', vtype='io')
self._worksheet.batch_update(batch_updates)
[docs]
def clear(self):
"""Clears all cells in the worksheet"""
self._worksheet.clear()
[docs]
def update(self, range_name, values=None, **kwargs):
"""Sets values in a cell range of the sheet"""
self._worksheet.update(range_name, values, **kwargs)
[docs]
def to_pandas(self) -> pd.DataFrame:
"""Worksheet to pandas dataframe"""
return pd.DataFrame(self._worksheet.get_all_records())
[docs]
def to_polars(self) -> pl.DataFrame:
"""Worksheet to polar dataframe"""
return pl.DataFrame(self._worksheet.get_all_records(), nan_to_null=True)
# =========== #
# Spreadsheet #
# =========== #
[docs]
class GoogleSpreadSheet:
"""
Gspread module wrapper to access `google spreadsheet`
.. seealso:: `<https://docs.gspread.org/en/latest/>`_
.. code-block:: python
service_account_path = ... # The path to the service account json file
sh = GoogleSpreadSheet('test_sheet', service_account_path)
# get a worksheet from the spreadsheet
ws = sh[WORK_SHEET_NAME]
"""
[docs]
def __init__(self, name: SpreadSheetName,
service_account_path: PathLike,
primary_key: str | tuple[str, ...] = 'Data'):
"""
:param name: name of the spreadsheet
:param service_account_path: The path to the service account json file
:param primary_key: Primary key of the worksheet.
If str type, it must be one of the column name.
If tuple str type, the primary key is join using "_" per row
"""
self._client = gspread.service_account(filename=str(service_account_path))
self._sheet: gspread.Spreadsheet = self._client.open(name)
self._worksheets: list[gspread.Worksheet] = self._sheet.worksheets()
self._primary_key = primary_key
def __len__(self):
"""number of worksheet"""
return len(self._worksheets)
def __contains__(self, item: WorkPageName) -> bool:
return self.has_worksheet(item)
def __getitem__(self, item: int | WorkPageName) -> GoogleWorkSheet:
"""get the worksheet.
if int type, get worksheet from index.
if str type, get worksheet from ``WorkPageName``
"""
if isinstance(item, int):
return GoogleWorkSheet(self._worksheets[item])
elif isinstance(item, str):
try:
return self.get_worksheet(item)
except ValueError:
pass
raise KeyError(f'{item} not found')
else:
raise TypeError(f'invalid item type: {type(item)}')
@property
def title(self) -> SpreadSheetName:
"""``SpreadSheetName``"""
return self._sheet.title
@property
def worksheet_list(self) -> list[WorkPageName]:
"""list of ``WorkPageName``"""
return [it.title for it in self._worksheets]
[docs]
def has_worksheet(self, page: WorkPageName) -> bool:
"""If has the worksheet, implement also in ``__contains__()``"""
for w in self._worksheets:
if w.title == page:
return True
return False
[docs]
def get_worksheet(self, page: WorkPageName) -> GoogleWorkSheet:
"""Get the worksheet. implement also in ``__get_item__()``"""
for w in self._worksheets:
if w.title == page:
return GoogleWorkSheet(w, self._primary_key)
raise ValueError(f'page not found: {page}')
# ========= #
# Utilities #
# ========= #
[docs]
def upload_dataframe_to_spreadsheet(df: DataFrame,
gspread_name: SpreadSheetName,
worksheet_name: WorkPageName,
service_account_path: Path | None,
primary_key: str | tuple[str, ...] = 'Data') -> None:
"""
Upload a dataframe to a gspread worksheet
:param df: polars or pandas DataFrame
:param gspread_name: spreadsheet name
:param worksheet_name: worksheet name under the spreadsheet
:param service_account_path: The path to the service account json file.
:param primary_key: Primary key of the worksheet.
If str type, it must be one of the column name.
If tuple str type, the primary key is join using "_" per row
"""
if service_account_path is None:
raise ValueError('service_account_path is required')
gs = GoogleSpreadSheet(gspread_name, service_account_path, primary_key)
spreadsheet = gs._sheet
if isinstance(df, pd.DataFrame):
df = pl.from_pandas(df)
if worksheet_name not in gs.worksheet_list:
spreadsheet.add_worksheet(title=worksheet_name, rows=df.shape[0], cols=len(df.columns))
fprint(f'ADD WORKSHEET: {worksheet_name}')
gs = GoogleSpreadSheet(gspread_name, service_account_path, primary_key) # refresh page
worksheet = gs[worksheet_name]
worksheet.clear()
# cast dtype avoid serialization problem from gspread
df = df.cast({pl.Datetime: pl.Utf8})
data = [df.columns] + [field for field in df.iter_rows()]
worksheet.update(data)