Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove re-export from logging #3865

Merged
merged 4 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/catalog/src/information_schema/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use common_meta::peer::Peer;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::logging::warn;
use common_telemetry::warn;
use common_time::timestamp::Timestamp;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/cli/repl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_error::ext::ErrorExt;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::logging;
use common_telemetry::debug;
use either::Either;
use meta_client::client::MetaClientBuilder;
use query::datafusion::DatafusionQueryEngine;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Repl {

let history_file = history_file();
if let Err(e) = rl.load_history(&history_file) {
logging::debug!(
debug!(
"failed to load history file on {}, error: {e}",
history_file.display()
);
Expand Down Expand Up @@ -225,7 +225,7 @@ impl Drop for Repl {
if self.rl.helper().is_some() {
let history_file = history_file();
if let Err(e) = self.rl.save_history(&history_file) {
logging::debug!(
debug!(
"failed to save history file on {}, error: {e}",
history_file.display()
);
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_telemetry::{info, logging};
use common_telemetry::info;
use common_wal::config::DatanodeWalConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
Expand Down Expand Up @@ -210,8 +210,8 @@ impl StartCommand {
.await
.context(StartDatanodeSnafu)?;

logging::info!("Datanode start command: {:#?}", self);
logging::info!("Datanode options: {:#?}", opts);
info!("Datanode start command: {:#?}", self);
info!("Datanode options: {:#?}", opts);

let node_id = opts
.node_id
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use client::client_manager::DatanodeClients;
use common_meta::cache_invalidator::MultiCacheInvalidator;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::logging;
use common_telemetry::info;
use common_time::timezone::set_default_timezone;
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
Expand Down Expand Up @@ -219,8 +219,8 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;

logging::info!("Frontend start command: {:#?}", self);
logging::info!("Frontend options: {:#?}", opts);
info!("Frontend start command: {:#?}", self);
info!("Frontend options: {:#?}", opts);

set_default_timezone(opts.default_timezone.as_deref()).context(InitTimezoneSnafu)?;

Expand Down
6 changes: 3 additions & 3 deletions src/cmd/src/metasrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use async_trait::async_trait;
use clap::Parser;
use common_telemetry::logging;
use common_telemetry::info;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::metasrv::MetasrvOptions;
use snafu::ResultExt;
Expand Down Expand Up @@ -198,8 +198,8 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;

logging::info!("Metasrv start command: {:#?}", self);
logging::info!("Metasrv options: {:#?}", opts);
info!("Metasrv start command: {:#?}", self);
info!("Metasrv options: {:#?}", opts);

let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
.await
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/table/migrate_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
use common_telemetry::error;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
Expand Down
24 changes: 11 additions & 13 deletions src/common/procedure/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{info, logging, tracing};
use common_telemetry::{error, info, tracing};
use snafu::{ensure, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
Expand Down Expand Up @@ -244,20 +244,18 @@ impl ManagerContext {
) -> Option<LoadedProcedure> {
let loaders = self.loaders.lock().unwrap();
let loader = loaders.get(&message.type_name).or_else(|| {
logging::error!(
error!(
"Loader not found, procedure_id: {}, type_name: {}",
procedure_id,
message.type_name
procedure_id, message.type_name
);
None
})?;

let procedure = loader(&message.data)
.map_err(|e| {
logging::error!(
error!(
"Failed to load procedure data, key: {}, source: {:?}",
procedure_id,
e
procedure_id, e
);
e
})
Expand Down Expand Up @@ -496,7 +494,7 @@ impl LocalManager {
continue;
};

logging::info!(
info!(
"Recover root procedure {}-{}, step: {}",
loaded_procedure.procedure.type_name(),
procedure_id,
Expand All @@ -521,15 +519,15 @@ impl LocalManager {
loaded_procedure.step,
loaded_procedure.procedure,
) {
logging::error!(e; "Failed to recover procedure {}", procedure_id);
error!(e; "Failed to recover procedure {}", procedure_id);
}
}
}
}

/// Recovers unfinished procedures and reruns them.
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
info!("LocalManager start to recover");
let recover_start = Instant::now();

let (messages, rollback_messages, finished_ids) =
Expand All @@ -539,19 +537,19 @@ impl LocalManager {
self.submit_recovered_messages(messages, InitProcedureState::Running);

if !finished_ids.is_empty() {
logging::info!(
info!(
"LocalManager try to clean finished procedures, num: {}",
finished_ids.len()
);

for procedure_id in finished_ids {
if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await {
logging::error!(e; "Failed to delete procedure {}", procedure_id);
error!(e; "Failed to delete procedure {}", procedure_id);
}
}
}

logging::info!(
info!(
"LocalManager finish recovery, cost: {}ms",
recover_start.elapsed().as_millis()
);
Expand Down
30 changes: 15 additions & 15 deletions src/common/procedure/src/local/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};
use common_telemetry::logging;
use common_telemetry::{debug, error, info};
use tokio::time;

use super::rwlock::OwnedKeyRwLockGuard;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl ProcedureGuard {
impl Drop for ProcedureGuard {
fn drop(&mut self) {
if !self.finish {
logging::error!("Procedure {} exits unexpectedly", self.meta.id);
error!("Procedure {} exits unexpectedly", self.meta.id);

// Set state to failed. This is useful in test as runtime may not abort when the runner task panics.
// See https://github.com/tokio-rs/tokio/issues/2002 .
Expand Down Expand Up @@ -104,7 +104,7 @@ impl Runner {
// Ensure we can update the procedure state.
let mut guard = ProcedureGuard::new(self.meta.clone(), self.manager_ctx.clone());

logging::info!(
info!(
"Runner {}-{} starts",
self.procedure.type_name(),
self.meta.id
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Runner {

for id in procedure_ids {
if let Err(e) = self.store.delete_procedure(id).await {
logging::error!(
error!(
e;
"Runner {}-{} failed to delete procedure {}",
self.procedure.type_name(),
Expand All @@ -160,7 +160,7 @@ impl Runner {
}
}

logging::info!(
info!(
"Runner {}-{} exits",
self.procedure.type_name(),
self.meta.id
Expand Down Expand Up @@ -260,7 +260,7 @@ impl Runner {
ProcedureState::Running | ProcedureState::Retrying { .. } => {
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
Expand Down Expand Up @@ -299,7 +299,7 @@ impl Runner {
}
}
Err(e) => {
logging::error!(
error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
Expand Down Expand Up @@ -394,7 +394,7 @@ impl Runner {

/// Extend the retry time to wait for the next retry.
async fn wait_on_err(&mut self, d: Duration, i: u64) {
logging::info!(
info!(
"Procedure {}-{} retry for the {} times after {} millis",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -407,7 +407,7 @@ impl Runner {
async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
let has_child = !subprocedures.is_empty();
for subprocedure in subprocedures {
logging::info!(
info!(
"Procedure {}-{} submit subprocedure {}-{}",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -422,7 +422,7 @@ impl Runner {
);
}

logging::info!(
info!(
"Procedure {}-{} is waiting for subprocedures",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -432,7 +432,7 @@ impl Runner {
if has_child {
self.meta.child_notify.notified().await;

logging::info!(
info!(
"Procedure {}-{} is waked up",
self.procedure.type_name(),
self.meta.id,
Expand All @@ -454,7 +454,7 @@ impl Runner {
)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to persist procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -470,7 +470,7 @@ impl Runner {
.commit_procedure(self.meta.id, self.step)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to commit procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -496,7 +496,7 @@ impl Runner {
.rollback_procedure(self.meta.id, message)
.await
.map_err(|e| {
logging::error!(
error!(
e; "Failed to write rollback key for procedure {}-{}",
self.procedure.type_name(),
self.meta.id
Expand All @@ -509,7 +509,7 @@ impl Runner {

fn done(&self, output: Option<Output>) {
// TODO(yingwen): Add files to remove list.
logging::info!(
info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id,
Expand Down
16 changes: 7 additions & 9 deletions src/common/procedure/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::fmt;

use common_telemetry::logging;
use common_telemetry::{debug, error, info, warn};
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl ProcedureStore {
/// Creates a new [ProcedureStore] from specific [StateStoreRef].
pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore {
let proc_path = format!("{}{PROC_PATH}", parent_path);
logging::info!("The procedure state store path is: {}", &proc_path);
info!("The procedure state store path is: {}", &proc_path);
ProcedureStore { proc_path, store }
}

Expand Down Expand Up @@ -154,7 +154,7 @@ impl ProcedureStore {
while let Some((key_set, _)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while deleting procedures, key: {}", key);
warn!("Unknown key while deleting procedures, key: {}", key);
continue;
};
if curr_key.key_type == KeyType::Step {
Expand All @@ -165,11 +165,9 @@ impl ProcedureStore {
}
}

logging::debug!(
debug!(
"Delete keys for procedure {}, step_keys: {:?}, finish_keys: {:?}",
procedure_id,
step_keys,
finish_keys
procedure_id, step_keys, finish_keys
);
// We delete all step keys first.
self.store.batch_delete(step_keys.as_slice()).await?;
Expand Down Expand Up @@ -203,7 +201,7 @@ impl ProcedureStore {
while let Some((key_set, value)) = key_values.try_next().await? {
let key = key_set.key();
let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
warn!("Unknown key while loading procedures, key: {}", key);
continue;
};

Expand Down Expand Up @@ -251,7 +249,7 @@ impl ProcedureStore {
serde_json::from_slice(value)
.map_err(|e| {
// `e` doesn't impl ErrorExt so we print it as normal error.
logging::error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
error!("Failed to parse value, key: {:?}, source: {:?}", key, e);
e
})
.ok()
Expand Down
Loading
Loading