Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): batched iterator & e2e tests with in-memory state store #5322

Merged
merged 8 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions ci/scripts/e2e-test-parallel-in-memory.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/bash

# Exits as soon as any line fails.
set -euo pipefail

source ci/scripts/common.env.sh

while getopts 'p:' opt; do
case ${opt} in
p )
profile=$OPTARG
;;
\? )
echo "Invalid Option: -$OPTARG" 1>&2
exit 1
;;
: )
echo "Invalid option: $OPTARG requires an argument" 1>&2
;;
esac
done
shift $((OPTIND -1))

echo "--- Download artifacts"
mkdir -p target/debug
buildkite-agent artifact download risingwave-"$profile" target/debug/
buildkite-agent artifact download risedev-dev-"$profile" target/debug/
mv target/debug/risingwave-"$profile" target/debug/risingwave
mv target/debug/risedev-dev-"$profile" target/debug/risedev-dev

echo "--- Adjust permission"
chmod +x ./target/debug/risingwave
chmod +x ./target/debug/risedev-dev

echo "--- Generate RiseDev CI config"
cp ci/risedev-components.ci.env risedev-components.user.env

echo "--- Prepare RiseDev dev cluster"
cargo make pre-start-dev
cargo make link-all-in-one-binaries

host_args="-h localhost -p 4565 -h localhost -p 4566 -h localhost -p 4567"

echo "--- e2e, ci-3cn-3fe-in-memory, streaming"
cargo make ci-start ci-3cn-3fe-in-memory
sqllogictest ${host_args} -d dev './e2e_test/streaming/**/*.slt' -j 16 --junit "parallel-in-memory-streaming-${profile}"

echo "--- Kill cluster"
cargo make ci-kill

echo "--- e2e, ci-3cn-3fe-in-memory, batch"
cargo make ci-start ci-3cn-3fe-in-memory
sqllogictest ${host_args} -d dev './e2e_test/ddl/**/*.slt' --junit "parallel-in-memory-batch-ddl-${profile}"
sqllogictest ${host_args} -d dev './e2e_test/batch/**/*.slt' -j 16 --junit "parallel-in-memory-batch-${profile}"

echo "--- Kill cluster"
cargo make ci-kill
19 changes: 19 additions & 0 deletions ci/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,25 @@ steps:
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (parallel, in-memory) (release mode)"
command: "ci/scripts/e2e-test-parallel-in-memory.sh -p ci-release"
depends_on: "build"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- seek-oss/aws-sm#v2.3.1:
env:
BUILDKITE_ANALYTICS_TOKEN: buildkite-build-analytics-sqllogictest-token
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- test-collector#v1.0.0:
files: "*-junit.xml"
format: "junit"
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 10
retry: *auto-retry

- label: "end-to-end test (deterministic simulation)"
command: "timeout 8m ci/scripts/deterministic-e2e-test.sh"
depends_on: "build-simulation"
Expand Down
13 changes: 13 additions & 0 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ steps:
timeout_in_minutes: 12
retry: *auto-retry

- label: "end-to-end test (parallel, in-memory)"
command: "ci/scripts/e2e-test-parallel-in-memory.sh -p ci-dev"
depends_on: "build"
plugins:
- gencer/cache#v2.4.10: *cargo-cache
- docker-compose#v3.9.0:
run: rw-build-env
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 12
retry: *auto-retry

- label: "end-to-end source test"
command: "ci/scripts/e2e-source-test.sh -p ci-dev"
depends_on: "build"
Expand Down
33 changes: 32 additions & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ risedev:
enable-dashboard-v2: false
unsafe-disable-recovery: true
- use: compute-node
enable-in-memory-kv-state-backend: true
- use: frontend

# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
Expand Down Expand Up @@ -374,6 +375,33 @@ risedev:
exporter-port: 2224
- use: compactor

ci-3cn-3fe-in-memory:
- use: etcd
unsafe-no-fsync: true
- use: meta-node
unsafe-disable-recovery: true
- use: compute-node
port: 5687
exporter-port: 1222
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5688
exporter-port: 1223
enable-in-memory-kv-state-backend: true
- use: compute-node
port: 5689
exporter-port: 1224
enable-in-memory-kv-state-backend: true
- use: frontend
port: 4565
exporter-port: 2222
- use: frontend
port: 4566
exporter-port: 2223
- use: frontend
port: 4567
exporter-port: 2224

