Skip to content

Commit

Permalink
fix: fix incorrect usage of ok_or with anyhow (#11589)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 10, 2023
1 parent 9c9c4a2 commit 2342e8b
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 62 deletions.
2 changes: 1 addition & 1 deletion src/common/src/system_param/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ macro_rules! impl_set_system_param {
let v = if let Some(v) = value {
v.parse().map_err(|_| format!("cannot parse parameter value"))?
} else {
$default.ok_or(format!("{} does not have a default value", key))?
$default.ok_or_else(|| format!("{} does not have a default value", key))?
};
OverrideValidateOnSet::$field(&v)?;
params.$field = Some(v);
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/parser/unified/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,19 +127,19 @@ impl<'a> AvroParseOptions<'a> {
.find(|field| field.0 == field_name)
.map(|field| &field.1)
};
let scale = match find_in_records("scale").ok_or(AccessError::Other(anyhow!(
"scale field not found in VariableScaleDecimal"
)))? {
let scale = match find_in_records("scale").ok_or_else(|| {
AccessError::Other(anyhow!("scale field not found in VariableScaleDecimal"))
})? {
Value::Int(scale) => Ok(*scale),
avro_value => Err(AccessError::Other(anyhow!(
"scale field in VariableScaleDecimal is not int, got {:?}",
avro_value
))),
}?;

let value: BigInt = match find_in_records("value").ok_or(AccessError::Other(
anyhow!("value field not found in VariableScaleDecimal"),
))? {
let value: BigInt = match find_in_records("value").ok_or_else(|| {
AccessError::Other(anyhow!("value field not found in VariableScaleDecimal"))
})? {
Value::Bytes(bytes) => Ok(BigInt::from_signed_bytes_be(bytes)),
avro_value => Err(AccessError::Other(anyhow!(
"value field in VariableScaleDecimal is not bytes, got {:?}",
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ impl ClickHouseSinkWriter {
.r#type
.split("DateTime64(")
.last()
.ok_or(SinkError::ClickHouse("must have last".to_string()))?
.ok_or_else(|| SinkError::ClickHouse("must have last".to_string()))?
.split(')')
.next()
.ok_or(SinkError::ClickHouse("must have next".to_string()))?
.ok_or_else(|| SinkError::ClickHouse("must have next".to_string()))?
.parse::<u8>()
.map_err(|e| SinkError::ClickHouse(format!("clickhouse sink error {}", e)))?
} else {
Expand Down Expand Up @@ -482,7 +482,7 @@ impl ClickHouseSinkWriter {
Op::UpdateDelete => continue,
Op::UpdateInsert => {
let pk = Self::build_ck_fields(row.datum_at(pk_index), accuracy_time)?
.ok_or(SinkError::ClickHouse("pk can not be none".to_string()))?;
.ok_or_else(|| SinkError::ClickHouse("pk can not be none".to_string()))?;
let fields_vec = self.build_update_fields(row, accuracy_time)?;
self.client
.update(
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/sink/coordinate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> SinkWriter for Coordi
async fn barrier(&mut self, is_checkpoint: bool) -> Result<Self::CommitMetadata> {
let metadata = self.inner.barrier(is_checkpoint).await?;
if is_checkpoint {
let metadata = metadata.ok_or(SinkError::Coordinator(anyhow!(
"should get metadata on checkpoint barrier"
)))?;
let metadata = metadata.ok_or_else(|| {
SinkError::Coordinator(anyhow!("should get metadata on checkpoint barrier"))
})?;
// TODO: add metrics to measure time to commit
self.coordinator_stream_handle
.commit(self.epoch, metadata)
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/kafka/private_link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn get_property_required(
with_properties
.get(property)
.map(|s| s.to_lowercase())
.ok_or(anyhow!("Required property \"{property}\" is not provided"))
.ok_or_else(|| anyhow!("Required property \"{property}\" is not provided"))
}

#[inline(always)]
Expand Down
8 changes: 5 additions & 3 deletions src/frontend/src/handler/create_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ fn get_connection_property_required(
with_properties
.get(property)
.map(|s| s.to_lowercase())
.ok_or(RwError::from(ProtocolError(format!(
"Required property \"{property}\" is not provided"
))))
.ok_or_else(|| {
RwError::from(ProtocolError(format!(
"Required property \"{property}\" is not provided"
)))
})
}

fn resolve_private_link_properties(
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ pub fn bind_sql_columns(column_defs: &[ColumnDef]) -> Result<Vec<ColumnCatalog>>
..
} = column;

let data_type = data_type.clone().ok_or(ErrorCode::InvalidInputSyntax(
"data type is not specified".into(),
))?;
let data_type = data_type
.clone()
.ok_or_else(|| ErrorCode::InvalidInputSyntax("data type is not specified".into()))?;
if let Some(collation) = collation {
return Err(ErrorCode::NotImplemented(
format!("collation \"{}\"", collation),
Expand Down
10 changes: 6 additions & 4 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,10 +668,12 @@ impl SessionImpl {
let schema = catalog_reader.get_schema_by_name(db_name, schema.name().as_str())?;
let connection = schema
.get_connection_by_name(connection_name)
.ok_or(RwError::from(ErrorCode::ItemNotFound(format!(
"connection {} not found",
connection_name
))))?;
.ok_or_else(|| {
RwError::from(ErrorCode::ItemNotFound(format!(
"connection {} not found",
connection_name
)))
})?;
Ok(connection.clone())
}

Expand Down
31 changes: 15 additions & 16 deletions src/meta/src/rpc/cloud_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
MetaError::from(anyhow!(
anyhow!(
"Failed to delete VPC endpoint. endpoint_id {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
))
)
})?;

if let Some(ret) = output.unsuccessful() {
Expand Down Expand Up @@ -167,18 +167,19 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
MetaError::from(anyhow!(
anyhow!(
"Failed to check availability of VPC endpoint. endpoint_id: {vpc_endpoint_id}, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
))
)
})?;

match output.vpc_endpoints {
Some(endpoints) => {
let endpoint = endpoints.into_iter().exactly_one().map_err(|_| {
MetaError::from(anyhow!("More than one VPC endpoint found with the same ID"))
})?;
let endpoint = endpoints
.into_iter()
.exactly_one()
.map_err(|_| anyhow!("More than one VPC endpoint found with the same ID"))?;
if let Some(state) = endpoint.state {
match state {
State::Available => {
Expand Down Expand Up @@ -210,19 +211,17 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
MetaError::from(anyhow!(
anyhow!(
"Failed to describe VPC endpoint service, error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
))
)
})?;

match output.service_details {
Some(details) => {
let detail = details.into_iter().exactly_one().map_err(|_| {
MetaError::from(anyhow!(
"More than one VPC endpoint service found with the same name"
))
anyhow!("More than one VPC endpoint service found with the same name")
})?;
if let Some(azs) = detail.availability_zones {
service_azs.extend(azs.into_iter());
Expand Down Expand Up @@ -255,9 +254,9 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
MetaError::from(anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}",
anyhow!("Failed to describe subnets for vpc_id {vpc_id}. error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")))
e.meta().extra("aws_request_id"))
})?;

let subnets = output
Expand Down Expand Up @@ -315,12 +314,12 @@ impl AwsEc2Client {
.send()
.await
.map_err(|e| {
MetaError::from(anyhow!(
anyhow!(
"Failed to create vpc endpoint: vpc_id {vpc_id}, \
service_name {service_name}. error: {:?}, aws_request_id: {:?}",
e.message(),
e.meta().extra("aws_request_id")
))
)
})?;

let endpoint = output.vpc_endpoint().unwrap();
Expand Down
16 changes: 8 additions & 8 deletions src/rpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
request_sender
.send(first_request)
.await
.map_err(|err| RpcError::Internal(anyhow!(err.to_string())))?;
.map_err(|err| anyhow!(err.to_string()))?;

let mut response_stream =
init_stream_fn(Request::new(ReceiverStream::new(request_receiver)))
Expand All @@ -243,9 +243,7 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
let first_response = response_stream
.next()
.await
.ok_or(RpcError::Internal(anyhow!(
"get empty response from start sink request"
)))??;
.ok_or_else(|| anyhow!("get empty response from start sink request"))??;

Ok((
Self {
Expand All @@ -261,12 +259,14 @@ impl<REQ: 'static, RSP: 'static> BidiStreamHandle<REQ, RSP> {
.response_stream
.next()
.await
.ok_or(RpcError::Internal(anyhow!("end of response stream")))??)
.ok_or_else(|| anyhow!("end of response stream"))??)
}

pub async fn send_request(&mut self, request: REQ) -> Result<()> {
self.request_sender.send(request).await.map_err(|_| {
RpcError::Internal(anyhow!("unable to send request {}", type_name::<REQ>()))
})
Ok(self
.request_sender
.send(request)
.await
.map_err(|_| anyhow!("unable to send request {}", type_name::<REQ>()))?)
}
}
6 changes: 3 additions & 3 deletions src/sqlparser/src/ast/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,9 @@ impl SourceSchemaV2 {
};
let consume_string_from_options =
|row_options: &BTreeMap<String, String>, key: &str| -> Result<AstString, ParserError> {
try_consume_string_from_options(row_options, key).ok_or(ParserError::ParserError(
format!("missing field {} in row format options", key),
))
try_consume_string_from_options(row_options, key).ok_or_else(|| {
ParserError::ParserError(format!("missing field {} in row format options", key))
})
};
let get_schema_location =
|row_options: &BTreeMap<String, String>| -> Result<(AstString, bool), ParserError> {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/common/log_store/in_mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl LogWriter for BoundedInMemLogStoreWriter {
.truncated_epoch_rx
.recv()
.await
.ok_or(anyhow!("cannot get truncated epoch"))?;
.ok_or_else(|| anyhow!("cannot get truncated epoch"))?;
assert_eq!(truncated_epoch, prev_epoch);
}

Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/source/fs_source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ impl<S: StateStore> FsSourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
StreamExecutorError::from(anyhow!(
anyhow!(
"failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
self.actor_ctx.id,
self.stream_source_core.source_id
))
)
})?;

let source_desc_builder: SourceDescBuilder =
Expand Down
8 changes: 4 additions & 4 deletions src/stream/src/executor/source/source_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,11 @@ impl<S: StateStore> SourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
StreamExecutorError::from(anyhow!(
anyhow!(
"failed to receive the first barrier, actor_id: {:?}, source_id: {:?}",
self.actor_ctx.id,
self.stream_source_core.as_ref().unwrap().source_id
))
)
})?;

let mut core = self.stream_source_core.unwrap();
Expand Down Expand Up @@ -580,10 +580,10 @@ impl<S: StateStore> SourceExecutor<S> {
.instrument_await("source_recv_first_barrier")
.await
.ok_or_else(|| {
StreamExecutorError::from(anyhow!(
anyhow!(
"failed to receive the first barrier, actor_id: {:?} with no stream source",
self.actor_ctx.id
))
)
})?;
yield Message::Barrier(barrier);

Expand Down
9 changes: 5 additions & 4 deletions src/stream/src/task/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use super::{unique_executor_id, unique_operator_id, CollectResult};
use crate::error::{StreamError, StreamResult};
use crate::error::StreamResult;
use crate::executor::exchange::permit::Receiver;
use crate::executor::monitor::StreamingMetrics;
use crate::executor::subtask::SubtaskHandle;
Expand Down Expand Up @@ -605,9 +605,10 @@ impl LocalStreamManagerCore {
env: StreamEnvironment,
) -> StreamResult<()> {
for &actor_id in actors {
let actor = self.actors.remove(&actor_id).ok_or_else(|| {
StreamError::from(anyhow!("No such actor with actor id:{}", actor_id))
})?;
let actor = self
.actors
.remove(&actor_id)
.ok_or_else(|| anyhow!("No such actor with actor id:{}", actor_id))?;
let mview_definition = &actor.mview_definition;
let actor_context = ActorContext::create_with_metrics(
actor_id,
Expand Down

0 comments on commit 2342e8b

Please sign in to comment.