From 5d594174164b479d4ebb0456f5ccd0342a8eb72b Mon Sep 17 00:00:00 2001 From: Ni Ze Date: Wed, 7 Sep 2022 19:06:33 +0800 Subject: [PATCH] support session and hop window (#36) * feat(nested join)support nested join * maintain(rsqldb-server) optimize the server * add purchaser_dim.sql * support dim join * remove dim with rocketmq * remove password and userName of db * move file * filter sql without VIEW table * remove VIEW talbe * support tumble window * fix(doc) start in local * fix(doc) start in local * fix(doc) modif README.md * fix(doc) modif README.md * fix(doc) modif README.md * support hop and session --- rsqldb-disk/client/window/hop_eventTime.sql | 35 +++++++++++++++++++ rsqldb-disk/client/window/hop_processTime.sql | 33 +++++++++++++++++ .../client/window/session_eventTime.sql | 34 ++++++++++++++++++ .../client/window/session_processTime.sql | 32 +++++++++++++++++ 4 files changed, 134 insertions(+) create mode 100644 rsqldb-disk/client/window/hop_eventTime.sql create mode 100644 rsqldb-disk/client/window/hop_processTime.sql create mode 100644 rsqldb-disk/client/window/session_eventTime.sql create mode 100644 rsqldb-disk/client/window/session_processTime.sql diff --git a/rsqldb-disk/client/window/hop_eventTime.sql b/rsqldb-disk/client/window/hop_eventTime.sql new file mode 100644 index 0000000..8ebba8f --- /dev/null +++ b/rsqldb-disk/client/window/hop_eventTime.sql @@ -0,0 +1,35 @@ +CREATE TABLE user_clicks +( + username VARCHAR, + click_url VARCHAR, + ts TIMESTAMP, + WATERMARK wk FOR ts AS WITHOFFSET (ts, 2000)--为rowtime定义Watermark。 +) WITH ( + type = 'rocketmq', + topic = 'user_clicks', + groupName = 'user_clicks', + namesrvAddr = '127.0.0.1:9876', + isJsonData = 'true', + msgIsJsonArray = 'false' + ); + + +CREATE TABLE hop_output +( + window_start TIMESTAMP, + window_end TIMESTAMP, + username VARCHAR, + clicks BIGINT +) WITH ( + type = 'print' + ); + + +INSERT INTO hop_output +SELECT + HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start, + HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end, + username as username, + COUNT(click_url) as clicks +FROM user_clicks +GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username; \ No newline at end of file diff --git a/rsqldb-disk/client/window/hop_processTime.sql b/rsqldb-disk/client/window/hop_processTime.sql new file mode 100644 index 0000000..d1fd8f6 --- /dev/null +++ b/rsqldb-disk/client/window/hop_processTime.sql @@ -0,0 +1,33 @@ +CREATE TABLE window_test +( + username VARCHAR, + click_url VARCHAR, + ts as PROCTIME() +) WITH ( + type = 'rocketmq', + topic = 'window_test', + groupName = 'window_test', + namesrvAddr = '127.0.0.1:9876', + isJsonData = 'true', + msgIsJsonArray = 'false' + ); + +CREATE TABLE hop_output +( + window_start TIMESTAMP, + window_end TIMESTAMP, + username VARCHAR, + clicks BIGINT +) with ( + type='print' + ); + +INSERT INTO hop_output +SELECT + HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_start, + HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) as window_end, + username as username, + COUNT(click_url) as clicks +FROM window_test +GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username; + diff --git a/rsqldb-disk/client/window/session_eventTime.sql b/rsqldb-disk/client/window/session_eventTime.sql new file mode 100644 index 0000000..1c829ec --- /dev/null +++ b/rsqldb-disk/client/window/session_eventTime.sql @@ -0,0 +1,34 @@ +CREATE TABLE user_clicks +( + username VARCHAR, + click_url VARCHAR, + ts TIMESTAMP, + WATERMARK wk FOR ts as withOffset(ts, 2000) --为rowtime定义Watermark。 +) WITH ( + type = 'rocketmq', + topic = 'user_clicks', + groupName = 'user_clicks', + namesrvAddr = '127.0.0.1:9876', + isJsonData = 'true', + msgIsJsonArray = 'false' + ); + + +CREATE TABLE task_sink +( + window_start TIMESTAMP, + window_end TIMESTAMP, + username VARCHAR, + clicks BIGINT +) WITH ( + type = 'print' + ); + +INSERT INTO task_sink +SELECT + SESSION_START(ts, INTERVAL '30' SECOND) as window_start, + SESSION_END(ts, INTERVAL '30' SECOND) as window_end, + username as username, + COUNT(click_url) as clicks +FROM user_clicks +GROUP BY SESSION(ts, INTERVAL '30' SECOND), username; diff --git a/rsqldb-disk/client/window/session_processTime.sql b/rsqldb-disk/client/window/session_processTime.sql new file mode 100644 index 0000000..6785407 --- /dev/null +++ b/rsqldb-disk/client/window/session_processTime.sql @@ -0,0 +1,32 @@ +CREATE TABLE window_test +( + username VARCHAR, + click_url VARCHAR, + ts as PROCTIME() +) WITH ( + type = 'rocketmq', + topic = 'window_test', + groupName = 'window_test', + namesrvAddr = '127.0.0.1:9876', + isJsonData = 'true', + msgIsJsonArray = 'false' + ); + +CREATE TABLE session_output +( + window_start TIMESTAMP, + window_end TIMESTAMP, + username VARCHAR, + clicks BIGINT +) with ( + type='print' + ); + +INSERT INTO session_output +SELECT + SESSION_START(ts, INTERVAL '30' SECOND) as window_start, + SESSION_END(ts, INTERVAL '30' SECOND) as window_end, + username as username, + COUNT(click_url) as clicks +FROM window_test +GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;