Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into wrj/varargs
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 committed Sep 14, 2023
2 parents 01c9519 + 9814af8 commit 64831c0
Show file tree
Hide file tree
Showing 144 changed files with 3,079 additions and 1,409 deletions.
108 changes: 54 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ arrow-schema = "46"
arrow-buffer = "46"
arrow-flight = "46"
arrow-select = "46"
tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git", features = [
"profiling",
"stats",
], rev = "64a2d9" }

risingwave_backup = { path = "./src/storage/backup" }
risingwave_batch = { path = "./src/batch" }
Expand Down
1 change: 1 addition & 0 deletions ci/scripts/regress-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mv target/debug/risingwave_regress_test-"$profile" target/debug/risingwave_regre
chmod +x ./target/debug/risingwave_regress_test

echo "--- Postgres regress test"
apt-get -y update
apt-get -y install locales
locale-gen C
export LANGUAGE=C
Expand Down
35 changes: 35 additions & 0 deletions e2e_test/batch/basic/unnest.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,38 @@ select distinct unnest(array[1,1,2,3,1]) as x;
1
2
3

query I
select * from unnest(array[0,1,2]) with ordinality;
----
0 1
1 2
2 3

query I
select * from unnest(array[0,1,2]) with ordinality, unnest(array[3,4]) with ordinality as unnest_2;
----
0 1 3 1
0 1 4 2
1 2 3 1
1 2 4 2
2 3 3 1
2 3 4 2

statement ok
create table t(arr varchar[]);

statement ok
insert into t values (Array['a','b', 'c']), (Array['d','e']);

query I rowsort
select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);
----
{a,b,c} a 1
{a,b,c} b 2
{a,b,c} c 3
{d,e} d 1
{d,e} e 2

statement ok
drop table t;
2 changes: 1 addition & 1 deletion e2e_test/sink/elasticsearch/elasticsearch_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TABLE t7 (v1 int primary key, v2 bigint, v3 varchar);

