turbo_broccoli.custom.pandas
pandas (de)serialization utilities.
1"""pandas (de)serialization utilities.""" 2 3import json 4from io import StringIO 5from typing import Any, Callable, Tuple 6 7import pandas as pd 8 9from ..context import Context 10from ..exceptions import DeserializationError, TypeNotSupported 11 12 13def _dataframe_to_json(df: pd.DataFrame, ctx: Context) -> dict: 14 dtypes = [[str(k), v.name] for k, v in df.dtypes.items()] 15 if df.memory_usage(deep=True).sum() <= ctx.min_artifact_size: 16 return { 17 "__type__": "pandas.dataframe", 18 "__version__": 2, 19 "data": json.loads(df.to_json(date_format="iso", date_unit="ns")), 20 "dtypes": dtypes, 21 } 22 fmt = ctx.pandas_format 23 path, name = ctx.new_artifact_path() 24 getattr(df, f"to_{fmt}")(path, **ctx.pandas_kwargs) 25 return { 26 "__type__": "pandas.dataframe", 27 "__version__": 2, 28 "dtypes": dtypes, 29 "id": name, 30 "format": fmt, 31 } 32 33 34def _json_to_dataframe(dct: dict, ctx: Context) -> pd.DataFrame: 35 decoders = { 36 2: _json_to_dataframe_v2, 37 } 38 return decoders[dct["__version__"]](dct, ctx) 39 40 41def _json_to_dataframe_v2(dct: dict, ctx: Context) -> pd.DataFrame: 42 if "data" in dct: 43 df = pd.read_json(StringIO(json.dumps(dct["data"]))) 44 else: 45 fmt = dct["format"] 46 path = ctx.id_to_artifact_path(dct["id"]) 47 if fmt in ["h5", "hdf"]: 48 df = pd.read_hdf(path, "main") 49 else: 50 df = getattr(pd, f"read_{fmt}")(path) 51 # Rename columns with non-string names 52 # df.rename({str(d[0]): d[0] for d in dct["dtypes"]}, inplace=True) 53 df = df.astype( 54 { 55 str(a): b 56 for a, b in dct["dtypes"] 57 if not str(b).startswith("datetime") 58 } 59 ) 60 for a, _ in filter(lambda x: x[1].startswith("datetime"), dct["dtypes"]): 61 df[a] = pd.to_datetime(df[a]).dt.tz_localize(None) 62 return df 63 64 65def _json_to_series(dct: dict, ctx: Context) -> pd.Series: 66 ctx.raise_if_nodecode("pandas.dataframe") 67 decoders = { 68 2: _json_to_series_v2, 69 } 70 return decoders[dct["__version__"]](dct, ctx) 71 72 73def _json_to_series_v2(dct: dict, ctx: Context) -> pd.Series: 74 return dct["data"][dct["name"]] 75 76 77def _series_to_json(ser: pd.Series, ctx: Context) -> dict: 78 name = ser.name if ser.name is not None else "main" 79 return { 80 "__type__": "pandas.series", 81 "__version__": 2, 82 "data": ser.to_frame(name=name), 83 "name": name, 84 } 85 86 87def from_json(dct: dict, ctx: Context) -> Any: 88 decoders = { 89 "pandas.dataframe": _json_to_dataframe, 90 "pandas.series": _json_to_series, 91 } 92 try: 93 type_name = dct["__type__"] 94 return decoders[type_name](dct, ctx) 95 except KeyError as exc: 96 raise DeserializationError() from exc 97 98 99def to_json(obj: Any, ctx: Context) -> dict: 100 """ 101 Serializes a pandas object into JSON by cases. See the README for the 102 precise list of supported types. The return dict has the following 103 structure: 104 105 - `pandas.DataFrame`: A dataframe is processed differently depending on its 106 size and on the `TB_MAX_NBYTES` environment variable. If the dataframe is 107 small, i.e. at most `TB_MAX_NBYTES` bytes, then it is directly stored in 108 the resulting JSON document as 109 110 ```py 111 { 112 "__type__": "pandas.dataframe", 113 "__version__": 2, 114 "data": {...}, 115 "dtypes": [ 116 [col1, dtype1], 117 [col2, dtype2], 118 ..., 119 ], 120 } 121 ``` 122 123 where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict` 124 form). On the other hand, the dataframe is too large, then its content is 125 stored in an artifact, whose format follows the `TB_PANDAS_FORMAT` 126 environment (CSV by default). 
The resulting JSON document looks like 127 128 ```py 129 { 130 "__type__": "pandas.dataframe", 131 "__version__": 2, 132 "dtypes": [ 133 [col1, dtype1], 134 [col2, dtype2], 135 ... 136 ], 137 "id": <UUID4 str>, 138 "format": <str> 139 } 140 ``` 141 142 - `pandas.Series`: A series will be converted to a dataframe before being 143 serialized. The final document will look like this 144 145 ```py 146 { 147 "__type__": "pandas.series", 148 "__version__": 2, 149 "data": {...}, 150 "name": <str>, 151 } 152 ``` 153 154 where `{...}` is the document of the dataframe'd series, see above. 155 156 Warning: 157 Series and column names must be strings! 158 159 """ 160 encoders: list[Tuple[type, Callable[[Any, Context], dict]]] = [ 161 (pd.DataFrame, _dataframe_to_json), 162 (pd.Series, _series_to_json), 163 ] 164 for t, f in encoders: 165 if isinstance(obj, t): 166 return f(obj, ctx) 167 raise TypeNotSupported()
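For orientation, here is a minimal, self-contained sketch of the inline (small-dataframe) round trip that `_dataframe_to_json` and `_json_to_dataframe_v2` implement, using only pandas and the standard library. The `doc` dict below merely mimics the `"data"` and `"dtypes"` entries of a version-2 document; it also shows why the dtype list is stored at all, namely that `DataFrame.to_json` discards dtype information.

```py
import json
from io import StringIO

import pandas as pd

df = pd.DataFrame(
    {"x": [1, 2], "t": pd.to_datetime(["2021-01-01", "2021-01-02"])}
)

# Encode: mimics the "data" and "dtypes" entries of a v2 document
doc = {
    "data": json.loads(df.to_json(date_format="iso", date_unit="ns")),
    "dtypes": [[str(k), v.name] for k, v in df.dtypes.items()],
}

# Decode: read the JSON back, restore non-datetime dtypes with astype, then
# re-parse datetime columns and strip any timezone, as in
# _json_to_dataframe_v2
out = pd.read_json(StringIO(json.dumps(doc["data"])))
out = out.astype(
    {c: d for c, d in doc["dtypes"] if not d.startswith("datetime")}
)
for c, _ in filter(lambda x: x[1].startswith("datetime"), doc["dtypes"]):
    out[c] = pd.to_datetime(out[c]).dt.tz_localize(None)

assert out.equals(df)  # values and dtypes survive the round trip
```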
`def to_json(obj: Any, ctx: Context) -> dict`
Serializes a pandas object into JSON by cases. See the README for the precise list of supported types. The returned dict has the following structure:
- `pandas.DataFrame`: A dataframe is processed differently depending on its size and on the `TB_MAX_NBYTES` environment variable. If the dataframe is small, i.e. at most `TB_MAX_NBYTES` bytes, then it is directly stored in the resulting JSON document as

  ```py
  {
      "__type__": "pandas.dataframe",
      "__version__": 2,
      "data": {...},
      "dtypes": [
          [col1, dtype1],
          [col2, dtype2],
          ...,
      ],
  }
  ```

  where `{...}` is the result of `pandas.DataFrame.to_json` (in `dict` form). On the other hand, if the dataframe is too large, then its content is stored in an artifact whose format follows the `TB_PANDAS_FORMAT` environment variable (CSV by default); the writer/reader pair is selected by format name, see the dispatch sketch after this list. The resulting JSON document looks like

  ```py
  {
      "__type__": "pandas.dataframe",
      "__version__": 2,
      "dtypes": [
          [col1, dtype1],
          [col2, dtype2],
          ...
      ],
      "id": <UUID4 str>,
      "format": <str>
  }
  ```
- `pandas.Series`: A series will be converted to a dataframe before being serialized (a sketch of this wrapping follows the warning below). The final document will look like this

  ```py
  {
      "__type__": "pandas.series",
      "__version__": 2,
      "data": {...},
      "name": <str>,
  }
  ```

  where `{...}` is the document of the series converted to a dataframe, see above.
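The artifact branch relies on pandas' symmetric I/O naming: for a format string `fmt`, writing dispatches to `DataFrame.to_<fmt>` and reading to `pandas.read_<fmt>`, with HDF special-cased because `read_hdf` needs a store key. Below is a minimal sketch of that dispatch, where `fmt` and `path` are illustrative stand-ins for `ctx.pandas_format` and the artifact path, and `index=False` merely keeps the CSV round trip clean (the module passes `ctx.pandas_kwargs` instead):

```py
import pandas as pd

df = pd.DataFrame({"a": [1.0, 2.0], "b": ["x", "y"]})

fmt, path = "csv", "artifact.csv"  # e.g. TB_PANDAS_FORMAT=csv

# Write: resolves to df.to_csv("artifact.csv", index=False)
getattr(df, f"to_{fmt}")(path, index=False)

# Read: resolves to pd.read_csv("artifact.csv"); "h5"/"hdf" would instead go
# through pd.read_hdf(path, "main") because HDF stores are keyed
out = getattr(pd, f"read_{fmt}")(path)

assert out.equals(df)
```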
Warning: Series and column names must be strings!
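Finally, the series case is a thin wrapper around the dataframe case: the series is stored as a one-column dataframe under its name, falling back to `"main"` for unnamed series (hence the warning above), and recovered by selecting that column. In the module, the nested dataframe in `"data"` is presumably (de)serialized by the surrounding machinery; the sketch below skips that step and keeps it as a plain `DataFrame`.

```py
import pandas as pd

ser = pd.Series([1, 2, 3])  # unnamed, so it is stored under "main"
name = ser.name if ser.name is not None else "main"

doc = {"data": ser.to_frame(name=name), "name": name}  # encode
out = doc["data"][doc["name"]]                          # decode

assert out.tolist() == ser.tolist()
assert out.name == "main"  # the fallback name sticks after decoding
```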