turbo_broccoli.custom.pandas

pandas (de)serialization utilities.

  1"""pandas (de)serialization utilities."""
  2
  3import json
  4from io import StringIO
  5from typing import Any, Callable, Tuple
  6
  7import pandas as pd
  8
  9from ..context import Context
 10from ..exceptions import DeserializationError, TypeNotSupported
 11
 12
 13def _dataframe_to_json(df: pd.DataFrame, ctx: Context) -> dict:
 14    dtypes = [[str(k), v.name] for k, v in df.dtypes.items()]
 15    if df.memory_usage(deep=True).sum() <= ctx.min_artifact_size:
 16        return {
 17            "__type__": "pandas.dataframe",
 18            "__version__": 2,
 19            "data": json.loads(df.to_json(date_format="iso", date_unit="ns")),
 20            "dtypes": dtypes,
 21        }
 22    fmt = ctx.pandas_format
 23    path, name = ctx.new_artifact_path()
 24    getattr(df, f"to_{fmt}")(path, **ctx.pandas_kwargs)
 25    return {
 26        "__type__": "pandas.dataframe",
 27        "__version__": 2,
 28        "dtypes": dtypes,
 29        "id": name,
 30        "format": fmt,
 31    }
 32
 33
 34def _json_to_dataframe(dct: dict, ctx: Context) -> pd.DataFrame:
 35    decoders = {
 36        2: _json_to_dataframe_v2,
 37    }
 38    return decoders[dct["__version__"]](dct, ctx)
 39
 40
 41def _json_to_dataframe_v2(dct: dict, ctx: Context) -> pd.DataFrame:
 42    if "data" in dct:
 43        df = pd.read_json(StringIO(json.dumps(dct["data"])))
 44    else:
 45        fmt = dct["format"]
 46        path = ctx.id_to_artifact_path(dct["id"])
 47        if fmt in ["h5", "hdf"]:
 48            df = pd.read_hdf(path, "main")
 49        else:
 50            df = getattr(pd, f"read_{fmt}")(path)
 51    # Rename columns with non-string names
 52    # df.rename({str(d[0]): d[0] for d in dct["dtypes"]}, inplace=True)
 53    df = df.astype(
 54        {
 55            str(a): b
 56            for a, b in dct["dtypes"]
 57            if not str(b).startswith("datetime")
 58        }
 59    )
 60    for a, _ in filter(lambda x: x[1].startswith("datetime"), dct["dtypes"]):
 61        df[a] = pd.to_datetime(df[a]).dt.tz_localize(None)
 62    return df
 63
 64
 65def _json_to_series(dct: dict, ctx: Context) -> pd.Series:
 66    ctx.raise_if_nodecode("pandas.dataframe")
 67    decoders = {
 68        2: _json_to_series_v2,
 69    }
 70    return decoders[dct["__version__"]](dct, ctx)
 71
 72
 73def _json_to_series_v2(dct: dict, ctx: Context) -> pd.Series:
 74    return dct["data"][dct["name"]]
 75
 76
 77def _series_to_json(ser: pd.Series, ctx: Context) -> dict:
 78    name = ser.name if ser.name is not None else "main"
 79    return {
 80        "__type__": "pandas.series",
 81        "__version__": 2,
 82        "data": ser.to_frame(name=name),
 83        "name": name,
 84    }
 85
 86
 87def from_json(dct: dict, ctx: Context) -> Any:
 88    decoders = {
 89        "pandas.dataframe": _json_to_dataframe,
 90        "pandas.series": _json_to_series,
 91    }
 92    try:
 93        type_name = dct["__type__"]
 94        return decoders[type_name](dct, ctx)
 95    except KeyError as exc:
 96        raise DeserializationError() from exc
 97
 98
 99def to_json(obj: Any, ctx: Context) -> dict:
100    """
101    Serializes a pandas object into JSON by cases. See the README for the
102    precise list of supported types. The return dict has the following
103    structure:
104
105    - `pandas.DataFrame`: A dataframe is processed differently depending on its
106      size and on the `TB_MAX_NBYTES` environment variable. If the dataframe is
107      small, i.e. at most `TB_MAX_NBYTES` bytes, then it is directly stored in
108      the resulting JSON document as
109
110        ```py
111        {
112            "__type__": "pandas.dataframe",
113            "__version__": 2,
114            "data": {...},
115            "dtypes": [
116                [col1, dtype1],
117                [col2, dtype2],
118                ...,
119            ],
120        }
121        ```
122
123      where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict`
124      form). On the other hand, the dataframe is too large, then its content is
125      stored in an artifact, whose format follows the `TB_PANDAS_FORMAT`
126      environment (CSV by default). The resulting JSON document looks like
127
128        ```py
129        {
130            "__type__": "pandas.dataframe",
131            "__version__": 2,
132            "dtypes": [
133                [col1, dtype1],
134                [col2, dtype2],
135                ...
136            ],
137            "id": <UUID4 str>,
138            "format": <str>
139        }
140        ```
141
142    - `pandas.Series`: A series will be converted to a dataframe before being
143      serialized. The final document will look like this
144
145        ```py
146        {
147            "__type__": "pandas.series",
148            "__version__": 2,
149            "data": {...},
150            "name": <str>,
151        }
152        ```
153
154      where `{...}` is the document of the dataframe'd series, see above.
155
156    Warning:
157        Series and column names must be strings!
158
159    """
    encoders: list[Tuple[type, Callable[[Any, Context], dict]]] = [
        (pd.DataFrame, _dataframe_to_json),
        (pd.Series, _series_to_json),
    ]
    for t, f in encoders:
        if isinstance(obj, t):
            return f(obj, ctx)
    raise TypeNotSupported()
def from_json(dct: dict, ctx: turbo_broccoli.context.Context) -> Any:
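
Deserializes a dict produced by to_json back into a pandas.DataFrame or pandas.Series, dispatching on the document's __type__ key; raises DeserializationError if that key is missing or names an unsupported type.
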
def to_json(obj: Any, ctx: turbo_broccoli.context.Context) -> dict:

Serializes a pandas object into JSON by cases. See the README for the precise list of supported types. The return dict has the following structure:

  • pandas.DataFrame: A dataframe is processed differently depending on its size and on the TB_MAX_NBYTES environment variable. If the dataframe is small, i.e. at most TB_MAX_NBYTES bytes, then it is stored directly in the resulting JSON document (first sketch below) as

    {
        "__type__": "pandas.dataframe",
        "__version__": 2,
        "data": {...},
        "dtypes": [
            [col1, dtype1],
            [col2, dtype2],
            ...,
        ],
    }

    where {...} is the result of pandas.DataFrame.to_json (in dict form). If, on the other hand, the dataframe is too large, then its content is stored in an artifact, whose format follows the TB_PANDAS_FORMAT environment variable (CSV by default). The resulting JSON document (second sketch below) looks like

    {
        "__type__": "pandas.dataframe",
        "__version__": 2,
        "dtypes": [
            [col1, dtype1],
            [col2, dtype2],
            ...,
        ],
        "id": <UUID4 str>,
        "format": <str>,
    }

  • pandas.Series: A series will be converted to a dataframe before being serialized (third sketch below). The final document will look like this

    {
        "__type__": "pandas.series",
        "__version__": 2,
        "data": {...},
        "name": <str>,
    }

    where {...} is the document of the series converted to a dataframe, see above.

Warning: Series and column names must be strings!
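
For concreteness, here is a minimal round-trip sketch for a small dataframe, using this module's to_json and from_json directly. It assumes that a Context is constructible with default arguments and that the defaults inline a dataframe of this size; consult the library for the actual constructor parameters.

    import pandas as pd

    from turbo_broccoli.context import Context
    from turbo_broccoli.custom.pandas import from_json, to_json

    ctx = Context()  # assumption: default-constructible
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

    doc = to_json(df, ctx)
    assert doc["__type__"] == "pandas.dataframe"
    assert "data" in doc  # small enough to be inlined, no artifact involved

    df2 = from_json(doc, ctx)
    assert df.equals(df2)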
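
The artifact code path can be exercised by shrinking the inlining threshold. The constructor arguments below (artifact_path, min_artifact_size) are assumptions; the source above only shows that the context exposes min_artifact_size, pandas_format, pandas_kwargs, new_artifact_path and id_to_artifact_path.

    import pandas as pd

    from turbo_broccoli.context import Context
    from turbo_broccoli.custom.pandas import from_json, to_json

    # Hypothetical constructor arguments, see the caveat above
    ctx = Context(artifact_path="artifacts/", min_artifact_size=0)

    df = pd.DataFrame({"x": range(1000)})
    doc = to_json(df, ctx)
    assert "data" not in doc         # content lives in the artifact file
    print(doc["id"], doc["format"])  # a UUID4 string and e.g. "csv"

    df2 = from_json(doc, ctx)        # reads the artifact back, e.g. with pd.read_csv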
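
Finally, note that _series_to_json leaves the wrapping dataframe unserialized in the returned document; turning it into a nested "pandas.dataframe" document is left to the recursive encoder. A small sketch, under the same Context assumption as above:

    import pandas as pd

    from turbo_broccoli.context import Context
    from turbo_broccoli.custom.pandas import to_json

    ctx = Context()  # assumption: default-constructible
    ser = pd.Series([0.1, 0.2, 0.3], name="scores")

    doc = to_json(ser, ctx)
    assert doc["__type__"] == "pandas.series"
    assert doc["name"] == "scores"
    # still a pandas.DataFrame at this point, not yet a JSON document
    assert isinstance(doc["data"], pd.DataFrame)

As the warning above suggests, a series whose name is not a string (or a dataframe with non-string column names) will not round-trip faithfully: the dtypes entries stringify column names, and the column-renaming step in _json_to_dataframe_v2 is commented out in the source.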