Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support session and hop window #36

Merged
merged 20 commits into from
Sep 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,21 @@ sh clientExector.sh startTask
```shell
java -cp RocketmqTest-1.0-SNAPSHOT.jar com.test.rocketmqtest.producer.Producer
```
执行上述语句,向RocketMQ的rsqldb-source topic中写入RocketmqTest-1.0-SNAPSHOT.jar包中默认数据,即向rocketmq发送的数据为
执行上述语句,向RocketMQ的rsqldb-source topic中写入RocketmqTest-1.0-SNAPSHOT.jar包中默认数据,即向RocketMQ发送的数据为
```xml
1,2,3,4
2,2,3,4
3,2,3,4
4,2,3,4
```
也可以向rocketmq发送json数据,显式指定数据数据哪个字段,见rsqldb-disk/client中其他例子)
也可以向RocketMQ发送json数据,显式指定数据数据哪个字段,见rsqldb-disk/client中其他例子)
根据执行的任务rocketmq.sql,期望能将field_1=1的数据过滤出来;

另外,也可以使用RocketmqTest-1.0-SNAPSHOT.jar向任意topic发送任意数据,使用方式是:
``shell
java -cp RocketmqTest-1.0-SNAPSHOT.jar com.test.rocketmqtest.producer.Producer ${topic} ${groupId} ${数据文件权路径}
``

### 查看结果输出
```shell
java -cp RocketmqTest-1.0-SNAPSHOT.jar com.test.rocketmqtest.consumer.Consumer
Expand All @@ -72,7 +73,11 @@ java -cp RocketmqTest-1.0-SNAPSHOT.jar com.test.rocketmqtest.consumer.Consumer
```xml
Receive New Messages: body[{"field_3":"3","field_4":"4","field_1":"1","field_2":"2"}]
```
Consumer类允许带两个参数:topic、groupId,可以指定topic消费RocketMQ数据。

另外,也可以使用RocketmqTest-1.0-SNAPSHOT.jar接收任意topic的数据,使用方式是:
``shell
java -cp RocketmqTest-1.0-SNAPSHOT.jar com.test.rocketmqtest.consumer.Consumer ${topic} ${groupId}
``


## 其他启动方式
Expand Down
35 changes: 35 additions & 0 deletions rsqldb-disk/client/window/hop_eventTime.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
CREATE TABLE user_clicks
(
username VARCHAR,
click_url VARCHAR,
ts TIMESTAMP,
WATERMARK wk FOR ts AS WITHOFFSET (ts, 2000)--为rowtime定义Watermark。
) WITH (
type = 'rocketmq',
topic = 'user_clicks',
groupName = 'user_clicks',
namesrvAddr = '127.0.0.1:9876',
isJsonData = 'true',
msgIsJsonArray = 'false'
);


CREATE TABLE hop_output
(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) WITH (
type = 'print'
);


INSERT INTO hop_output
SELECT
HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,
HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,
username as username,
COUNT(click_url) as clicks
FROM user_clicks
GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username;
33 changes: 33 additions & 0 deletions rsqldb-disk/client/window/hop_processTime.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
CREATE TABLE window_test
(
username VARCHAR,
click_url VARCHAR,
ts as PROCTIME()
) WITH (
type = 'rocketmq',
topic = 'window_test',
groupName = 'window_test',
namesrvAddr = '127.0.0.1:9876',
isJsonData = 'true',
msgIsJsonArray = 'false'
);

CREATE TABLE hop_output
(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);

INSERT INTO hop_output
SELECT
HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start,
HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end,
username as username,
COUNT(click_url) as clicks
FROM window_test
GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username;

34 changes: 34 additions & 0 deletions rsqldb-disk/client/window/session_eventTime.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
CREATE TABLE user_clicks
(
username VARCHAR,
click_url VARCHAR,
ts TIMESTAMP,
WATERMARK wk FOR ts as withOffset(ts, 2000) --为rowtime定义Watermark。
) WITH (
type = 'rocketmq',
topic = 'user_clicks',
groupName = 'user_clicks',
namesrvAddr = '127.0.0.1:9876',
isJsonData = 'true',
msgIsJsonArray = 'false'
);


CREATE TABLE task_sink
(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) WITH (
type = 'print'
);

INSERT INTO task_sink
SELECT
SESSION_START(ts, INTERVAL '30' SECOND) as window_start,
SESSION_END(ts, INTERVAL '30' SECOND) as window_end,
username as username,
COUNT(click_url) as clicks
FROM user_clicks
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
32 changes: 32 additions & 0 deletions rsqldb-disk/client/window/session_processTime.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
CREATE TABLE window_test
(
username VARCHAR,
click_url VARCHAR,
ts as PROCTIME()
) WITH (
type = 'rocketmq',
topic = 'window_test',
groupName = 'window_test',
namesrvAddr = '127.0.0.1:9876',
isJsonData = 'true',
msgIsJsonArray = 'false'
);

CREATE TABLE session_output
(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
type='print'
);

INSERT INTO session_output
SELECT
SESSION_START(ts, INTERVAL '30' SECOND) as window_start,
SESSION_END(ts, INTERVAL '30' SECOND) as window_end,
username as username,
COUNT(click_url) as clicks
FROM window_test
GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;