-
Notifications
You must be signed in to change notification settings - Fork 0
/
files.py
323 lines (250 loc) · 10.4 KB
/
files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
import errno
import os
import random
import shutil
import sys
import tempfile
from contextlib import ExitStack, contextmanager
from pathlib import Path
from typing import Iterable, Iterator, List, Tuple, Union
from typing_extensions import TypeAlias
from .basic import temporary_random_seed
from .containers import all_identical
# Type alias used for the path arguments in this module: either a plain
# string or any object implementing ``os.PathLike``.
PathLike: TypeAlias = Union[str, os.PathLike]
def load_list_from_file(filename: PathLike) -> List[str]:
    """Read a text file into a list with one entry per line.

    Args:
        filename: file to read.

    Returns:
        All the values produced by ``iterate_lines_from_file`` for that file,
        collected in a list.
    """
    return [*iterate_lines_from_file(filename)]
def iterate_lines_from_file(filename: PathLike) -> Iterator[str]:
    """Lazily yield the lines of a text file.

    Args:
        filename: file to read (opened in text mode).

    Yields:
        Each line in turn, with trailing carriage-return / newline characters
        removed. Other trailing whitespace is preserved.
    """
    with open(filename, "rt") as handle:
        yield from (raw_line.rstrip("\r\n") for raw_line in handle)
def dump_list_to_file(values: Iterable[str], filename: PathLike) -> None:
    """Save strings to a file, one per line.

    Args:
        values: strings to write; each one is followed by a newline.
        filename: destination file. Any existing content is replaced.
    """
    with open(filename, "wt") as sink:
        sink.writelines(f"{entry}\n" for entry in values)
def append_to_file(values: Iterable[str], filename: PathLike) -> None:
    """Add strings at the end of a file, one per line.

    Args:
        values: strings to append; each one is followed by a newline.
        filename: file to extend (opened in append mode, so existing content
            is kept).
    """
    with open(filename, "at") as sink:
        sink.writelines(f"{entry}\n" for entry in values)
def count_lines(filename: PathLike) -> int:
    """Count the number of lines in a text file.

    Args:
        filename: file to inspect.

    Returns:
        The number of lines in the file (0 for an empty file).
    """
    # Open via a context manager so that the file handle is closed
    # deterministically instead of being left to the garbage collector.
    with open(filename) as f:
        return sum(1 for _ in f)
def iterate_tuples_from_files(
    filenames: List[PathLike],
) -> Iterator[Tuple[str, ...]]:
    """
    Read several files in lockstep, grouping the values found at the same
    line number into one tuple.

    Args:
        filenames: files to read.

    Raises:
        ValueError: if the files do not all have the same number of lines.

    Returns:
        iterator over the generated tuples (one item per input file, with
        trailing line breaks removed).
    """
    # The line counts are compared up front, which costs one extra pass over
    # every file. Detecting a mismatch while reading would be possible, but a
    # simple way to do so only exists from Python 3.10:
    # https://stackoverflow.com/q/32954486
    line_counts = [count_lines(path) for path in filenames]
    if not all_identical(line_counts):
        raise ValueError("Not all the files have identical lengths")

    # ExitStack keeps an arbitrary number of files open at once and closes
    # all of them when leaving the block;
    # see https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
    with ExitStack() as stack:
        handles = [stack.enter_context(open(path, "rt")) for path in filenames]
        stripped_streams = [
            (raw_line.rstrip("\r\n") for raw_line in handle) for handle in handles
        ]
        yield from zip(*stripped_streams)
def dump_tuples_to_files(
    values: Iterable[Tuple[str, ...]], filenames: List[PathLike]
) -> None:
    """Spread tuples over several files: the i-th item of every tuple goes to
    the i-th file, one line per tuple.

    Args:
        values: tuples to write to files.
        filenames: files to create (overwritten if they exist already).

    Raises:
        ValueError: if a tuple does not have exactly one item per file. Lines
            written for the preceding tuples are not rolled back.
    """
    # ExitStack keeps an arbitrary number of files open at once and closes
    # all of them when leaving the block;
    # see https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
    with ExitStack() as stack:
        handles = [stack.enter_context(open(path, "wt")) for path in filenames]
        number_files = len(handles)

        for value_tuple in values:
            if len(value_tuple) != number_files:
                raise ValueError(
                    f"Tuple {value_tuple} has incorrect size (expected: {number_files})."
                )

            for handle, item in zip(handles, value_tuple):
                handle.write(f"{item}\n")
