Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDESK-6441] Ability to register new routing rule handlers #2354

Merged
merged 5 commits into from
Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from .routing_rules import RoutingRuleSchemeResource, RoutingRuleSchemeService
from .rule_sets import RuleSetsService, RuleSetsResource
from .rule_handlers import IngestRuleHandlersResource, IngestRuleHandlersService
from superdesk import get_backend
import superdesk

Expand All @@ -30,6 +31,10 @@ def init_app(app) -> None:
service = RoutingRuleSchemeService(endpoint_name, backend=get_backend())
RoutingRuleSchemeResource(endpoint_name, app=app, service=service)

endpoint_name = "ingest_rule_handlers"
service = IngestRuleHandlersService(endpoint_name, backend=get_backend())
IngestRuleHandlersResource(endpoint_name, app=app, service=service)

superdesk.privilege(
name="rule_sets",
label=lazy_gettext("Transformation Rules Management"),
Expand Down
154 changes: 21 additions & 133 deletions apps/rules/routing_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

from enum import Enum
from datetime import datetime, timedelta
from superdesk import get_resource_service
from superdesk.resource import Resource
from superdesk.services import BaseService
from superdesk.errors import SuperdeskApiError
from eve.utils import config
from superdesk.metadata.item import CONTENT_STATE
from superdesk.utc import set_time
from flask_babel import _

from .rule_handlers import get_routing_rule_handler

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -78,6 +78,7 @@ class RoutingRuleSchemeResource(Resource):
"type": "dict",
"schema": {
"name": {"type": "string"},
"handler": {"type": "string", "nullable": False, "default": "desk_fetch_publish"},
"filter": Resource.rel("content_filters", nullable=True),
"actions": {
"type": "dict",
Expand Down Expand Up @@ -108,6 +109,12 @@ class RoutingRuleSchemeResource(Resource):
},
"exit": {"type": "boolean"},
"preserve_desk": {"type": "boolean"},
"extra": {
"type": "dict",
"nullable": True,
"schema": {},
"allow_unknown": True,
},
},
},
"schedule": {
Expand Down Expand Up @@ -203,28 +210,20 @@ def apply_routing_scheme(self, ingest_item, provider, routing_scheme):
"Applying rule. Item: %s . Routing Scheme: %s. Rule Name %s."
% (ingest_item.get("guid"), routing_scheme.get("name"), rule.get("name"))
)
if filters_service.does_match(content_filter, ingest_item):

rule_handler = get_routing_rule_handler(rule)
if not rule_handler.can_handle(rule, ingest_item, routing_scheme):
logger.info(
"Routing rule %s of Routing Scheme %s for Provider %s does not support item %s"
% (rule.get("name"), routing_scheme.get("name"), provider.get("name"), ingest_item[config.ID_FIELD])
)
elif filters_service.does_match(content_filter, ingest_item):
logger.info(
"Filter matched. Item: %s. Routing Scheme: %s. Rule Name %s."
% (ingest_item.get("guid"), routing_scheme.get("name"), rule.get("name"))
)
if rule.get("actions", {}).get("preserve_desk", False) and ingest_item.get("task", {}).get("desk"):
desk = get_resource_service("desks").find_one(req=None, _id=ingest_item["task"]["desk"])
if ingest_item.get("task", {}).get("stage"):
stage_id = ingest_item["task"]["stage"]
else:
stage_id = desk["incoming_stage"]
self.__fetch(ingest_item, [{"desk": desk[config.ID_FIELD], "stage": stage_id}], rule)
fetch_actions = [
f
for f in rule.get("actions", {}).get("fetch", [])
if f.get("desk") != ingest_item["task"]["desk"]
]
else:
fetch_actions = rule.get("actions", {}).get("fetch", [])

