diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 2f3c6d420087d..1bdd8d4cd2717 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -56,12 +56,18 @@ public void validate( String tableName = tableProperties.get(TABLE_NAME_PROP); Set jdbcColumns = new HashSet<>(); Set jdbcPk = new HashSet<>(); + Set jdbcTableNames = new HashSet<>(); try (Connection conn = DriverManager.getConnection(jdbcUrl); + ResultSet tableNamesResultSet = + conn.getMetaData().getTables(null, null, "%", null); ResultSet columnResultSet = conn.getMetaData().getColumns(null, null, tableName, null); ResultSet pkResultSet = conn.getMetaData().getPrimaryKeys(null, null, tableName); ) { + while (tableNamesResultSet.next()) { + jdbcTableNames.add(tableNamesResultSet.getString("TABLE_NAME")); + } while (columnResultSet.next()) { jdbcColumns.add(columnResultSet.getString("COLUMN_NAME")); } @@ -72,6 +78,12 @@ public void validate( throw Status.INTERNAL.withCause(e).asRuntimeException(); } + if (!jdbcTableNames.contains(tableName)) { + throw Status.INVALID_ARGUMENT + .withDescription("table not found: " + tableName) + .asRuntimeException(); + } + // Check that all columns in tableSchema exist in the JDBC table. for (String sinkColumn : tableSchema.getColumnNames()) { if (!jdbcColumns.contains(sinkColumn)) { diff --git a/src/connector/src/sink/remote.rs b/src/connector/src/sink/remote.rs index e2818b4ddd801..b69700a539156 100644 --- a/src/connector/src/sink/remote.rs +++ b/src/connector/src/sink/remote.rs @@ -125,10 +125,12 @@ impl RemoteSink { })?; let host_addr = HostAddr::try_from(&address).map_err(SinkError::from)?; let client = ConnectorClient::new(host_addr).await.map_err(|err| { - SinkError::Remote(format!( + let msg = format!( "failed to connect to connector endpoint `{}`: {:?}", &address, err - )) + ); + tracing::warn!(msg); + SinkError::Remote(msg) })?; let table_schema = Some(TableSchema { @@ -151,7 +153,21 @@ impl RemoteSink { ) .await .map_err(SinkError::from)?; - let _ = response.next().await.unwrap(); + response.next().await.unwrap().map_err(|e| { + let msg = format!( + "failed to start sink stream for connector `{}` with error code: {}, message: {:?}", + &config.connector_type, + e.code(), + e.message() + ); + tracing::warn!(msg); + SinkError::Remote(msg) + })?; + tracing::info!( + "{:?} sink stream started with properties: {:?}", + &config.connector_type, + &config.properties + ); Ok(RemoteSink { connector_type: config.connector_type,