def stable_shuffle(
    input_file: PathLike, output_file: PathLike, seed: int, is_csv: bool = False
) -> None:
    """
    Shuffle a file in a deterministic order (the same seed always reorders
    files of the same number of lines identically).

    Useful, as an example, to shuffle a source and target files identically.

    Args:
        input_file: file to shuffle.
        output_file: where to save the shuffled file.
        seed: seed for the random number generator; it determines the
            resulting line order.
        is_csv: if True, the first line will not be shuffled.
    """
    # Note we use the context manager to avoid side effects of setting the seed.
    with temporary_random_seed(seed):
        line_iterator = iterate_lines_from_file(input_file)

        # Get the header, if it's a CSV. We store it as a list, which will
        # have 0 or 1 element. The default passed to `next` covers an empty
        # input file, which would otherwise leak a bare StopIteration.
        header: List[str] = []
        if is_csv:
            first_line = next(line_iterator, None)
            if first_line is not None:
                header = [first_line]

        # Get actual content and shuffle it
        lines = list(line_iterator)
        random.shuffle(lines)

        # Write header (if there is no header, it will empty the file)
        dump_list_to_file(header, output_file)

        # Write the shuffled lines
        append_to_file(lines, output_file)
@contextmanager
def named_temporary_path(delete: bool = True) -> Iterator[Path]:
    """
    Provide a random path inside the system temporary directory, without
    creating anything at that location (handy in tests, for instance).

    The behavior is close to tempfile.NamedTemporaryFile, for the cases where
    the file should not actually be opened and one only needs a writable /
    readable path, optionally cleaned up when the context ends.

    The original motivation was a limitation of NamedTemporaryFile on Windows
    (https://stackoverflow.com/q/23212435), which matters when the file must
    not be opened automatically. The approach is inspired by
    https://stackoverflow.com/a/58955530.

    Args:
        delete: whether to remove whatever exists at the path (file or
            directory) when exiting the context

    Examples:
        >>> with named_temporary_path() as temporary_path:
        ...     # do something on the temporary path.
        ...     # The file or directory at that path will be deleted at the
        ...     # end of the context, except if delete=False.
    """
    # 24 random bytes, hex-encoded, serve as the file / directory name.
    random_name = os.urandom(24).hex()
    temporary_path = Path(tempfile.gettempdir()) / random_name

    try:
        yield temporary_path
    finally:
        if delete and temporary_path.exists():
            if not temporary_path.is_file():
                # Anything that is not a regular file is removed as a tree.
                shutil.rmtree(temporary_path)
            else:
                temporary_path.unlink()
@contextmanager
def named_temporary_directory(delete: bool = True) -> Iterator[Path]:
    """
    Create a temporary directory and provide its path.

    The path comes from ``named_temporary_path``, whose context manager also
    takes care of removing the directory when the context is left.

    Args:
        delete: whether to delete the directory when exiting the context

    Examples:
        >>> with named_temporary_directory() as temporary_directory:
        ...     # do something with the temporary directory.
        ...     # The directory will be deleted at the
        ...     # end of the context, except if delete=False.
    """
    with named_temporary_path(delete=delete) as temporary_directory:
        # named_temporary_path only picks the location; create it here.
        temporary_directory.mkdir()
        yield temporary_directory
def is_pathname_valid(pathname: PathLike) -> bool:
    """
    `True` if the passed pathname is a valid pathname for the current OS;
    `False` otherwise.

    Copied from https://stackoverflow.com/a/34102855. More details there.

    Args:
        pathname: path to check. It does not need to exist.

    Returns:
        False for an empty pathname, or when probing one of its components
        fails with an "invalid name" / "name too long" kind of error, or when
        the pathname cannot be handled by the ``os`` functions at all (such as
        an embedded null character); True otherwise.
    """
    pathname = str(pathname)
    if not pathname:
        return False

    try:
        # Drop the drive specifier, if any (only meaningful on Windows).
        _, pathname = os.path.splitdrive(pathname)

        # Directory guaranteed to exist, against which every path component
        # is tested individually.
        root_dirname = (
            os.environ.get("HOMEDRIVE", "C:")
            if sys.platform == "win32"
            else os.path.sep
        )
        assert os.path.isdir(root_dirname)

        # Make sure the root ends with exactly one separator.
        root_dirname = root_dirname.rstrip(os.path.sep) + os.path.sep

        for pathname_part in pathname.split(os.path.sep):
            try:
                os.lstat(root_dirname + pathname_part)
            except OSError as exc:
                # Most errors (such as a missing file) say nothing about the
                # validity of the name; only a few specific ones do.
                if hasattr(exc, "winerror"):
                    error_invalid_name = 123
                    if exc.winerror == error_invalid_name:
                        return False
                elif exc.errno in {errno.ENAMETOOLONG, errno.ERANGE}:
                    return False
    except (TypeError, ValueError):
        # Python 3 raises ValueError (not TypeError) when the path has an
        # embedded null character; such a pathname is invalid as well.
        return False

    return True
