This repository has been archived by the owner on Sep 3, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
101 lines (85 loc) · 2.59 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
#!/usr/bin/env python
import click
from click import Context
from click import Path
from dialect_map_gcp.auth import DefaultAuthenticator
from dialect_map_gcp.auth import OpenIDAuthenticator
from dialect_map_gcp.handlers import PubSubQueueHandler
from dialect_map_io.handlers import DialectMapAPIHandler
from job.input import DiffPubSubSource
from job.mapping import SchemaRecordMapper
from job.mapping import CATEGORY_ROUTE
from job.mapping import GROUP_ROUTE
from job.mapping import JARGON_ROUTE
from job.output import DialectMapOperator
from logs import setup_logger
from routines import PubSubRoutine
@click.group()
@click.option(
    "--log-level",
    envvar="DIALECT_MAP_LOG_LEVEL",
    default="INFO",
    help="Log messages level",
    required=False,
    type=str,
)
@click.pass_context
def main(context: Context, log_level: str) -> None:
    """Default command group for the jobs"""
    # NOTE: the docstring above is also the group's --help text (click reads it),
    # so it is intentionally left verbatim.
    #
    # Runs before any sub-command:
    #   context   -- click context of this invocation; its `obj` is used as a
    #                shared dict for sub-commands.
    #   log_level -- level name from --log-level or the DIALECT_MAP_LOG_LEVEL
    #                environment variable (defaults to "INFO").
    shared_state = context.ensure_object(dict)

    # Configure logging once, for every job launched through this group.
    setup_logger(log_level)

    # Expose the chosen level to sub-commands through the context object.
    shared_state["LOG_LEVEL"] = log_level
@main.command()
@click.option(
    "--gcp-project",
    help="GCP Project name",
    required=True,
    type=str,
)
@click.option(
    "--gcp-key-path",
    help="GCP Service Account key path",
    required=True,
    type=Path(
        exists=True,
        file_okay=True,
        dir_okay=False,
    ),
)
@click.option(
    "--subscription",
    help="GCP PubSub subscription",
    required=True,
    type=str,
)
@click.option(
    "--api-url",
    help="Private API base URL",
    required=True,
    type=str,
)
def data_diff_job(
    gcp_project: str,
    gcp_key_path: str,
    subscription: str,
    api_url: str,
) -> None:
    """
    Starts a data ingestion job reading messages from Google Pub/sub.
    Stops when no more messages are read.
    The pipeline expects to receive DiffMessage object parsable messages.
    Ref: dialect_map_gcp.models.message.DiffMessage
    """
    # NOTE: the docstring above doubles as this command's --help text, so it is
    # kept verbatim. The same service-account key file (validated by click to
    # exist and be a regular file) authenticates both the Pub/Sub reader and
    # the private API client.

    # Source side: Pub/Sub handler for the given project, read through the
    # named subscription.
    message_source = DiffPubSubSource(
        PubSubQueueHandler(
            project_id=gcp_project,
            auth_ctl=DefaultAuthenticator(gcp_key_path),
        ),
        subscription,
    )

    # Sink side: API handler authenticated via OpenID, with the API base URL
    # also used as the token target.
    api_operator = DialectMapOperator(
        DialectMapAPIHandler(
            OpenIDAuthenticator(gcp_key_path, target_url=api_url),
            base_url=api_url,
        )
    )

    routine = PubSubRoutine(message_source, api_operator)

    # One record mapper per file name, registered in this order. The names are
    # presumably the files referenced by incoming diff messages -- TODO confirm
    # against PubSubRoutine.
    file_routes = (
        ("categories.json", [CATEGORY_ROUTE]),
        ("jargons.json", [GROUP_ROUTE, JARGON_ROUTE]),
    )
    for file_name, routes in file_routes:
        routine.add_mapper(file_name, SchemaRecordMapper(routes))

    routine.run()
if __name__ == "__main__":
    # Only when executed as a script: hand the command line to the click group,
    # which dispatches to the requested sub-command.
    main()