Skip to content

Commit

Permalink
feat(source): create source stream with retry during recovery (#19805)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 18, 2024
1 parent 3cbc231 commit fe84f69
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 33 deletions.
4 changes: 0 additions & 4 deletions e2e_test/source_inline/kafka/issue_19563.slt.serial
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ select array_length(upstream_fragment_ids) from rw_fragments where array_contain
----
3

# XXX: wait until source reader is ready. then produce data.
# This is a temporary workaround for a data loss bug https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
sleep 2s

system ok
cat <<EOF | rpk topic produce test-topic-19563
{"v1": "3031-01-01 19:00:00"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,41 @@
public class DbzSourceUtils {
static final Logger LOG = LoggerFactory.getLogger(DbzSourceUtils.class);

/**
 * Creates the Postgres publication from the validation path when auto creation is enabled.
 *
 * <p>Reads the auto-create flag ({@code DbzConnectorConfig.PG_PUB_CREATE}) and the publication
 * name ({@code DbzConnectorConfig.PG_PUB_NAME}) from {@code properties}. If the flag is not
 * "true" (compared case-insensitively), the method only logs that creation is skipped;
 * otherwise it delegates to {@code createPostgresPublicationInner}.
 *
 * @param properties connector properties; the auto-create flag entry is dereferenced directly,
 *     so a lookup that returns {@code null} results in a {@link NullPointerException}
 * @throws SQLException declared because {@code createPostgresPublicationInner} may throw it
 */
public static void createPostgresPublicationInValidate(Map<String, String> properties)
        throws SQLException {
    boolean autoCreateEnabled =
            properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
    String publicationName = properties.get(DbzConnectorConfig.PG_PUB_NAME);

    if (autoCreateEnabled) {
        createPostgresPublicationInner(properties, publicationName);
        return;
    }

    // Auto creation is switched off: leave the upstream database untouched.
    LOG.info(
            "Postgres publication auto creation is disabled, skip creation for publication {}.",
            publicationName);
}

/**
* This method is used to create a publication for the cdc source job or cdc table if it doesn't
* exist.
*/
public static void createPostgresPublicationIfNeeded(
public static void createPostgresPublicationInSourceExecutor(
Map<String, String> properties, long sourceId) throws SQLException {
boolean pubAutoCreate =
properties.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
if (!pubAutoCreate) {
LOG.info(
"Postgres publication auto creation is disabled, skip creation for source {}.",
"Postgres publication auto creation is disabled, skip creation for publication {}, sourceId = {}.",
pubName,
sourceId);
return;
}
createPostgresPublicationInner(properties, pubName);
}

var pubName = properties.get(DbzConnectorConfig.PG_PUB_NAME);
private static void createPostgresPublicationInner(
Map<String, String> properties, String pubName) throws SQLException {
var dbHost = properties.get(DbzConnectorConfig.HOST);
var dbPort = properties.get(DbzConnectorConfig.PORT);
var dbName = properties.get(DbzConnectorConfig.DB_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,22 +354,22 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
}
}

if (!isPublicationExists) {
if (!isPublicationExists && !pubAutoCreate) {
// We require a publication on upstream to publish table cdc events
if (!pubAutoCreate) {
throw ValidatorUtils.invalidArgument(
"Publication '" + pubName + "' doesn't exist and auto create is disabled");
} else {
// createPublicationIfNeeded(Optional.empty());
LOG.info(
"Publication '{}' doesn't exist, will be created in the process of streaming job.",
this.pubName);
}
throw ValidatorUtils.invalidArgument(
"Publication '" + pubName + "' doesn't exist and auto create is disabled");
}

// If the source properties were created by a shared source, skip the following
// publication checks
if (isCdcSourceJob) {
if (!isPublicationExists) {
LOG.info(
"creating cdc source job: publication '{}' doesn't exist, creating...",
pubName);
DbzSourceUtils.createPostgresPublicationInValidate(userProps);
LOG.info("creating cdc source job: publication '{}' created successfully", pubName);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long
boolean isCdcSourceJob = request.getIsSourceJob();

if (request.getSourceType() == POSTGRES) {
DbzSourceUtils.createPostgresPublicationIfNeeded(
DbzSourceUtils.createPostgresPublicationInSourceExecutor(
request.getPropertiesMap(), request.getSourceId());
}

Expand Down
173 changes: 158 additions & 15 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;

use anyhow::anyhow;
use either::Either;
use futures::stream::BoxStream;
use futures::TryStreamExt;
use itertools::Itertools;
use risingwave_common::array::ArrayRef;
Expand All @@ -38,6 +41,7 @@ use thiserror_ext::AsReport;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
use tracing::Instrument;

use super::executor_core::StreamSourceCore;
use super::{
Expand All @@ -46,6 +50,7 @@ use super::{
};
use crate::common::rate_limit::limited_chunk_size;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::stream_reader::StreamReaderWithPause;
use crate::executor::UpdateMutation;

Expand Down Expand Up @@ -474,6 +479,8 @@ impl<S: StateStore> SourceExecutor<S> {
};

core.split_state_store.init_epoch(first_epoch).await?;
// `initial_dispatch_num == 0` means the source executor has no downstream jobs yet,
// i.e. it is newly created
let mut is_uninitialized = self.actor_ctx.initial_dispatch_num == 0;
for ele in &mut boot_state {
if let Some(recover_state) = core
Expand All @@ -500,20 +507,134 @@ impl<S: StateStore> SourceExecutor<S> {

let recover_state: ConnectorState = (!boot_state.is_empty()).then_some(boot_state);
tracing::debug!(state = ?recover_state, "start with state");
let (source_chunk_reader, latest_splits) = self
.build_stream_source_reader(
&source_desc,
recover_state,
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
// Note that shared CDC source is special. It already starts from latest.
self.is_shared_non_cdc && is_uninitialized,

let mut received_resume_during_build = false;
let mut barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();

// Build the source stream reader.
let (source_chunk_reader, latest_splits) = if is_uninitialized {
tracing::info!("source uninitialized, build source stream reader w/o retry.");
let (source_chunk_reader, latest_splits) = self
.build_stream_source_reader(
&source_desc,
recover_state,
// For shared source, we start from latest and let the downstream SourceBackfillExecutors to read historical data.
// It's highly probable that the work of scanning historical data cannot be shared,
// so don't waste work on it.
// For more details, see https://github.com/risingwavelabs/risingwave/issues/16576#issuecomment-2095413297
// Note that shared CDC source is special. It already starts from latest.
self.is_shared_non_cdc,
)
.instrument_await("source_build_reader")
.await?;
(
source_chunk_reader.map_err(StreamExecutorError::connector_error),
latest_splits,
)
.instrument_await("source_build_reader")
.await?;
let source_chunk_reader = source_chunk_reader.map_err(StreamExecutorError::connector_error);
} else {
tracing::info!("source initialized, build source stream reader with retry.");
// Build the source stream reader with retry during recovery.
// We only build source stream reader with retry during recovery,
// because we can rely on the persisted source states to recover the source stream
// and can avoid the potential race with "seek to latest"
// https://github.com/risingwavelabs/risingwave/issues/19681#issuecomment-2532183002
let mut reader_and_splits: Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)> =
None;
let source_reader = source_desc.source.clone();
let (column_ids, source_ctx) = self.prepare_source_stream_build(&source_desc);
let source_ctx = Arc::new(source_ctx);
let mut build_source_stream_fut = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match source_reader
.build_stream(
recover_state.clone(),
column_ids.clone(),
source_ctx.clone(),
false, // not need to seek to latest since source state is initialized
)
.await {
Ok((stream, latest_splits)) => Ok((stream, latest_splits)),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to build source stream, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("build_source_stream_with_retry"))
.await
.expect("Retry build source stream until success.")
});

// loop to create source stream until success
loop {
if let Some(barrier) = build_source_stream_and_poll_barrier(
&mut barrier_stream,
&mut reader_and_splits,
&mut build_source_stream_fut,
)
.await?
{
if let Message::Barrier(barrier) = barrier {
if let Some(mutation) = barrier.mutation.as_deref() {
match mutation {
Mutation::Throttle(actor_to_apply) => {
if let Some(new_rate_limit) =
actor_to_apply.get(&self.actor_ctx.id)
&& *new_rate_limit != self.rate_limit_rps
{
tracing::info!(
"updating rate limit from {:?} to {:?}",
self.rate_limit_rps,
*new_rate_limit
);

// update the rate limit option, we will apply the rate limit
// when we finish building the source stream.
self.rate_limit_rps = *new_rate_limit;
}
}
Mutation::Resume => {
// We record the Resume mutation here and postpone resuming the source stream
// until we have successfully built the source stream.
received_resume_during_build = true;
}
_ => {
// ignore other mutations and output a warn log
tracing::warn!(
"Received a mutation {:?} to be ignored, because we only handle Throttle and Resume before
finish building source stream.",
mutation
);
}
}
}

// bump state store epoch
let _ = self.persist_state_and_clear_cache(barrier.epoch).await?;
yield Message::Barrier(barrier);
} else {
unreachable!(
"Only barrier message is expected when building source stream."
);
}
} else {
assert!(reader_and_splits.is_some());
tracing::info!("source stream created successfully");
break;
}
}
let (source_chunk_reader, latest_splits) =
reader_and_splits.expect("source chunk reader and splits must be created");

(
apply_rate_limit(source_chunk_reader, self.rate_limit_rps)
.boxed()
.map_err(StreamExecutorError::connector_error),
latest_splits,
)
};

if let Some(latest_splits) = latest_splits {
// make sure it is written to state table later.
// Then even it receives no messages, we can observe it in state table.
Expand All @@ -525,13 +646,12 @@ impl<S: StateStore> SourceExecutor<S> {
}
// Merge the chunks from source and the barriers into a single stream. We prioritize
// barriers over source data chunks here.
let barrier_stream = barrier_to_message_stream(barrier_receiver).boxed();
let mut stream =
StreamReaderWithPause::<true, StreamChunk>::new(barrier_stream, source_chunk_reader);
let mut command_paused = false;

// - If the first barrier requires us to pause on startup, pause the stream.
if is_pause_on_startup {
if is_pause_on_startup && !received_resume_during_build {
tracing::info!("source paused on startup");
stream.pause_stream();
command_paused = true;
Expand Down Expand Up @@ -746,6 +866,29 @@ impl<S: StateStore> SourceExecutor<S> {
}
}

/// Races the pending source-stream build against the barrier stream.
///
/// Outcomes:
/// - `reader_and_splits` is already populated: returns `Ok(None)` without polling anything.
/// - `build_future` completes: its output is stored in `reader_and_splits` and `Ok(None)` is
///   returned.
/// - `barrier_stream` produces an item first: `Ok(Some(message))` for a message, `Err(_)` for
///   a stream error.
///
/// The `select!` is `biased`, so the build future is always polled before the barrier stream.
///
/// Note that an exhausted `barrier_stream` also results in `Ok(None)`; the return value alone
/// does not tell the caller whether the build finished, only `reader_and_splits` does.
async fn build_source_stream_and_poll_barrier(
    barrier_stream: &mut BoxStream<'static, StreamExecutorResult<Message>>,
    reader_and_splits: &mut Option<(BoxChunkSourceStream, Option<Vec<SplitImpl>>)>,
    build_future: &mut Pin<
        Box<impl Future<Output = (BoxChunkSourceStream, Option<Vec<SplitImpl>>)>>,
    >,
) -> StreamExecutorResult<Option<Message>> {
    if reader_and_splits.is_none() {
        tokio::select! {
            biased;
            built = build_future.as_mut() => {
                // The build finished: hand the reader and splits over to the caller.
                *reader_and_splits = Some(built);
                Ok(None)
            }
            next_item = barrier_stream.next() => {
                match next_item {
                    Some(Ok(message)) => Ok(Some(message)),
                    Some(Err(err)) => Err(err),
                    None => Ok(None),
                }
            }
        }
    } else {
        // The reader was built by an earlier call; nothing left to drive.
        Ok(None)
    }
}

impl<S: StateStore> Execute for SourceExecutor<S> {
fn execute(self: Box<Self>) -> BoxedMessageStream {
if self.stream_source_core.is_some() {
Expand Down

0 comments on commit fe84f69

Please sign in to comment.