def is_path_creatable(pathname: PathLike) -> bool:
    """
    Tell whether the current user has write access to the directory that
    would contain the passed pathname, i.e. whether the pathname could be
    created there.

    Copied from https://stackoverflow.com/a/34102855. More details there.
    """
    parent_directory = os.path.dirname(str(pathname))
    if not parent_directory:
        # A bare file name is relative to the current working directory.
        parent_directory = os.getcwd()
    return os.access(parent_directory, os.W_OK)
def is_path_exists_or_creatable(pathname: PathLike) -> bool:
    """
    `True` if the passed pathname is a valid pathname for the current OS _and_
    either currently exists or is hypothetically creatable; `False` otherwise.

    This function is meant to _never_ raise exceptions for the given pathname:
    failures of the underlying OS calls are reported as `False`.

    Copied from https://stackoverflow.com/a/34102855. More details there.
    """
    pathname = str(pathname)
    try:
        return is_pathname_valid(pathname) and (
            os.path.exists(pathname) or is_path_creatable(pathname)
        )
    except (OSError, ValueError):
        # ValueError is what the ``os`` functions raise for paths they cannot
        # represent at all (such as an embedded null character). Catching it
        # here as well is needed to honor the "never raises" contract.
        return False
def paths_are_identical(*paths: PathLike) -> bool:
    """Tell whether all the given paths resolve to the same location.

    The paths may mix absolute and relative forms; they are compared after
    resolution with ``os.path.realpath``.
    """
    distinct_locations = set(map(os.path.realpath, paths))
    return len(distinct_locations) == 1
def raise_if_paths_are_identical(*paths: PathLike) -> None:
    """
    Guard against several paths (typically an input and an output) pointing
    to the same file.

    Raises:
        ValueError: if ``paths_are_identical`` reports the paths as identical.
    """
    if not paths_are_identical(*paths):
        return

    paths_str = ", ".join([f'"{p}"' for p in paths])
    raise ValueError(f"The paths {paths_str} must be different.")
def ensure_directory_exists_and_is_empty(directory: Path) -> None:
    """Make sure a directory exists (creating it and its parents if needed)
    and that it contains nothing.

    Raises:
        RuntimeError: if the directory already has content.
    """
    directory.mkdir(parents=True, exist_ok=True)

    # Looking at the first entry is enough to know whether there is content.
    first_entry = next(directory.iterdir(), None)
    if first_entry is not None:
        raise RuntimeError(f'The directory "{directory}" is required to be empty.')
def get_file_size_as_string(file: PathLike) -> str:
    """Get the file size as a readable string.

    Adapted from https://stackoverflow.com/a/39988702.

    Args:
        file: File to get the size for.

    Raises:
        ValueError: if the given path is a directory.
        RuntimeError: if the size is 1024 TB or more.

    Returns:
        Readable string for the file size (such as "1000.0 bytes",
        "2.3 KB", or "1.1 MB").
    """
    if not isinstance(file, Path):
        file = Path(file)

    if file.is_dir():
        raise ValueError(f'"{file}" should be a file, but it is a directory.')

    # Get the size in bytes
    size: Union[int, float] = file.stat().st_size

    # Divide by 1024 until the value fits in the current unit.
    for unit in ["bytes", "KB", "MB", "GB", "TB"]:
        if size < 1024.0:
            return f"{size:3.1f} {unit}"
        size /= 1024.0

    raise RuntimeError(f'The file "{file}" is too big to determine the size.')