import csv
from io import StringIO, TextIOBase
from pathlib import Path
from types import TracebackType

from neuralib.typing import PathLike
# Names exported by `from <module> import *`.
# NOTE(review): `csv_header` is not defined in the visible part of this file;
# presumably it is declared further down -- confirm.
__all__ = ['csv_header']
class CsvContextManager:
def __init__(self,
output: PathLike | None,
header: list[str],
append: bool = False,
buffer: bool = False,
continuous_mode: str | None = None,
quotes_header: list[str] | str | None = None):
self._output = Path(output) if output is not None else None
self._header = header
self._append = append
self._buffer = buffer
self._cont_mode = continuous_mode
self._stream: TextIOBase | StringIO | None = None
self._cont_column: set[str] | None = None
self._cont_index = -1
if isinstance(quotes_header, str):
quotes_header = [quotes_header]
self._quotes_header: list[str] | None = quotes_header
def __enter__(self) -> 'CsvContextManager':
if self._stream is not None:
raise RuntimeError()
if self._quotes_header is not None:
if not all([it in self._header for it in self._quotes_header]):
raise RuntimeError('')
self._cont_column = None
self._cont_index = -1
if self._output is not None:
if self._cont_mode is not None and self._cont_mode in self._header and self._append:
self._cont_column = set()
self._cont_index = self._header.index(self._cont_mode)
if self._output.exists():
with self._output.open('r') as f:
for line in f:
try:
p = line.strip().split(',')[self._cont_index]
except IndexError:
pass
else:
self._cont_column.add(p)
print_header = not self._output.exists() or not self._append
if self._buffer:
from io import StringIO
self._stream = StringIO()
else:
mode = 'a' if self._append else 'w'
self._stream = self._output.open(mode)
if print_header:
print(*self._header, sep=',', file=self._stream, flush=True)
return self
def __exit__(self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None) -> None:
if self._output is None:
return
if self._buffer:
if exc_type is None:
if not isinstance(self._stream, StringIO):
raise RuntimeError('buffer stream was not initialized')
mode = 'a' if self._append else 'w'
with self._output.open(mode) as out:
out.write(self._stream.getvalue())
else:
print('error happen, drop buffer')
if self._stream is None:
raise RuntimeError('csv stream was not initialized')
self._stream.close()
self._stream = None
def __call__(self, *args) -> None:
if self._output is None:
return
if self._stream is None:
raise RuntimeError('csv stream was not initialized')
if self._quotes_header is not None:
ret = list([*args])
header_idx = [self._header.index(q) for q in self._quotes_header]
for i in header_idx:
ret[i] = f'\"{ret[i]}\"'
args = tuple(ret)
print(*args, sep=',', file=self._stream, flush=True)
if self._cont_column is not None:
self._cont_column.add(str(args[self._cont_index]))
def __contains__(self, item) -> bool:
if self._cont_column is None:
return False
return str(item) in self._cont_column
def __repr__(self):
return str(self._output)