Merge pull request #67 from benjamingr/from_file_performance
Fix #66: manage I/O speed by buffering large chunks of the file in `from_file()`. Add tests to ensure this works as expected.
mangiucugna authored Aug 28, 2024
2 parents 04d4327 + ae05f0d commit ff0c617
Showing 2 changed files with 78 additions and 15 deletions.
src/json_repair/json_repair.py: 67 changes (56 additions, 11 deletions)
@@ -29,19 +29,52 @@
 
 class StringFileWrapper:
     # This is a trick to simplify the code, transform the file descriptor handling into string handling
-    def __init__(self, fd: TextIO) -> None:
+    def __init__(self, fd: TextIO, CHUNK_LENGTH: int) -> None:
         self.fd = fd
         self.length: int = 0
+        # Buffers are chunks of the file (1MB by default) that are read from it
+        # and kept in memory to keep the number of reads low
+        self.buffers: dict[int, str] = {}
+        # CHUNK_LENGTH is in bytes
+        if not CHUNK_LENGTH or CHUNK_LENGTH < 2:
+            CHUNK_LENGTH = 1_000_000
+        self.buffer_length = CHUNK_LENGTH
 
+    def fill_buffer(self, index: int) -> None:
+        if self.buffers.get(index) is None:
+            self.fd.seek(index * self.buffer_length)
+            self.buffers[index] = self.fd.read(self.buffer_length)
+            # Save memory: keep at most ~2MB of buffered chunks, but never fewer than 2 chunks
+            if len(self.buffers) > max(2, 2_000_000 / self.buffer_length):
+                oldest_key = next(iter(self.buffers))
+                self.buffers.pop(oldest_key)
+
     def __getitem__(self, index: Union[int, slice]) -> str:
+        # The buffers dict acts like random-access memory over the file:
+        # self.buffers[i] holds the chunk of the file that starts at byte i * CHUNK_LENGTH,
+        # so the character at absolute offset `index` is self.buffers[index // CHUNK_LENGTH][index % CHUNK_LENGTH]
         if isinstance(index, slice):
-            self.fd.seek(index.start)
-            value = self.fd.read(index.stop - index.start)
-            self.fd.seek(index.start)
-            return value
+            buffer_index = index.start // self.buffer_length
+            buffer_end = index.stop // self.buffer_length
+            for i in range(buffer_index, buffer_end + 1):
+                self.fill_buffer(i)
+            if buffer_index == buffer_end:
+                return self.buffers[buffer_index][
+                    index.start % self.buffer_length : index.stop % self.buffer_length
+                ]
+            else:
+                start_slice = self.buffers[buffer_index][
+                    index.start % self.buffer_length :
+                ]
+                end_slice = self.buffers[buffer_end][: index.stop % self.buffer_length]
+                middle_slices = [
+                    self.buffers[i] for i in range(buffer_index + 1, buffer_end)
+                ]
+                return start_slice + "".join(middle_slices) + end_slice
         else:
-            self.fd.seek(index)
-            return self.fd.read(1)
+            buffer_index = index // self.buffer_length
+            self.fill_buffer(buffer_index)
+            return self.buffers[buffer_index][index % self.buffer_length]
 
     def __len__(self) -> int:
         if self.length < 1:
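
For intuition about the buffering above: an absolute character offset maps to chunk `offset // CHUNK_LENGTH` and to position `offset % CHUNK_LENGTH` inside that chunk, and a slice that crosses a chunk boundary is stitched together from the tail of its first chunk, any whole chunks in between, and the head of its last chunk. A minimal self-contained sketch of that arithmetic, separate from the diff (the sample text and chunk size are made up for illustration):

    # Illustrative only: mirrors the offset-to-chunk mapping used by StringFileWrapper.
    chunk_length = 4
    text = "0123456789abcdef"
    chunks = {
        i: text[i * chunk_length : (i + 1) * chunk_length]
        for i in range((len(text) + chunk_length - 1) // chunk_length)
    }

    def char_at(offset: int) -> str:
        # Same arithmetic as the non-slice branch of __getitem__
        return chunks[offset // chunk_length][offset % chunk_length]

    def substring(start: int, stop: int) -> str:
        # Same idea as the slice branch: first chunk tail + middle chunks + last chunk head
        first, last = start // chunk_length, stop // chunk_length
        if first == last:
            return chunks[first][start % chunk_length : stop % chunk_length]
        head = chunks[first][start % chunk_length :]
        middle = "".join(chunks[i] for i in range(first + 1, last))
        tail = chunks[last][: stop % chunk_length]
        return head + middle + tail

    assert char_at(5) == text[5]
    assert substring(2, 11) == text[2:11]
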
@@ -69,13 +102,14 @@ def __init__(
         json_str: Union[str, StringFileWrapper],
         json_fd: Optional[TextIO],
         logging: Optional[bool],
+        json_fd_chunk_length: int = 0,
     ) -> None:
         # The string to parse
         self.json_str = json_str
         # Alternatively, the file descriptor with a json file in it
         if json_fd:
             # This is a trick we do to treat the file wrapper as an array
-            self.json_str = StringFileWrapper(json_fd)
+            self.json_str = StringFileWrapper(json_fd, json_fd_chunk_length)
         # Index is our iterator that will keep track of which character we are looking at right now
         self.index: int = 0
         # This is used in the object member parsing to manage the special cases of missing quotes in key or value
@@ -639,6 +673,7 @@ def repair_json(
     logging: bool = False,
     json_fd: Optional[TextIO] = None,
     ensure_ascii: bool = True,
+    chunk_length: int = 0,
 ) -> Union[JSONReturnType, Tuple[JSONReturnType, List[Dict[str, str]]]]:
     """
     Given a json formatted string, it will try to decode it and, if it fails, it will try to fix it.
@@ -647,7 +682,7 @@
     When `skip_json_loads=True` is passed, it will not call the built-in json.loads() function
     When `logging=True` is passed, it will return a tuple with the repaired json and a log of all repair actions
     """
-    parser = JSONParser(json_str, json_fd, logging)
+    parser = JSONParser(json_str, json_fd, logging, chunk_length)
     if skip_json_loads:
         parsed_json = parser.parse()
     else:
@@ -683,14 +718,18 @@ def loads(
 
 
 def load(
-    fd: TextIO, skip_json_loads: bool = False, logging: bool = False
+    fd: TextIO,
+    skip_json_loads: bool = False,
+    logging: bool = False,
+    chunk_length: int = 0,
 ) -> Union[JSONReturnType, Tuple[JSONReturnType, List[Dict[str, str]]]]:
     """
     This function works like `json.load()` except that it will fix your JSON in the process.
     It is a wrapper around the `repair_json()` function with `json_fd=fd` and `return_objects=True`.
     """
     return repair_json(
         json_fd=fd,
+        chunk_length=chunk_length,
         return_objects=True,
         skip_json_loads=skip_json_loads,
         logging=logging,
@@ -701,12 +740,18 @@ def from_file(
     filename: str,
     skip_json_loads: bool = False,
     logging: bool = False,
+    chunk_length: int = 0,
 ) -> Union[JSONReturnType, Tuple[JSONReturnType, List[Dict[str, str]]]]:
     """
     This function is a wrapper around `load()` so you can pass the filename as a string
     """
     fd = open(filename)
-    jsonobj = load(fd, skip_json_loads, logging)
+    jsonobj = load(
+        fd=fd,
+        skip_json_loads=skip_json_loads,
+        logging=logging,
+        chunk_length=chunk_length,
+    )
     fd.close()
 
     return jsonobj
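
With the new signatures above, callers can control the buffer size through the `chunk_length` keyword; a value of 0 (the default) falls back to 1MB chunks inside StringFileWrapper. A minimal usage sketch, assuming the top-level `json_repair` package re-exports `from_file` and `load` (the file name and chunk size are illustrative):

    from json_repair import from_file, load

    # Repair a large, possibly malformed JSON file using 64KB buffers instead of the 1MB default
    repaired = from_file("big_log.json", chunk_length=64_000)

    # Equivalent, passing an already open file descriptor
    with open("big_log.json") as fd:
        repaired = load(fd, chunk_length=64_000)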