payment_msg_proccessing.py
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
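"""
Playground job from the pyflink-walkthrough: reads payment messages from
Kafka, maps each provinceId to a province name with a Python UDF, keeps a
running SUM(payAmount) per province, and upserts the result into
Elasticsearch.
"""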
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
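

# Province names, indexed by the provinceId field of each payment message.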
provinces = ("Beijing", "Shanghai", "Hangzhou", "Shenzhen", "Jiangxi", "Chongqing", "Xizang")
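

# Scalar UDF mapping a numeric province id to its human-readable name.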
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.STRING())
def province_id_to_name(id):
    return provinces[id]


def log_processing():
    env = StreamExecutionEnvironment.get_execution_environment()
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
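
    # Kafka source table: one JSON payment message per row, using the legacy
    # 'connector.*' property style of the Flink 1.10/1.11 connectors.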
    source_ddl = """
        CREATE TABLE payment_msg(
            createTime VARCHAR,
            orderId BIGINT,
            payAmount DOUBLE,
            payPlatform INT,
            provinceId INT
        ) WITH (
            'connector.type' = 'kafka',
            'connector.version' = 'universal',
            'connector.topic' = 'payment_msg',
            'connector.properties.bootstrap.servers' = 'kafka:9092',
            'connector.properties.group.id' = 'test_3',
            'connector.startup-mode' = 'latest-offset',
            'format.type' = 'json'
        )
        """
    es_sink_ddl = """
        CREATE TABLE es_sink (
            province VARCHAR PRIMARY KEY,
            pay_amount DOUBLE
        ) WITH (
            'connector.type' = 'elasticsearch',
            'connector.version' = '7',
            'connector.hosts' = 'http://elasticsearch:9200',
            'connector.index' = 'platform_pay_amount_1',
            'connector.document-type' = 'payment',
            'update-mode' = 'upsert',
            'connector.flush-on-checkpoint' = 'true',
            'connector.key-delimiter' = '$',
            'connector.key-null-literal' = 'n/a',
            'connector.bulk-flush.max-size' = '42mb',
            'connector.bulk-flush.max-actions' = '32',
            'connector.bulk-flush.interval' = '1000',
            'connector.bulk-flush.backoff.delay' = '1000',
            'format.type' = 'json'
        )
        """
    t_env.sql_update(source_ddl)
    t_env.sql_update(es_sink_ddl)
    t_env.register_function('province_id_to_name', province_id_to_name)

    t_env.from_path("payment_msg") \
        .select("province_id_to_name(provinceId) as province, payAmount") \
        .group_by("province") \
        .select("province, sum(payAmount) as pay_amount") \
        .insert_into("es_sink")

    t_env.execute("payment_demo")


if __name__ == '__main__':
    log_processing()
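
# Usage note (a sketch, assuming the docker-compose playground from the
# pyflink-walkthrough; the container path may differ in your setup):
#
#   docker-compose exec jobmanager ./bin/flink run -py /opt/pyflink-walkthrough/payment_msg_proccessing.py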