self.__fetch(ingest_item, fetch_actions, rule)
self.__publish(ingest_item, rule.get("actions", {}).get("publish", []), rule)
rule_handler.apply_rule(rule, ingest_item, routing_scheme)
if rule.get("actions", {}).get("exit", False):
logger.info(
"Exiting routing scheme. Item: %s . Routing Scheme: %s. "
Expand Down Expand Up @@ -281,7 +280,9 @@ def _validate_routing_scheme(self, routing_scheme):
raise SuperdeskApiError.badRequestError(message=_("A Routing Scheme must have at least one Rule"))
for routing_rule in routing_rules:
invalid_fields = [
field for field in routing_rule.keys() if field not in ("name", "filter", "actions", "schedule")
field
for field in routing_rule.keys()
if field not in ("name", "handler", "filter", "actions", "schedule")
]

if invalid_fields:
Expand Down Expand Up @@ -410,116 +411,3 @@ def _get_scheduled_routing_rules(self, rules, current_dt_utc):
scheduled_rules.append(rule)

return scheduled_rules

def __fetch(self, ingest_item, destinations, rule):
"""Fetch to item to the destinations

:param item: item to be fetched
:param destinations: list of desk and stage
"""
archive_items = []
for destination in destinations:
try:
logger.info("Fetching item %s to desk %s" % (ingest_item.get("guid"), destination))
target = self.__getTarget(destination)
item_id = get_resource_service("fetch").fetch(
[
{
config.ID_FIELD: ingest_item[config.ID_FIELD],
"desk": str(destination.get("desk")),
"stage": str(destination.get("stage")),
"state": CONTENT_STATE.ROUTED,
"macro": destination.get("macro", None),
"target": target,
},
],
macro_kwargs={
"rule": rule,
},
)[0]
archive_items.append(item_id)
logger.info("Fetched item %s to desk %s" % (ingest_item.get("guid"), destination))
except Exception:
logger.exception("Failed to fetch item %s to desk %s" % (ingest_item.get("guid"), destination))

return archive_items

def __getTarget(self, destination):
"""Get the target for destination

:param dict destination: routing destination
:return dict: returns target information
"""
target = {}
if destination.get("target_subscribers"):
target["target_subscribers"] = destination.get("target_subscribers")

if destination.get("target_types"):
target["target_types"] = destination.get("target_types")

return target

def __publish(self, ingest_item, destinations, rule):
"""Fetches the item to the desk and then publishes the item.

:param item: item to be published
:param destinations: list of desk and stage
"""
guid = ingest_item.get("guid")
items_to_publish = self.__fetch(ingest_item, destinations, rule)
for item in items_to_publish:
try:
archive_item = get_resource_service("archive").find_one(req=None, _id=item)
if archive_item.get("auto_publish") is False:
logger.info("Stop auto publishing of item %s", guid)
continue
logger.info("Publishing item %s", guid)
self._set_default_values(archive_item)
get_resource_service("archive_publish").patch(item, {"auto_publish": True})
logger.info("Published item %s", guid)
except Exception:
logger.exception("Failed to publish item %s.", guid)

def _set_default_values(self, archive_item):
"""Assigns the default values to the item that about to be auto published"""
default_categories = self._get_categories(config.DEFAULT_CATEGORY_QCODES_FOR_AUTO_PUBLISHED_ARTICLES)
default_values = self._assign_default_values(archive_item, default_categories)
get_resource_service("archive").patch(archive_item["_id"], default_values)

def _assign_default_values(self, archive_item, default_categories):
"""Assigns the default values to the item that about to be auto published"""

default_values = {}
default_values["headline"] = archive_item.get("headline") or " "

if archive_item.get("anpa_category"):
default_values["anpa_category"] = archive_item.get("anpa_category")
else:
default_values["anpa_category"] = default_categories

default_values["slugline"] = archive_item.get("slugline") or " "
default_values["body_html"] = archive_item.get("body_html") or "<p></p>"
return default_values

def _get_categories(self, qcodes):
"""Returns list of categories for a given comma separated qcodes"""

if not qcodes:
return

qcode_list = qcodes.split(",")
selected_categories = None
categories = superdesk.get_resource_service("vocabularies").find_one(req=None, _id="categories")

if categories and len(qcode_list) > 0:
selected_categories = []
for qcode in qcode_list:
selected_categories.extend(
[
{"qcode": qcode, "name": c.get("name", "")}
for c in categories["items"]
if c["is_active"] is True and qcode.lower() == c["qcode"].lower()
]
)

return selected_categories
Loading