pyerrors/pyerrors/input/pandas.py
2026-04-20 19:47:41 +02:00

216 lines
7.4 KiB
Python

import gzip
import sqlite3
import warnings
from contextlib import closing
import numpy as np
import pandas as pd
from ..correlators import Corr
from ..obs import Obs
from .json import create_json_string, import_json_string
def to_sql(df, table_name, db, if_exists='fail', gz=True, **kwargs):
    """Store a DataFrame with Obs or Corr valued columns in a sqlite database.

    The Obs and Corr valued columns are serialized to (optionally gzipped) json
    strings before the table is written.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to be written to the database.
    table_name : str
        Name of the table in the database.
    db : str
        Path to the sqlite database.
    if_exists : str
        How to behave if table already exists. Options 'fail', 'replace', 'append'.
    gz : bool
        If True the json strings are gzipped.
    **kwargs
        Passed on unchanged to `pandas.DataFrame.to_sql`.

    Returns
    -------
    None
    """
    serialized = _serialize_df(df, gz=gz)
    connection = sqlite3.connect(db)
    # closing() guarantees that the connection is released even if the write fails.
    with closing(connection):
        serialized.to_sql(table_name, con=connection, if_exists=if_exists, index=False, **kwargs)
def read_sql(sql, db, auto_gamma=False, **kwargs):
    """Run an SQL query on a sqlite database and return the result as a DataFrame.

    Columns holding serialized Obs or Corr objects are deserialized again.

    Parameters
    ----------
    sql : str
        SQL query to be executed.
    db : str
        Path to the sqlite database.
    auto_gamma : bool
        If True applies the gamma_method to all imported Obs objects with the default parameters for
        the error analysis. Default False.
    **kwargs
        Passed on unchanged to `pandas.read_sql`.

    Returns
    -------
    data : pandas.DataFrame
        Dataframe with the content of the sqlite database.
    """
    connection = sqlite3.connect(db)
    # closing() guarantees that the connection is released even if the query fails.
    with closing(connection):
        raw_frame = pd.read_sql(sql, con=connection, **kwargs)
    return _deserialize_df(raw_frame, auto_gamma=auto_gamma)
def dump_df(df, fname, gz=True):
    """Exports a pandas DataFrame containing Obs valued columns to a (gzipped) csv file.

    Before making use of pandas to_csv functionality Obs objects are serialized via the standardized
    json format of pyerrors.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe to be dumped to a file.
    fname : str
        Filename of the output file. Missing '.csv' / '.gz' suffixes are appended. If gz is True
        and the name already ends with '.gz' (e.g. 'data.csv.gz') it is used as given, so that the
        same name can be handed to `load_df`.
    gz : bool
        If True, the output is a gzipped csv file. If False, the output is a csv file.

    Returns
    -------
    None

    Warns
    -----
    UserWarning
        If a purely numeric column contains nan values.
    """
    for column in df:
        if _need_to_serialize(df[column]):
            continue
        entries = df[column]
        if all(isinstance(entry, (int, np.integer, float, np.floating)) for entry in entries):
            if any(np.isnan(entry) for entry in entries):
                # f-string instead of '+' so that non-string column labels (e.g. integers) do not raise.
                warnings.warn(f"nan value in column {column} will be replaced by None", UserWarning, stacklevel=2)

    out = _serialize_df(df, gz=False)

    # A name that already carries the gzip suffix must not get '.csv.gz' appended a second time,
    # otherwise load_df (which accepts '.gz' names unchanged) cannot find the file again.
    has_gzip_suffix = gz is True and fname.endswith('.gz')
    if not fname.endswith('.csv') and not has_gzip_suffix:
        fname += '.csv'

    if gz is True:
        if not fname.endswith('.gz'):
            fname += '.gz'
        out.to_csv(fname, index=False, compression='gzip')
    else:
        out.to_csv(fname, index=False)
def load_df(fname, auto_gamma=False, gz=True):
    """Read a DataFrame from a csv(.gz) file whose Obs valued columns are stored as json strings.

    Parameters
    ----------
    fname : str
        Filename of the input file. Missing '.csv' / '.gz' suffixes are appended.
    auto_gamma : bool
        If True applies the gamma_method to all imported Obs objects with the default parameters for
        the error analysis. Default False.
    gz : bool
        If True, assumes that data is gzipped. If False, assumes a plain csv file.

    Returns
    -------
    data : pandas.DataFrame
        Dataframe with the content of the csv file.
    """
    if not fname.endswith(('.csv', '.gz')):
        fname += '.csv'

    if gz is not True:
        if fname.endswith('.gz'):
            warnings.warn(f"Trying to read from {fname} without unzipping!", UserWarning, stacklevel=2)
        frame = pd.read_csv(fname, keep_default_na=False)
    else:
        if not fname.endswith('.gz'):
            fname += '.gz'
        with gzip.open(fname) as handle:
            frame = pd.read_csv(handle, keep_default_na=False)

    return _deserialize_df(frame, auto_gamma=auto_gamma)
