From 66296fe21957276ad87629d0ed7984793b64812c Mon Sep 17 00:00:00 2001
From: Harris Tzovanakis
Date: Wed, 18 Oct 2023 16:25:54 +0200
Subject: [PATCH] global: connect to logstash directly

---
 hepcrawl/pipelines.py | 98 +++++++++++++++++++++++++++++++++----------
 setup.py              |  1 +
 2 files changed, 77 insertions(+), 22 deletions(-)

diff --git a/hepcrawl/pipelines.py b/hepcrawl/pipelines.py
index 74a7b252..e14979a9 100644
--- a/hepcrawl/pipelines.py
+++ b/hepcrawl/pipelines.py
@@ -27,14 +27,23 @@
 import requests
 import shutil
 import structlog
+import logstash
 
 from .api import CrawlResult
 from .settings import FILES_STORE
 from .utils import RecordFile
 
-LOGGER = logging.getLogger(name=__name__)
 STRUCT_LOGGER = structlog.get_logger()
+
+LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", "localhost")
+LOGSTASH_PORT = int(os.environ.get("LOGSTASH_PORT", 6060))
+LOGSTASH_VERSION = int(os.environ.get("LOGSTASH_VERSION", 1))
+LOGGER = logging.getLogger("python-logstash-logger")
+LOGGER.setLevel(logging.INFO)
+LOGGER.addHandler(
+    logstash.TCPLogstashHandler(LOGSTASH_HOST, LOGSTASH_PORT, version=LOGSTASH_VERSION)
+)
 
 
 class DocumentsPipeline(FilesPipeline):
     """Download all the documents the record passed to download.
@@ -100,14 +109,16 @@ class InspireAPIPushPipeline(object):
 
     def __init__(self):
         self.count = 0
+        self.dois = []
+        self.arxiv_eprints = []
+        self.report_numbers = []
+        self.spider_name = None
+        self.scrape_job = None
 
     def open_spider(self, spider):
         self.results_data = []
-        self.logger = STRUCT_LOGGER.bind(
-            name="Harvesting pipeline",
-            spider=spider.name,
-            scrape_job=os.environ.get('SCRAPY_JOB'),
-        )
+        self.spider_name = spider.name
+        self.scrape_job = os.environ.get('SCRAPY_JOB')
 
     def process_item(self, item, spider):
         """Add the crawl result to the results data after processing it.
@@ -134,18 +145,31 @@ def process_item(self, item, spider):
         )
         crawl_result = CrawlResult.from_parsed_item(item).to_dict()
         self.results_data.append(crawl_result)
-        ids = []
+
         titles = get_value(crawl_result, 'titles.title', default=[])
         dois = get_value(crawl_result, 'dois.value', default=[])
         arxiv_eprints = get_value(crawl_result, 'arxiv_eprints.value', default=[])
         report_numbers = get_value(crawl_result, 'report_numbers.value', default=[])
 
-        self.logger.info(
+        for doi in dois:
+            self.dois.append(doi)
+
+        for arxiv_eprint in arxiv_eprints:
+            self.arxiv_eprints.append(arxiv_eprint)
+
+        for report_number in report_numbers:
+            self.report_numbers.append(report_number)
+
+        LOGGER.info(
             'Processing item.',
-            titles=titles,
-            dois=dois,
-            arxiv_eprints=arxiv_eprints,
-            report_numbers=report_numbers,
+            extra=dict(
+                titles=titles,
+                dois=dois,
+                arxiv_eprints=arxiv_eprints,
+                report_numbers=report_numbers,
+                spider=self.spider_name,
+                scrapy_job=self.scrape_job,
+            )
         )
         return crawl_result
 
@@ -175,7 +199,14 @@ def _prepare_payload(self, spider):
                     errors=errors,
                 )
             )
-            self.logger.error("Error.", errors=errors)
+            LOGGER.error(
+                'Errors.',
+                extra=dict(
+                    spider=self.spider_name,
+                    scrapy_job=self.scrape_job,
+                    errors=errors,
+                )
+            )
         return payload_list
 
     @staticmethod
@@ -231,7 +262,6 @@ def open_spider(self, spider):
             ))
 
         super(InspireCeleryPushPipeline, self).open_spider(spider=spider)
-        self.logger.info('Start processing.')
 
     def close_spider(self, spider):
 
@@ -256,11 +286,16 @@ def close_spider(self, spider):
             )
 
             logger.info('Triggering celery task: %s.', task_endpoint)
-            for kwargs in self._prepare_payload(spider):
-                self.logger.info(
-                    'Finish Processing.',
+            LOGGER.info(
+                'Sending tasks.',
+                extra=dict(
+                    spider=self.spider_name,
+                    scrapy_job=self.scrape_job,
                     number_of_results=self.count,
                 )
+            )
+
+            for kwargs in self._prepare_payload(spider):
                 logger.debug(
                     ' Sending results:\n %s',
                     pprint.pformat(kwargs),
@@ -271,13 +306,32 @@ def close_spider(self, spider):
                     'scrapy_job_id': os.environ.get('SCRAPY_JOB')
                 }
                 logger.info('Sent celery task %s', pprint.pformat(celery_task_info_payload))
-                self.logger.info(
-                    'Celery task sent.',
-                    celery_task_id=res.id,
+                LOGGER.info(
+                    'Sending task.',
+                    extra=dict(
+                        spider=self.spider_name,
+                        scrapy_job=self.scrape_job,
+                        kwargs=kwargs,
+                        celery_task_id=res.id
+                    )
                 )
+            LOGGER.info(
+                'Finish Processing.',
+                extra=dict(
+                    dois=self.dois,
+                    arxiv_eprints=self.arxiv_eprints,
+                    report_numbers=self.report_numbers,
+                    spider=self.spider_name,
+                    scrapy_job=self.scrape_job,
+                    number_of_results=self.count,
+                )
+            )
         else:
-            self.logger.info(
+            LOGGER.info(
                 'No results.',
-                number_of_results=self.count,
+                extra=dict(
+                    spider=self.spider_name,
+                    scrapy_job=self.scrape_job,
+                )
             )
         self._cleanup(spider)

diff --git a/setup.py b/setup.py
index 3efd77d1..268121ba 100644
--- a/setup.py
+++ b/setup.py
@@ -50,6 +50,7 @@
     'queuelib==1.5.0',
     'sentry-sdk==1.3.0',
     'structlog==20.1.0',
+    'python-logstash==0.4.8',
 ]
 
 tests_require = [
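
Note (not part of the patch): a minimal sketch of how the handler wired up above ships a structured event. The host, port, and field values below are illustrative only; the patch reads the real endpoint from the LOGSTASH_HOST / LOGSTASH_PORT environment variables and fills the fields from the crawl result.

    import logging

    import logstash

    # Illustrative endpoint; the patch takes these from LOGSTASH_HOST / LOGSTASH_PORT.
    logger = logging.getLogger("python-logstash-logger")
    logger.setLevel(logging.INFO)
    logger.addHandler(logstash.TCPLogstashHandler("localhost", 6060, version=1))

    # Every key passed via `extra` becomes a top-level field of the JSON event
    # sent to Logstash, next to the message and the standard log metadata.
    logger.info(
        "Processing item.",
        extra=dict(
            titles=["Example title"],      # illustrative values only
            dois=["10.1234/example"],
            spider="example_spider",
            scrapy_job="42",
        ),
    )

Running this assumes a Logstash TCP input with a JSON codec listening on the configured port.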