-
Notifications
You must be signed in to change notification settings - Fork 0
/
weather_processing.py
76 lines (64 loc) · 2.14 KB
/
weather_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
from airflow import DAG
from datetime import datetime
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook
from pandas import json_normalize
def _process_weather(ti):
weather = ti.xcom_pull(task_ids='extract_weather')
processed_weather = json_normalize(
{
'temperature': weather['temperature'],
'wind': weather['wind'],
'description': weather['description']
}
)
processed_weather.to_csv('/tmp/processed_user.csv', index=None, header=False)
def _store_weather():
    """Load ``/tmp/processed_user.csv`` into the ``weather_maranguape`` table.

    Builds a ``PostgresHook`` for the ``postgres`` connection id and hands a
    ``COPY ... FROM STDIN`` statement plus the CSV path to
    ``PostgresHook.copy_expert``.

    The file is written by pandas ``to_csv``, which quotes any field that
    contains a comma, quote or newline. COPY's default text format does not
    understand that quoting, so the statement asks for ``FORMAT csv``.
    ``FORCE_NOT_NULL`` keeps empty fields as empty strings (CSV mode would
    otherwise read them as NULL and violate the NOT NULL columns).
    """
    hook = PostgresHook(postgres_conn_id='postgres')
    hook.copy_expert(
        sql=(
            "COPY weather_maranguape FROM STDIN "
            "WITH (FORMAT csv, "
            "FORCE_NOT_NULL (temperature, wind, description))"
        ),
        filename='/tmp/processed_user.csv',
    )
# DAG "weather_processing": create the target table, wait for the weather
# API, fetch the payload, flatten it to CSV, then load the CSV into Postgres.
# The cron expression triggers a run every minute; catchup is disabled.
# NOTE(review): process_weather and store_weather exchange data through one
# fixed /tmp file, so overlapping runs could interfere - confirm whether
# concurrent runs are possible in this deployment.
with DAG(
    dag_id="weather_processing",
    start_date=datetime(year=2022, month=9, day=16),
    schedule_interval="*/1 * * * *",
    catchup=False,
) as dag:
    # Idempotent DDL: all three columns are stored as non-null text.
    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="postgres",
        sql=(
            "\n"
            "CREATE TABLE IF NOT EXISTS weather_maranguape (\n"
            "temperature TEXT NOT NULL,\n"
            "wind TEXT NOT NULL,\n"
            "description TEXT NOT NULL\n"
            ");\n"
        ),
    )

    # Sensor on the 'api_weather' HTTP connection with an empty endpoint.
    is_api_avaliable = HttpSensor(
        task_id="is_api_avaliable",
        http_conn_id="api_weather",
        endpoint="",
    )

    # GET the same connection/endpoint; the filter parses the response body
    # as JSON before it is handed on to downstream tasks.
    extract_weather = SimpleHttpOperator(
        task_id="extract_weather",
        http_conn_id="api_weather",
        endpoint="",
        method="GET",
        response_filter=lambda resp: json.loads(resp.text),
        log_response=True,
    )

    # Writes the selected weather fields to a CSV file.
    process_weather = PythonOperator(
        task_id="process_weather",
        python_callable=_process_weather,
    )

    # Copies that CSV file into the weather_maranguape table.
    store_weather = PythonOperator(
        task_id="store_weather",
        python_callable=_store_weather,
    )

    # Strictly linear pipeline.
    (
        create_table
        >> is_api_avaliable
        >> extract_weather
        >> process_weather
        >> store_weather
    )