statement ok
CREATE SINK s7 AS select t7.v1 as v1, t7.v2 as v2, t7.v3 as v3 from t7 WITH (
connector = 'elasticsearch-7',
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch:9200',
username = 'elastic',
Expand Down
67 changes: 67 additions & 0 deletions e2e_test/streaming/aggregate/boolean.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
create table t (v boolean);

statement ok
create materialized view mv as select
bool_and(v),
bool_or(v)
from t;

query BB
select * from mv;
----
NULL NULL


statement ok
insert into t values (true);

# table values: true

query BB
select * from mv;
----
t t


statement ok
insert into t values (false);

# table values: true, false

query BB
select * from mv;
----
f t


statement ok
delete from t where v = true;

# table values: false

query BB
select * from mv;
----
f f


statement ok
delete from t;

# table values: empty

query BB
select * from mv;
----
NULL NULL


statement ok
drop materialized view mv;

statement ok
drop table t;
35 changes: 35 additions & 0 deletions e2e_test/streaming/project_set.slt
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,38 @@ with cte as (SELECT 1 as v1, unnest(array[1,2,3,4,5]) AS v2) select v1 from cte;
1
1
1

statement ok
create table t(arr varchar[]);

statement ok
create materialized view mv as select * from t cross join unnest(t.arr) WITH ORDINALITY AS x(elts, num);

statement ok
insert into t values (Array['a','b', 'c']), (Array['d','e']);

query I rowsort
select * from mv;
----
{a,b,c} a 1
{a,b,c} b 2
{a,b,c} c 3
{d,e} d 1
{d,e} e 2

statement ok
update t set arr = Array['a', 'c'] where arr = Array['a','b', 'c'];

query I rowsort
select * from mv;
----
{a,c} a 1
{a,c} c 2
{d,e} d 1
{d,e} e 2

statement ok
drop materialized view mv;

statement ok
drop table t;
2 changes: 1 addition & 1 deletion e2e_test/streaming/rate_limit.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ statement ok
CREATE TABLE t1(v1 int, v2 int);

statement ok
SET RW_STREAMING_RATE_LIMIT TO 10000;
SET STREAMING_RATE_LIMIT TO 10000;

statement ok
CREATE MATERIALIZED VIEW m AS SELECT * FROM t1;
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/datagen/ad_click/ad_click.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func (g *adClickGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord) {
record := &clickEvent{
UserId: rand.Int63n(100000),
AdId: rand.Int63n(10),
ClickTimestamp: now.Add(time.Duration(rand.Intn(1000)) * time.Millisecond).Format(gen.RwTimestampLayout),
ImpressionTimestamp: now.Format(gen.RwTimestampLayout),
ClickTimestamp: now.Add(time.Duration(rand.Intn(1000)) * time.Millisecond).Format(gen.RwTimestamptzLayout),
ImpressionTimestamp: now.Format(gen.RwTimestamptzLayout),
}
select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/datagen/ad_ctr/ad_ctr.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,14 @@ func (g *adCtrGen) generate() []sink.SinkRecord {
&adImpressionEvent{
BidId: bidId,
AdId: adId,
ImpressionTimestamp: time.Now().Format(gen.RwTimestampLayout),
ImpressionTimestamp: time.Now().Format(gen.RwTimestamptzLayout),
},
}
if g.hasClick(adId) {
randomDelay := time.Duration(g.faker.IntRange(1, 10) * int(time.Second))
events = append(events, &adClickEvent{
BidId: bidId,
ClickTimestamp: time.Now().Add(randomDelay).Format(gen.RwTimestampLayout),
ClickTimestamp: time.Now().Add(randomDelay).Format(gen.RwTimestamptzLayout),
})
}
return events
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/datagen/cdn_metrics/nics.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (impl *deviceNicsMonitor) newMetrics(
MetricName: metricName,
Aggregation: aggregation,
NicName: "eth" + strconv.Itoa(NicId),
ReportTime: reportTime.Format(gen.RwTimestampLayout),
ReportTime: reportTime.Format(gen.RwTimestamptzLayout),
Bandwidth: maxBandwidth,
Value: float64(value),
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/datagen/cdn_metrics/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (m *deviceTcpMonitor) newMetrics(metricName string, reportTime time.Time, v
return &tcpMetric{
DeviceId: m.deviceId,
MetricName: metricName,
ReportTime: reportTime.Format(gen.RwTimestampLayout),
ReportTime: reportTime.Format(gen.RwTimestamptzLayout),
Value: value,
}
}
2 changes: 1 addition & 1 deletion integration_tests/datagen/clickstream/clickstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (g *clickStreamGen) generate() sink.SinkRecord {
UserId: fmt.Sprint(userId),
TargetId: string(target) + fmt.Sprint(targetId),
TargetType: string(target),
EventTimestamp: time.Now().Format(gen.RwTimestampLayout),
EventTimestamp: time.Now().Format(gen.RwTimestamptzLayout),
BehaviorType: behavior,
ParentTargetType: parentTargetType,
ParentTargetId: parentTargetId,
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/datagen/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (g *orderEventGen) Load(ctx context.Context, outCh chan<- sink.SinkRecord)
OrderId: g.seqOrderId,
RestaurantId: rand.Int63n(num_of_restaurants),
OrderState: order_states[rand.Intn(len(order_states))],
OrderTimestamp: now.Add(time.Duration(rand.Intn(total_minutes)) * time.Minute).Format(gen.RwTimestampLayout),
OrderTimestamp: now.Add(time.Duration(rand.Intn(total_minutes)) * time.Minute).Format(gen.RwTimestampNaiveLayout),
}
g.seqOrderId++
select {
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/datagen/ecommerce/ecommerce.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (g *ecommerceGen) KafkaTopics() []string {
}

func (g *ecommerceGen) generate() []sink.SinkRecord {
ts := time.Now().Format(gen.RwTimestampLayout)
ts := time.Now().Format(gen.RwTimestampNaiveLayout)

if g.faker.Bool() && g.seqShipId >= g.seqOrderId {
// New order.
Expand Down
4 changes: 3 additions & 1 deletion integration_tests/datagen/gen/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"datagen/sink/postgres"
"datagen/sink/pulsar"
"datagen/sink/s3"
"time"

"gonum.org/v1/gonum/stat/distuv"
)
Expand Down Expand Up @@ -47,7 +48,8 @@ type LoadGenerator interface {
Load(ctx context.Context, outCh chan<- sink.SinkRecord)
}

const RwTimestampLayout = "2006-01-02 15:04:05.07+01:00"
const RwTimestampNaiveLayout = time.DateTime
const RwTimestamptzLayout = time.RFC3339

type RandDist interface {
// Rand returns a random number ranging from [0, max].
Expand Down
4 changes: 2 additions & 2 deletions integration_tests/datagen/twitter/twitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewTwitterGen() gen.LoadGenerator {
endTime, _ := time.Parse("2006-01-01", fmt.Sprintf("%d-01-01", endYear))
startTime, _ := time.Parse("2006-01-01", fmt.Sprintf("%d-01-01", startYear))
users[id] = &twitterUser{
CreatedAt: faker.DateRange(startTime, endTime).Format(gen.RwTimestampLayout),
CreatedAt: faker.DateRange(startTime, endTime).Format(gen.RwTimestamptzLayout),
Id: id,
Name: fmt.Sprintf("%s %s", faker.Name(), faker.Adverb()),
UserName: faker.Username(),
Expand Down Expand Up @@ -152,7 +152,7 @@ func (t *twitterGen) generate() twitterEvent {
return twitterEvent{
Data: tweetData{
Id: id,
CreatedAt: time.Now().Format(gen.RwTimestampLayout),
CreatedAt: time.Now().Format(gen.RwTimestamptzLayout),
Text: sentence,
Lang: gofakeit.Language(),
},
Expand Down
41 changes: 41 additions & 0 deletions integration_tests/elasticsearch-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Demo: Sinking to ElasticSearch

In this demo, we want to showcase how RisingWave is able to sink data to ElasticSearch.

1. Set the compose profile accordingly:
Demo with elasticsearch 7:
```
export COMPOSE_PROFILES=es7
```

Demo with elasticsearch 8
```
export COMPOSE_PROFILES=es8
```

2. Launch the cluster:

```sh
docker-compose up -d
```

The cluster contains a RisingWave cluster and its necessary dependencies, a datagen that generates the data, a single-node elasticsearch for sink.

3. Execute the SQL queries in sequence:

- create_source.sql
- create_mv.sql
- create_es[7/8]_sink.sql

4. Check the contents in ES:

```sh
# Check the document counts
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_count" -H 'Content-Type: application/json'

# Check the content of a document by user_id
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search" -H 'Content-Type: application/json' -d '{"query":{"term": {"user_id":2}}' | jq

# Get the first 10 documents sort by user_id
curl -XGET -u elastic:risingwave "http://localhost:9200/test/_search?size=10" -H 'Content-Type: application/json' -d'{"query":{"match_all":{}}, "sort": ["user_id"]}' | jq
```
9 changes: 9 additions & 0 deletions integration_tests/elasticsearch-sink/create_es7_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE SINK bhv_es_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch8:9200',
username = 'elastic',
password = 'risingwave'
);
9 changes: 9 additions & 0 deletions integration_tests/elasticsearch-sink/create_es8_sink.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE SINK bhv_es_sink
FROM
bhv_mv WITH (
connector = 'elasticsearch',
index = 'test',
url = 'http://elasticsearch8:9200',
username = 'elastic',
password = 'risingwave'
);
7 changes: 7 additions & 0 deletions integration_tests/elasticsearch-sink/create_mv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CREATE MATERIALIZED VIEW bhv_mv AS
SELECT
user_id,
target_id,
event_timestamp
FROM
user_behaviors;
18 changes: 18 additions & 0 deletions integration_tests/elasticsearch-sink/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
CREATE table user_behaviors (
user_id int,
target_id VARCHAR,
target_type VARCHAR,
event_timestamp TIMESTAMP,
behavior_type VARCHAR,
parent_target_type VARCHAR,
parent_target_id VARCHAR,
PRIMARY KEY(user_id)
) WITH (
connector = 'datagen',
fields.user_id.kind = 'sequence',
fields.user_id.start = '1',
fields.user_id.end = '1000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '10'
) FORMAT PLAIN ENCODE JSON;
Loading

0 comments on commit 64831c0

Please sign in to comment.