ci-kafka:
- use: minio
- use: etcd
Expand Down Expand Up @@ -508,6 +536,9 @@ template:
# If `user-managed` is true, this service will be started by user with the above config
user-managed: false

# Whether to enable in-memory pure KV state backend
enable-in-memory-kv-state-backend: false

meta-node:
# Meta-node advertise address
address: "127.0.0.1"
Expand Down Expand Up @@ -604,7 +635,7 @@ template:

# Listen address
listen-address: ${address}

# Prometheus exporter listen port
exporter-port: 2222

Expand Down
35 changes: 14 additions & 21 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::rpc::service::monitor_service::{
GrpcStackTraceManagerRef, MonitorServiceImpl, StackTraceMiddlewareLayer,
};
use crate::rpc::service::stream_service::StreamServiceImpl;
use crate::server::StateStoreImpl::HummockStateStore;
use crate::{ComputeNodeConfig, ComputeNodeOpts};

/// Bootstraps the compute-node.
Expand Down Expand Up @@ -125,28 +124,22 @@ pub async fn compute_node_serve(
.await
.unwrap();

let local_version_manager = match &state_store {
HummockStateStore(monitored) => monitored.local_version_manager(),
_ => {
panic!();
}
};

let compute_observer_node =
ComputeObserverNode::new(filter_key_extractor_manager.clone(), local_version_manager);
let observer_manager = ObserverManager::new(
meta_client.clone(),
client_addr.clone(),
Box::new(compute_observer_node),
WorkerType::ComputeNode,
)
.await;

let observer_join_handle = observer_manager.start().await.unwrap();
join_handle_vec.push(observer_join_handle);

let mut extra_info_sources: Vec<ExtraInfoSourceRef> = vec![];
if let StateStoreImpl::HummockStateStore(storage) = &state_store {
let local_version_manager = storage.local_version_manager();
let compute_observer_node =
ComputeObserverNode::new(filter_key_extractor_manager.clone(), local_version_manager);
let observer_manager = ObserverManager::new(
meta_client.clone(),
client_addr.clone(),
Box::new(compute_observer_node),
WorkerType::ComputeNode,
)
.await;

let observer_join_handle = observer_manager.start().await.unwrap();
join_handle_vec.push(observer_join_handle);

assert!(
storage
.local_version_manager()
Expand Down
1 change: 1 addition & 0 deletions src/risedevtool/src/service_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct ComputeNodeConfig {
pub provide_jaeger: Option<Vec<JaegerConfig>>,
pub provide_compactor: Option<Vec<CompactorConfig>>,
pub user_managed: bool,
pub enable_in_memory_kv_state_backend: bool,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand Down
36 changes: 26 additions & 10 deletions src/risedevtool/src/task/compute_node_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,33 @@ impl ComputeNodeService {
let provide_aws_s3 = config.provide_aws_s3.as_ref().unwrap();
let provide_compute_node = config.provide_compute_node.as_ref().unwrap();

let is_shared_backend = add_storage_backend(
&config.id,
provide_minio,
provide_aws_s3,
hummock_in_memory_strategy,
cmd,
)?;
let is_shared_backend = match (
config.enable_in_memory_kv_state_backend,
provide_minio.as_slice(),
provide_aws_s3.as_slice(),
) {
(true, [], []) => {
cmd.arg("--state-store").arg("in-memory");
false
}
(true, _, _) => {
return Err(anyhow!(
"When `enable_in_memory_kv_state_backend` is enabled, no minio and aws-s3 should be provided.",
));
}
(false, provide_minio, provide_aws_s3) => add_storage_backend(
&config.id,
provide_minio,
provide_aws_s3,
hummock_in_memory_strategy,
cmd,
)?,
};

if provide_compute_node.len() > 1 && !is_shared_backend {
return Err(anyhow!(
"should use a shared backend (e.g. MinIO) for multiple compute-node configuration. Consider adding `use: minio` in risedev config."
));
// Using a non-shared backend with multiple compute nodes will be problematic for state
// sharing like scaling. For distributed end-to-end tests with in-memory state store,
// this is acceptable.
}

let provide_meta_node = config.provide_meta_node.as_ref().unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#![feature(assert_matches)]
#![feature(is_sorted)]
#![feature(btree_drain_filter)]
#![feature(exact_size_is_empty)]
#![cfg_attr(coverage, feature(no_coverage))]

pub mod hummock;
Expand Down
Loading