def _serialize_df(df, gz=False):
    """Turn every Obs or Corr valued column into json strings following the pyerrors json specification.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be serialized. It is not modified; a copy is returned.
    gz : bool
        gzip the json string representation. Default False.

    Returns
    -------
    out : pandas.DataFrame
        Copy of `df` in which the Obs or Corr valued columns are replaced by their serialized form.
    """
    def _to_json(entry):
        # Null entries stay null instead of being handed to the json writer.
        if _is_null(entry):
            return None
        return create_json_string(entry, indent=0)

    def _to_gzip(entry):
        # Null entries are stored as the gzip of an empty byte string.
        if _is_null(entry):
            return gzip.compress(b'')
        return gzip.compress(entry.encode('utf-8'))

    out = df.copy()
    for column in out:
        if _need_to_serialize(out[column]) is not True:
            continue
        out[column] = out[column].transform(_to_json)
        if gz is True:
            out[column] = out[column].transform(_to_gzip)
    return out
def _deserialize_df(df, auto_gamma=False):
    """Deserializes all pyerrors json strings into Obs or Corr objects according to the pyerrors json specification.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame to be deserialized. Its columns are reassigned on the passed object.
    auto_gamma : bool
        If True applies the gamma_method to all imported Obs objects with the default parameters for
        the error analysis. Default False.

    Returns
    -------
    df : pandas.DataFrame
        The DataFrame that was passed in, with the deserialized columns.

    Notes
    -----
    In case any column of the DataFrame is gzipped it is gunzipped in the process.
    """
    # In pandas 3+, string columns use 'str' dtype instead of 'object'
    string_like_dtypes = ["object", "str"] if int(pd.__version__.split(".")[0]) >= 3 else ["object"]
    for column in df.select_dtypes(include=string_like_dtypes):
        if len(df[column]) == 0:
            continue
        # Only the first entry is inspected to decide whether the column holds gzipped bytes.
        if isinstance(df[column].iloc[0], bytes):
            # gzip header: magic number 1f 8b, compression method 08 (deflate), flags 00
            if df[column].iloc[0].startswith(b"\x1f\x8b\x08\x00"):
                df[column] = df[column].transform(lambda x: gzip.decompress(x).decode('utf-8') if not pd.isna(x) else '')

        if df[column].notna().any():
            # Empty strings are turned into null entries.
            df[column] = df[column].replace({r'^$': None}, regex=True)
            # Locate the first non-null entry; it decides how the whole column is treated.
            i = 0
            while i < len(df[column]) and pd.isna(df[column].iloc[i]):
                i += 1
            if i < len(df[column]) and isinstance(df[column].iloc[i], str):
                # A column is treated as pyerrors json if '"program":' shows up within the first 20 characters.
                if '"program":' in df[column].iloc[i][:20]:
                    df[column] = df[column].transform(lambda x: import_json_string(x, verbose=False) if not pd.isna(x) else None)
                    if auto_gamma is True:
                        # The results of apply are discarded; gm() is only invoked on the imported objects.
                        if isinstance(df[column].iloc[i], list):
                            df[column].apply(lambda x: [o.gm() if o is not None else x for o in x] if x is not None else x)
                        else:
                            df[column].apply(lambda x: x.gm() if x is not None else x)
        # Convert NA values back to Python None for compatibility with `x is None` checks
        if df[column].isna().any():
            df[column] = df[column].astype(object).where(df[column].notna(), None)
    return df
def _need_to_serialize(col):
    """Decide whether a column has to be serialized before it is written out.

    Only the first non-null entry of the column is inspected.

    Parameters
    ----------
    col : pandas.Series
        Column of a DataFrame.

    Returns
    -------
    bool
        True if the first non-null entry is an Obs, a Corr or a list consisting only of Obs.
        False otherwise, in particular for empty or all-null columns.
    """
    n_entries = len(col)
    position = 0
    while position < n_entries and _is_null(col.iloc[position]):
        position += 1
    if position == n_entries:
        return False

    first_entry = col.iloc[position]
    if isinstance(first_entry, (Obs, Corr)):
        return True
    if isinstance(first_entry, list):
        return all(isinstance(o, Obs) for o in first_entry)
    return False
def _is_null(val):
    """Return whether `val` counts as a missing entry (None or NA).

    Lists and numpy arrays are never treated as null; they are excluded up front
    because `pd.isna` would answer element-wise for them.
    """
    if isinstance(val, (list, np.ndarray)):
        return False
    return pd.isna(val)