# streaming_csv_editor.py
from inspect import Signature, signature
from typing import Any, Callable, List, Tuple, Type, Union
from attr import define
from tqdm import tqdm
from typing_extensions import TypeAlias
from ..files import PathLike, count_lines
from . import CsvIterator
# Transformation function as actually used under the hood: it receives the
# values of the input columns for one row (as a list of strings) and returns
# the list of strings to write to the output columns.
_TransformationFunction: TypeAlias = Callable[[List[str]], List[str]]
class StreamingCsvEditor:
    """
    Apply a transformation to the rows of a CSV, one row at a time.

    Rows are consumed and produced lazily, so the whole file never has to be
    held in memory (as it would be with, e.g., a pandas DataFrame).
    """

    def __init__(
        self,
        columns_in: List[str],
        columns_out: List[str],
        transformation: Callable[..., Any],
        line_terminator: str = "\n",
    ):
        """
        Args:
            columns_in: names of the columns whose values are fed to the
                transformation.
            columns_out: names of the columns receiving the values returned
                by the transformation.
            transformation: function applied to the values of the input
                columns; its results are written to the output columns.
                The function must be type-annotated. Admissible annotations:
                    - Parameters:
                        - one or several strings
                        - a list of strings (with one or more elements)
                        - a tuple of strings (with one or more elements)
                    - Return type:
                        - one string
                        - a list of strings (with one or more elements)
                        - a tuple of strings (with one or more elements)
            line_terminator: line terminator used when writing the CSV.
        """
        # Normalize the user callback to a "list of str -> list of str" function.
        row_function = _callback_handler(transformation)
        self.transformation = _CsvTransformation(
            columns_in=columns_in,
            columns_out=columns_out,
            fn=row_function,
        )
        self.line_terminator = line_terminator

    def process(self, csv_iterator: CsvIterator) -> CsvIterator:
        """
        Edit the content of a CSV given as an iterator.

        Args:
            csv_iterator: the CSV to edit.

        Returns:
            A new CsvIterator whose rows are transformed lazily, as they are
            consumed.
        """
        row_processor = _Helper(
            csv_iterator.columns, transformation=self.transformation
        )
        edited_rows = (row_processor.process_line(r) for r in csv_iterator.rows)
        return CsvIterator(columns=row_processor.output_columns, rows=edited_rows)

    def process_paths(
        self, path_in: PathLike, path_out: PathLike, verbose: bool = False
    ) -> None:
        """
        Edit a CSV file on disk and save the result to another file.

        Args:
            path_in: path to the existing CSV.
            path_out: path where the edited CSV is written.
            verbose: whether to report the progress with tqdm.
        """
        with open(path_in, "rt") as f_in, open(path_out, "wt") as f_out:
            source = CsvIterator.from_stream(f_in)

            if verbose:
                # The line count of the input file is used as the total
                # for the progress bar.
                total = count_lines(path_in)
                columns = source.columns
                progress = tqdm(source.rows, total=total)
                source = CsvIterator(columns, rows=(row for row in progress))

            edited = self.process(source)
            edited.to_stream(f_out, line_terminator=self.line_terminator)
@define
class _CsvTransformation:
    """Helper class containing the details of a transformation for one CSV file."""

    # Names of the columns whose values are passed to ``fn``.
    columns_in: List[str]
    # Names of the columns receiving the values returned by ``fn``.
    columns_out: List[str]
    # Row-level function: list of input values -> list of output values.
    fn: _TransformationFunction
