Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Funcionalidade de agregação de diários oficiais #79

Merged
merged 16 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,17 @@ wait-opensearch:
publish-tag:
podman tag $(IMAGE_NAMESPACE)/$(IMAGE_NAME):${IMAGE_TAG} $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)
podman push $(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(shell git describe --tags)

.PHONY: stop-aggregate-gazettes
stop-aggregate-gazettes:
podman stop --ignore agg-gazettes
podman rm --force --ignore agg-gazettes

.PHONY: aggregate-gazettes
aggregate-gazettes: stop-aggregate-gazettes set-run-variable-values
podman run -ti --volume $(CURDIR):/mnt/code:rw \
--pod $(POD_NAME) \
--env PYTHONPATH=/mnt/code \
--env-file envvars \
--name agg-gazettes \
$(IMAGE_NAMESPACE)/$(IMAGE_NAME):$(IMAGE_TAG) python main -p aggregates
2 changes: 2 additions & 0 deletions contrib/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,5 @@ QUERIDO_DIARIO_FILES_ENDPOINT=http://localhost:9000/queridodiariobucket

# Options: ALL, DAILY, UNPROCESSED
EXECUTION_MODE=DAILY

SEED_HASH=querido-diario
31 changes: 27 additions & 4 deletions main/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from os import environ
import argparse
import logging

from data_extraction import create_apache_tika_text_extraction
from database import create_database_interface
from storage import create_storage_interface
from index import create_index_interface
from tasks import (
create_aggregates,
create_gazettes_index,
create_aggregates_table,
create_themed_excerpts_index,
embedding_rerank_excerpts,
extract_text_from_gazettes,
Expand Down Expand Up @@ -35,9 +38,7 @@ def get_execution_mode():
return environ.get("EXECUTION_MODE", "DAILY")


def execute_pipeline():
enable_debug_if_necessary()

def gazette_texts_pipeline():
execution_mode = get_execution_mode()
database = create_database_interface()
storage = create_storage_interface()
Expand All @@ -61,5 +62,27 @@ def execute_pipeline():
tag_entities_in_excerpts(theme, themed_excerpt_ids, index)


def aggregates_pipeline():
database = create_database_interface()
storage = create_storage_interface()

create_aggregates_table(database)
create_aggregates(database, storage)


def execute_pipeline(pipeline):
enable_debug_if_necessary()

if not pipeline or pipeline == "gazette_texts":
gazette_texts_pipeline()
elif pipeline == "aggregates":
aggregates_pipeline()
else:
raise ValueError("Pipeline inválido.")


if __name__ == "__main__":
execute_pipeline()
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--pipeline", help="Qual pipeline deve ser executado.")
args = parser.parse_args()
execute_pipeline(args.pipeline)
35 changes: 27 additions & 8 deletions storage/digital_ocean_spaces.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import os
from typing import Generator
from typing import Generator, Union
from io import BytesIO
from pathlib import Path

import boto3

Expand Down Expand Up @@ -68,18 +69,36 @@ def __init__(
aws_secret_access_key=self._access_secret,
)

def get_file(self, file_key: str, destination) -> None:
logging.debug(f"Getting {file_key}")
self._client.download_fileobj(self._bucket, file_key, destination)
def get_file(self, file_to_be_downloaded: Union[str, Path], destination) -> None:
logging.debug(f"Getting {file_to_be_downloaded}")
self._client.download_fileobj(self._bucket, str(file_to_be_downloaded), destination)

def upload_content(
self,
file_key: str,
content_to_be_uploaded: str,
content_to_be_uploaded: Union[str, BytesIO],
permission: str = "public-read",
) -> None:
logging.debug(f"Uploading {file_key}")
f = BytesIO(content_to_be_uploaded.encode())
self._client.upload_fileobj(
f, self._bucket, file_key, ExtraArgs={"ACL": permission}

if isinstance(content_to_be_uploaded, str):
f = BytesIO(content_to_be_uploaded.encode())
self._client.upload_fileobj(
f, self._bucket, file_key, ExtraArgs={"ACL": permission}
)
else:
self._client.upload_fileobj(
content_to_be_uploaded, self._bucket, file_key, ExtraArgs={"ACL": permission}
)

def copy_file(self, source_file_key: str, destination_file_key: str) -> None:
logging.debug(f"Copying {source_file_key} to {destination_file_key}")
self._client.copy_object(
Bucket=self._bucket,
CopySource={'Bucket': self._bucket, 'Key': source_file_key},
Key=destination_file_key
)

def delete_file(self, file_key: str) -> None:
logging.debug(f"Deleting {file_key}")
self._client.delete_object(Bucket=self._bucket, Key=file_key)
2 changes: 2 additions & 0 deletions tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .create_index import create_gazettes_index, create_themed_excerpts_index
from .create_aggregates_table import create_aggregates_table
from .gazette_excerpts_embedding_reranking import embedding_rerank_excerpts
from .gazette_excerpts_entities_tagging import tag_entities_in_excerpts
from .gazette_text_extraction import extract_text_from_gazettes
from .gazette_themed_excerpts_extraction import extract_themed_excerpts_from_gazettes
from .gazette_themes_listing import get_themes
from .gazette_txt_to_xml import create_aggregates
from .interfaces import (
DatabaseInterface,
StorageInterface,
Expand Down
18 changes: 18 additions & 0 deletions tasks/create_aggregates_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from .interfaces import DatabaseInterface


def create_aggregates_table(database: DatabaseInterface):
database._commit_changes(
"""
CREATE TABLE IF NOT EXISTS aggregates (
id SERIAL PRIMARY KEY ,
territory_id VARCHAR,
state_code VARCHAR NOT NULL,
year INTEGER,
file_path VARCHAR(255) UNIQUE,
file_size_mb REAL,
hash_info VARCHAR(64),
last_updated TIMESTAMP
); """)


Loading
Loading