-
Notifications
You must be signed in to change notification settings - Fork 594
/
shared_source.slt
215 lines (173 loc) · 5.1 KB
/
shared_source.slt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
# Enable ${...} variable substitution for this test (used for Kafka connection options below).
control substitution on
# Turn on the shared-source feature so the CREATE SOURCE below becomes a shared source.
statement ok
SET rw_enable_shared_source TO true;
# Create the Kafka topic under test with 4 partitions.
system ok
rpk topic create shared_source -p 4
# Seed one message per partition. Input format "%p %v\n" means each line is
# "<partition> <JSON value>". Do not add comments inside the heredoc: every
# line between `cat << EOF` and `EOF` is message payload.
system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
EOF
# Create the shared source. Its state table is created immediately,
# but ingestion must not start until a downstream job exists.
statement ok
create source s0 (v1 int, v2 varchar) with (
${RISEDEV_KAFKA_WITH_OPTIONS_COMMON},
topic = 'shared_source',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
# The shared source owns exactly one internal state table.
query I
select count(*) from rw_internal_tables where name like '%s0%';
----
1
sleep 1s
# SourceExecutor's ingestion does not start (state table is empty), even after sleep
system ok
internal_table.mjs --name s0 --type source
----
(empty)
# Creating the first downstream MV starts the SourceExecutor and a
# SourceBackfillExecutor for the historical data.
statement ok
create materialized view mv_1 as select * from s0;
# Wait enough time to ensure SourceExecutor consumes all Kafka data.
sleep 2s
# SourceExecutor's ingestion started, but it only starts from latest, so its
# state table is still empty (no new upstream messages have arrived yet).
system ok
internal_table.mjs --name s0 --type source
----
(empty)
# offset 0 must be backfilled, not read from upstream: each of the 4 partitions
# is in the Backfilling state at offset 0.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"{""Backfilling"": ""0""}"
1,"{""Backfilling"": ""0""}"
2,"{""Backfilling"": ""0""}"
3,"{""Backfilling"": ""0""}"
# Disabling the session variable does not affect the behavior of the
# CREATE MATERIALIZED VIEW below: s0 is already a shared source, so mv_2 still
# uses it and creates a SourceBackfillExecutor.
statement ok
SET rw_enable_shared_source TO false;
statement ok
create materialized view mv_2 as select * from s0;
sleep 2s
# All three relations must see the 4 seeded rows (rowsort makes output order deterministic).
query IT rowsort
select v1, v2 from s0;
----
1 a
2 b
3 c
4 d
query IT rowsort
select v1, v2 from mv_1;
----
1 a
2 b
3 c
4 d
query IT rowsort
select v1, v2 from mv_2;
----
1 a
2 b
3 c
4 d
# Produce a second round of messages, one per partition (offset 1 in each).
# Heredoc lines are payload; no comments may go inside it.
system ok
cat << EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
0 {"v1": 1, "v2": "aa"}
1 {"v1": 2, "v2": "bb"}
2 {"v1": 3, "v2": "cc"}
3 {"v1": 4, "v2": "dd"}
EOF
sleep 2s
# SourceExecutor finally got new data: its state table now records each
# partition with start_offset 1 (the round-2 message it consumed).
system ok
internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
# Both the source and the MV now see 8 rows: 4 backfilled + 4 from upstream.
query IT rowsort
select v1, v2 from s0;
----
1 a
1 aa
2 b
2 bb
3 c
3 cc
4 d
4 dd
query IT rowsort
select v1, v2 from mv_1;
----
1 a
1 aa
2 b
2 bb
3 c
3 cc
4 d
4 dd
# Re-check the source state table: start_offset changed to 1 for every partition.
system ok
internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 1, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
# The backfill state at this point is non-deterministic, so it is not asserted here:
# - If the upstream row comes before the backfill row, it is ignored and the
#   resulting state is "{""Backfilling"": ""1""}".
# - If the upstream row comes after the backfill row, the resulting state is Finished.
# Uncomment below and run manually to observe the result.
# system ok
# internal_table.mjs --name mv_1 --type sourcebackfill
# ----
# 0,"{""Finished""}"
# 1,"{""Finished""}"
# 2,"{""Finished""}"
# 3,"{""Finished""}"
# Produce 10 more rounds (offsets 2..11 per partition) to push backfill to completion.
# Note: a heredoc inside a loop works in macOS's sh but not in Linux's sh, so we
# invoke bash explicitly. Heredoc lines are payload; no comments inside it.
system ok
bash -c 'for i in {0..9}; do
sleep 0.1
cat <<EOF | rpk topic produce shared_source -f "%p %v\\n" -p 0
0 {"v1": 1, "v2": "a"}
1 {"v1": 2, "v2": "b"}
2 {"v1": 3, "v2": "c"}
3 {"v1": 4, "v2": "d"}
EOF
done'
sleep 3s
# 12 rows per v1 = 1 (seed) + 1 (second round) + 10 (loop rounds above).
query IT rowsort
select v1, count(*) from s0 group by v1;
----
1 12
2 12
3 12
4 12
query IT rowsort
select v1, count(*) from mv_1 group by v1;
----
1 12
2 12
3 12
4 12
# start_offset changed to 11: the latest offset consumed in each partition
# (offset 0 seed + offset 1 second round + offsets 2..11 from the loop).
system ok
internal_table.mjs --name s0 --type source
----
0,"{""split_info"": {""partition"": 0, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
1,"{""split_info"": {""partition"": 1, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
2,"{""split_info"": {""partition"": 2, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
3,"{""split_info"": {""partition"": 3, ""start_offset"": 11, ""stop_offset"": null, ""topic"": ""shared_source""}, ""split_type"": ""kafka""}"
# After many rounds it is highly probable that backfill has finished on all partitions.
system ok
internal_table.mjs --name mv_1 --type sourcebackfill
----
0,"""Finished"""
1,"""Finished"""
2,"""Finished"""
3,"""Finished"""
# Teardown: cascade drops mv_1 and mv_2 along with the source.
statement ok
drop source s0 cascade;