global: connect to logstash directly
drjova committed Oct 18, 2023
1 parent 1d548f9 commit 452c898
Showing 2 changed files with 62 additions and 19 deletions.
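The change routes hepcrawl's pipeline logging directly to Logstash over TCP using python-logstash. A minimal sketch of that pattern, assuming a Logstash TCP input on localhost:6060; the field values are purely illustrative, not taken from any deployment:

    import logging
    import logstash

    logger = logging.getLogger("python-logstash-logger")
    logger.setLevel(logging.INFO)
    logger.addHandler(logstash.TCPLogstashHandler("localhost", 6060, version=1))

    # Structured fields travel through `extra`; python-logstash's formatter
    # copies them into the JSON event shipped to Logstash over TCP.
    logger.info(
        "Processing item.",
        extra={"spider": "arxiv", "scrapy_job": "42", "dois": ["10.1234/example"]},
    )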
80 changes: 61 additions & 19 deletions hepcrawl/pipelines.py
@@ -27,14 +27,23 @@
import requests
import shutil
import structlog
import logstash

from .api import CrawlResult
from .settings import FILES_STORE
from .utils import RecordFile

LOGGER = logging.getLogger(name=__name__)
STRUCT_LOGGER = structlog.get_logger()

# Ship pipeline logs straight to Logstash over TCP (python-logstash).
LOGSTASH_HOST = os.environ.get("LOGSTASH_HOST", "localhost")
LOGSTASH_PORT = int(os.environ.get("LOGSTASH_PORT", 6060))
LOGSTASH_VERSION = int(os.environ.get("LOGSTASH_VERSION", 1))
LOGGER = logging.getLogger("python-logstash-logger")
LOGGER.setLevel(logging.INFO)
LOGGER.addHandler(
    logstash.TCPLogstashHandler(LOGSTASH_HOST, LOGSTASH_PORT, version=LOGSTASH_VERSION)
)

class DocumentsPipeline(FilesPipeline):
"""Download all the documents the record passed to download.
@@ -100,14 +109,16 @@ class InspireAPIPushPipeline(object):

def __init__(self):
self.count = 0
self.dois = []
self.arxiv_eprints = []
self.report_numbers = []
self.spider_name = None
self.scrape_job = None

def open_spider(self, spider):
self.results_data = []
self.logger = STRUCT_LOGGER.bind(
name="Harvesting pipeline",
spider=spider.name,
scrape_job=os.environ.get('SCRAPY_JOB'),
)
self.spider_name = spider.name
self.scrape_job = os.environ.get('SCRAPY_JOB')

def process_item(self, item, spider):
"""Add the crawl result to the results data after processing it.
@@ -134,18 +145,29 @@ def process_item(self, item, spider):
)
crawl_result = CrawlResult.from_parsed_item(item).to_dict()
self.results_data.append(crawl_result)
ids = []

titles = get_value(crawl_result, 'titles.title', default=[])
dois = get_value(crawl_result, 'dois.value', default=[])
arxiv_eprints = get_value(crawl_result, 'arxiv_eprints.value', default=[])
report_numbers = get_value(crawl_result, 'report_numbers.value', default=[])

self.logger.info(
for doi in dois:
self.dois.append(doi)

for arxiv_eprint in arxiv_eprints:
self.arxiv_eprints.append(arxiv_eprint)

for report_number in report_numbers:
    self.report_numbers.append(report_number)

LOGGER.info(
    'Processing item.',
    extra={
        'titles': titles,
        'dois': dois,
        'arxiv_eprints': arxiv_eprints,
        'report_numbers': report_numbers,
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
    },
)
return crawl_result

@@ -175,7 +197,12 @@ def _prepare_payload(self, spider):
errors=errors,
)
)
self.logger.error("Error.", errors=errors)
LOGGER.error(
    'Errors.',
    extra={
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
        'errors': errors,
    },
)
return payload_list

@staticmethod
@@ -231,7 +258,6 @@ def open_spider(self, spider):

))
super(InspireCeleryPushPipeline, self).open_spider(spider=spider)
self.logger.info('Start processing.')


def close_spider(self, spider):
@@ -256,11 +282,14 @@ def close_spider(self, spider):
)
logger.info('Triggering celery task: %s.', task_endpoint)

LOGGER.info(
    'Sending tasks.',
    extra={
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
        'number_of_results': self.count,
    },
)

for kwargs in self._prepare_payload(spider):
self.logger.info(
'Finish Processing.',
number_of_results=self.count,
)
logger.debug(
' Sending results:\n %s',
pprint.pformat(kwargs),
@@ -271,13 +300,26 @@
'scrapy_job_id': os.environ.get('SCRAPY_JOB')
}
logger.info('Sent celery task %s', pprint.pformat(celery_task_info_payload))
self.logger.info(
'Celery task sent.',
celery_task_id=res.id,
LOGGER.info(
    'Sending task.',
    extra={
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
        'kwargs': kwargs,
        'celery_task_id': res.id,
    },
)
LOGGER.info(
    'Finish Processing.',
    extra={
        'dois': self.dois,
        'arxiv_eprints': self.arxiv_eprints,
        'report_numbers': self.report_numbers,
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
        'number_of_results': self.count,
    },
)
else:
self.logger.info(
LOGGER.info(
    'No results.',
    extra={
        'number_of_results': self.count,
        'spider': self.spider_name,
        'scrapy_job': self.scrape_job,
    },
)
self._cleanup(spider)
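Because the Logstash handler above is created at module import time from environment variables, the target host and port have to be set in the crawler's environment before hepcrawl.pipelines is imported. A small sketch of that wiring; the hostname and port are hypothetical, not values from any real deployment:

    import os

    # Hypothetical endpoint of a Logstash TCP input; adjust to the actual deployment.
    os.environ.setdefault("LOGSTASH_HOST", "logstash.example.org")
    os.environ.setdefault("LOGSTASH_PORT", "5959")
    os.environ.setdefault("LOGSTASH_VERSION", "1")

    # Import after the environment is set, so the TCPLogstashHandler picks it up.
    from hepcrawl import pipelines  # noqa: F401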
1 change: 1 addition & 0 deletions setup.py
@@ -50,6 +50,7 @@
'queuelib==1.5.0',
'sentry-sdk==1.3.0',
'structlog==20.1.0',
'python-logstash==0.4.8',
]

tests_require = [
