From e84e8bffa2b56fd0571fe227a4e61fca0ac86c21 Mon Sep 17 00:00:00 2001
From: engisalor <50170623+engisalor@users.noreply.github.com>
Date: Wed, 29 May 2024 15:03:28 +0200
Subject: [PATCH] fix: improve stanza pipeline

---
 pipeline/stanza/base_pipeline.py | 162 ++++++++++++++++++++++++-------
 1 file changed, 126 insertions(+), 36 deletions(-)

diff --git a/pipeline/stanza/base_pipeline.py b/pipeline/stanza/base_pipeline.py
index 2d83362..3ce690c 100644
--- a/pipeline/stanza/base_pipeline.py
+++ b/pipeline/stanza/base_pipeline.py
@@ -11,18 +11,31 @@
 https://universaldependencies.org/format.html
 """
 
+import datetime
+import logging
+
+# import os
+# import tracemalloc
+import math
 import re
 from pathlib import Path
 from time import perf_counter
 
 import stanza
 from defusedxml.ElementTree import fromstring
+from stanza import DownloadMethod
 from stanza.utils.conll import CoNLL
 
-processors = "tokenize,mwt,pos,lemma,depparse"
-language = "es"
-chunksize = 1  # MB
-nlp = stanza.Pipeline(language, processors=processors, download_method=None)
+
+def convert_size(size_bytes) -> tuple:
+    """See https://stackoverflow.com/questions/5194057/"""
+    if size_bytes == 0:
+        return (0, "B")
+    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
+    i = int(math.floor(math.log(size_bytes, 1024)))
+    p = math.pow(1024, i)
+    s = round(size_bytes / p, 2)
+    return (s, size_name[i])
 
 
 def vert_to_conll(file: str) -> None:
@@ -106,13 +119,46 @@ def conll_to_vert(file: str) -> None:
 class NLP:
     """Runs Stanza NLP on files generated from Corpusama."""
 
+    @staticmethod
+    def calculate_rate(run, _bytes, t0):
+        t1 = perf_counter()
+        secs = t1 - t0
+        _time = str(datetime.timedelta(seconds=secs, microseconds=0))
+        # _size, _peak = tracemalloc.get_traced_memory()
+        # tracemalloc.reset_peak()
+        # tot_m, used_m, free_m = map(
+        #     int, os.popen('free -t -m').readlines()[-1].split()[1:])
+        # _peak, _unit = convert_size(_peak)
+        size, unit = convert_size(_bytes)
+        msg = " - ".join(
+            [
+                f"run {run}",
+                f"{_time}",
+                f"{size:,.2f} {unit}",
+                f"{size/secs:,.3f} {unit}/s",
+                # f"{_peak:,.2f} {_unit} RAM",
+                # f"{free_m:,.0f} MB free RAM",
+            ]
+        )
+        logging.info(msg)
+
     def to_conll_inner(self) -> None:
+        # tracemalloc.start()
+        self.run_current += 1
+        # if self.run_current % 4 == 0:
+        #     logging.info(f"run {self.run_current} - reload stanza pipeline")
+        #     self.nlp = stanza.Pipeline(
+        #         self.language,
+        #         processors=self.processors,
+        #         download_method=self.download_method,
+        #         logging_level='WARN')
+        t0 = perf_counter()
+        bytes_batch = sum([len(x[1].encode()) for x in self.docs])
         self.xml_headers = [x[0] for x in self.docs]
-        self.docs = nlp.bulk_process([x[1] for x in self.docs])
+        self.docs = self.nlp.bulk_process([x[1] for x in self.docs])
         for x in range(len(self.docs)):
             self.docs[x].xml_header = self.xml_headers[x]
             self.docs[x].xml_footer = "</doc>\n"
-        self.dest.unlink(missing_ok=True)
         for doc in self.docs:
             xml = fromstring(doc.xml_header + doc.xml_footer)
             with open(self.dest, "a") as f:
@@ -123,45 +169,89 @@ def to_conll_inner(self) -> None:
                 )
             CoNLL.write_doc2conll(doc, self.dest, "a")
         self.docs = []
-        self.batch_current += 1
-        if self.max_batches and self.batch_current <= self.max_batches:
-            return True
-        else:
-            return False
+        self.bytes_processed += bytes_batch
+        self.calculate_rate(self.run_current, bytes_batch, t0)
 
     def to_conll(self) -> None:
-        while True:
-            with open(self.source) as f:
-                for i, line in enumerate(f):
-                    if line.startswith('<doc'):
-                        self.meta = line
-                        self.doc = ""
-                    elif line.startswith("</doc>"):
-                        self.docs.append((self.meta, self.doc))
-                        if (
-                            sum([len(doc[1].encode()) for doc in self.docs])
-                            >= self.chunksize
-                        ):
-                            return self.to_conll_inner()
-                    else:
-                        self.doc += line
-            if self.docs:
-                return self.to_conll_inner()
+        t0 = perf_counter()
+        self.dest.unlink(missing_ok=True)
+        with open(self.source) as f:
+            for i, line in enumerate(f):
+                if line.startswith('<doc'):
+                    self.meta = line
+                    self.doc = ""
+                elif line.startswith("</doc>"):
+                    self.docs.append((self.meta, self.doc))
+                    _bytes = sum([len(doc[1].encode()) for doc in self.docs])
+                    if _bytes >= self.chunksize:
+                        self.to_conll_inner()
+                else:
+                    self.doc += line
+        if self.docs:
+            self.to_conll_inner()
+        self.calculate_rate("TOTAL", self.bytes_processed, t0)
+
+    def verify(self) -> None:
+        def inner(file: Path) -> int:
+            if not file.exists() or file.suffix not in [".txt", ".conllu"]:
+                raise ValueError(f"{file.suffix} not implemented or no documents found")
+            docs = 0
+            with open(file) as f:
+                for line in f:
+                    if file.suffix in [".txt"] and line.startswith("<doc"):
+                        docs += 1
+                    elif file.suffix in [".conllu"] and line.startswith("# newdoc"):
+                        docs += 1
+            return docs
+
+        if inner(self.source) != inner(self.dest):
+            raise ValueError(f"{self.source} and {self.dest} doc counts differ")
+        logging.info(f"{self.dest} - doc count verified")
 
-    def __init__(self, file: str, chunksize: int = chunksize, max_batches: int = 0) -> None:
+    def __init__(
+        self,
+        file: str,
+        language: str = "es",
+        processors: str = "tokenize,mwt,pos,lemma,depparse",
+        chunksize: int = 1,
+        download_method=DownloadMethod.REUSE_RESOURCES,
+    ) -> None:
         self.file = file
-        self.chunksize = chunksize
-        self.max_batches = max_batches
         self.source = Path(self.file)
         self.dest = self.source.with_suffix(".conllu")
         self.docs = []
         self.doc = ""
         self.meta = ""
-        self.chunksize = (1024**2) * self.chunksize
-        self.batch_current = 1
-        self.t0 = perf_counter()
+        self.chunksize = (1024**2) * chunksize
+        self.run_current = 0
+        self.bytes_processed = 0
+        self.language = language
+        self.processors = processors
+        self.download_method = download_method
+        self.nlp = stanza.Pipeline(
+            language,
+            processors=processors,
+            download_method=download_method,
+            logging_level="WARN",
+        )
+        # tot_m, used_m, free_m = map(
+        #     int, os.popen('free -t -m').readlines()[-1].split()[1:])
+        msg = " ".join(
+            [
+                f"{self.file} ({language}) -> Stanza (~{chunksize} MB chunks)",
+                # f" -> CoNLL ({free_m:,.0f} MB free RAM)",
+            ]
+        )
+
+        logging.info(msg)
         self.to_conll()
-        self.t1 = perf_counter()
-        print(f"{self.t1-self.t0:.2f} secs")
+        self.verify()
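
The new convert_size helper returns a (value, unit) tuple rather than a formatted string, leaving formatting to callers such as calculate_rate. A few spot checks that hold for the function as written; they are not part of the diff above, and the import path assumes the repository layout shown in the file header:

    from pipeline.stanza.base_pipeline import convert_size

    assert convert_size(0) == (0, "B")              # zero short-circuits to bytes
    assert convert_size(1023) == (1023, "B")        # below one KB stays in bytes
    assert convert_size(1024) == (1.0, "KB")        # exact power of 1024
    assert convert_size(3_500_000) == (3.34, "MB")  # rounded to two decimals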
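
Since __init__ now builds the Stanza pipeline itself and ends by calling to_conll() and then verify(), constructing the object runs the whole job. A minimal driver sketch, again not part of the diff: the import path is assumed as above, and corpus.txt is a hypothetical Corpusama-style input whose documents are wrapped in <doc ...>...</doc> tags:

    import logging

    from pipeline.stanza.base_pipeline import NLP

    # per-chunk and total rate/size statistics are emitted via logging.info
    logging.basicConfig(level=logging.INFO)

    # writes corpus.conllu next to the source, processing ~1 MB chunks,
    # then verifies that source and destination doc counts match
    NLP("corpus.txt", language="es", chunksize=1)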