class _Helper:
"""Helper class that does the actual row-by-row processing."""
def __init__(
self,
input_columns: List[str],
transformation: _CsvTransformation,
):
self.fn = transformation.fn
self.indices_in = self._determine_column_indices(
input_columns, transformation.columns_in
)
new_columns = [c for c in transformation.columns_out if c not in input_columns]
self.n_new_columns = len(new_columns)
self.output_columns = input_columns + new_columns
self.indices_out = self._determine_column_indices(
self.output_columns, transformation.columns_out
)
def _determine_column_indices(
self, all_columns: List[str], target_columns: List[str]
) -> List[int]:
indices: List[int] = []
for c in target_columns:
try:
indices.append(all_columns.index(c))
except ValueError:
raise RuntimeError(f'"{c}" not found in {all_columns}.')
return indices
def process_line(self, row: List[str]) -> List[str]:
"""Process one line from the CSV.
Args:
row: content of one CSV line.
Returns:
Content of the line after applying the function
"""
# Process the values
input_items = [row[i] for i in self.indices_in]
results = self.fn(input_items)
# Extend the row object to make space for the new values (if needed)
row.extend("" for _ in range(self.n_new_columns))
# overwrite the results
for index, result in zip(self.indices_out, results):
row[index] = result
return row
def _parameter_is_tuple(parameter_type: Type[Any]) -> bool:
return any(v in str(parameter_type) for v in ["Tuple", "tuple"])
def _parameter_is_list(parameter_type: Type[Any]) -> bool:
return any(v in str(parameter_type) for v in ["List", "list"])
def _parameter_is_list_or_tuple(parameter_type: Type[Any]) -> bool:
    """Tell whether an annotation refers to a list or to a tuple."""
    checks = (_parameter_is_list, _parameter_is_tuple)
    return any(check(parameter_type) for check in checks)
def _postprocessing_fn(fn: Callable[..., Any]) -> Callable[..., List[str]]:
"""From the user-given function, wrap it so that the result is converted
to a list of strings."""
sig = signature(fn)
return_type = sig.return_annotation
if return_type is Signature.empty:
raise ValueError(
"Make sure that the function you provided has a return annotation."
)
adapter: Callable[..., List[str]]
if return_type is str:
def adapter(x: str) -> List[str]:
return [x]
return adapter
if _parameter_is_list_or_tuple(return_type):
def adapter(x: Union[List[str], Tuple[str]]) -> List[str]:
return list(x)
return adapter
raise ValueError(f"Unsupported return type: {return_type}")
def _preprocessing_fn(fn: Callable[..., Any]) -> Callable[[List[str]], Any]:
"""From the user-given function, wrap it so that it can ingest a list of strings."""
sig = signature(fn)
parameter_types = [p.annotation for p in sig.parameters.values()]
if any(p is Signature.empty for p in parameter_types):
raise ValueError(
"Make sure that the function you provided is fully type-annotated."
)
# Necessary for the below
adapter: Callable[[List[str]], Any]
parameters_are_strs = all(p is str for p in parameter_types)
if parameters_are_strs:
def adapter(inputs: List[str]) -> Any:
return fn(*inputs)
return adapter
parameters_is_list = len(parameter_types) == 1 and _parameter_is_list(
parameter_types[0]
)
if parameters_is_list:
def adapter(inputs: List[str]) -> Any:
return fn(inputs)
return adapter
parameters_is_tuple = len(parameter_types) == 1 and _parameter_is_tuple(
parameter_types[0]
)
if parameters_is_tuple:
def adapter(inputs: List[str]) -> Any:
return fn(tuple(inputs))
return adapter
raise ValueError(
f"Cannot process parameter types of function with signature {sig}."
)
def _callback_handler(fn: Callable[..., Any]) -> _TransformationFunction:
    """Turn the user-provided callback into a function from a list of strings
    to a list of strings.

    Raises:
        ValueError: if a parameter of the callback is not annotated (further
            errors may come from the pre- and postprocessing wrappers).
    """
    for parameter in signature(fn).parameters.values():
        if parameter.annotation is Signature.empty:
            raise ValueError(
                "Make sure that the function you provided is fully type-annotated."
            )

    # Output adapter first, then input adapter: this order determines which
    # validation error is raised first.
    to_output = _postprocessing_fn(fn)
    to_input = _preprocessing_fn(fn)

    def transform(inputs: List[str]) -> List[str]:
        raw_result = to_input(inputs)
        return to_output(raw_result)

    return transform