Skip to content

Commit

Permalink
fix(source): rw, automatically set wait
Browse files: browse the repository at this point in the history
  • Loading branch information
engisalor committed Dec 1, 2022
1 parent 26e99ab commit 9bfc159
Showing 1 changed file with 37 additions and 59 deletions.
96 changes: 37 additions & 59 deletions corpusama/source/reliefweb.py
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
import logging
import math

import pandas as pd

Expand All @@ -12,21 +13,14 @@
class ReliefWeb(Call):
"""Manages API calls made to ReliefWeb.
Usage:
```python
job = ReliefWeb(<input>, "<database.db>")
job.run()
```
Options
- input = JSON/YML filepath or dict with parameters
- database = None or database filename
- n_calls = number of calls to make (pages to collect)
- appname = unique identifier for using ReliefWeb's API
- url = base url for making POST calls
- quota = daily usage limit (see ReliefWeb API documentation)
- wait_dict = dict of wait times for ranges of n_calls
(default {0: 1, 5: 49, 10: 99, 20: 499, 30: None})"""
- input, JSON/YML filepath or dict with parameters
- database, None or database filename
- appname, unique identifier for using ReliefWeb's API
- url, base url for making POST calls
- quota, daily usage limit (see ReliefWeb API documentation)
- wait_dict, dictionary of wait times
(default `{0: 1, 5: 49, 10: 99, 20: 499, 30: None}`)"""

def _offset(self):
"""Adjusts offset parameter and halts job if no more results."""
Expand All @@ -38,72 +32,54 @@ def _offset(self):
raise SystemExit()
logger.debug(self.parameters["offset"])

def all(self, limit: int = 1000):
"""Executes API calls from parameters."""
def all(self, limit: int = 0):
"""Makes repeated calls by incrementing the offset field."""

self.limit = limit
self.call_n = 0
self._set_wait()
self.limit = limit
self.one()

def new(self, limit: int = 1000):
"""Executes calls starting from the latest date.changed value."""
def new(self, limit: int = 0):
"""Makes repeated calls starting from the latest date.changed."""

latest = None
self.parameters["offset"] = 0
# ensure proper sorting parameters
if self.parameters.get("sort", None) != ["date.changed:asc"]:
m = """Input parameters must be set to retrieve results sorted
by date.changed in ascending order. Add `"sort": ["date.changed:asc"]`"""
raise ValueError(m)
# get most recent date.changed value
res = self.db.c.execute("SELECT json_extract(_raw.date,'$.changed') FROM _raw")
self.date_changed_vals = pd.Series(
[pd.Timestamp(x[0]) for x in res.fetchall()], dtype=object
)
self._update_filter()
# make call
logger.debug(f"starting from {latest}")
raise ValueError('Add `"sort": ["date.changed:asc"]` to parameters first.')
self._start_from()
self.all(limit)

def _update_filter(self):
"""Updates parameter filters to start from the latest date.changed value."""

# update if database is not empty
if self.date_changed_vals.any():
latest = self.date_changed_vals.max().isoformat()
# combine new and old conditions
old_conditions = self.original_parameters.get("filter", {}).get(
"conditions", []
)
new_conditions = [{"field": "date.changed", "value": {"from": latest}}]
# update parameters
update = {
"filter": {
"operator": "AND",
"conditions": old_conditions + new_conditions,
}
}
self.parameters = self.original_parameters | update
def _start_from(self):
"""Updates filter parameter to start from the latest date.changed."""

res = self.db.c.execute("SELECT json_extract(_raw.date,'$.changed') FROM _raw")
date_changed = [pd.Timestamp(x[0]) for x in res.fetchall()]
if date_changed:
latest = pd.Series(date_changed, dtype=object).max().isoformat()
old = self.params_old.get("filter", {}).get("conditions", [])
new = [{"field": "date.changed", "value": {"from": latest}}]
update = {"filter": {"operator": "AND", "conditions": old + new}}
self.parameters = self.params_old | update
logger.debug(latest)

@decorator.while_loop
def one(self):
# set variables
if "call_n" not in self.__dict__:
self.call_n = 0
if "limit" not in self.__dict__:
self.limit = 1
if self.call_n >= self.limit:
logger.debug(f"limit reached {self.limit}")
return False
# automatically set limit
if self.call_n == 1 and self.limit == 0:
limit = self.response_json["totalCount"] / self.parameters["limit"]
self.limit = math.ceil(limit)
# prepare call
self._set_wait()
self._enforce_quota()
self._offset()
# make call
self._request()
self._get_field_names()
self._hash()
keys = ["time", "took", "totalCount", "count"]
summary = {k: v for k, v in self.response_json.items() if k in keys}
summary = {k: f"{v:,}" for k, v in self.response_json.items() if k in keys}
logger.debug(f"{summary}")
# store output
if not self.db:
Expand Down Expand Up @@ -198,6 +174,8 @@ def __init__(
self.db = None
self.raw = {}
self.log = {}
self.limit = 1
self._set_wait()
if not appname:
self.appname = self.config["reliefweb"]["appname"]
else:
Expand All @@ -210,4 +188,4 @@ def __init__(
self.db.get_tables()
self.url = "".join([self.url, self.appname])
self._get_parameters()
self.original_parameters = self.parameters.copy()
self.params_old = self.parameters.copy()

0 comments on commit 9bfc159

Please sign in to comment.