diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 042fdf926478..de6ad67b3208 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -14,14 +14,8 @@ //! Write ahead log of the engine. -/// TODO(weny): remove it -#[allow(unused)] pub(crate) mod entry_distributor; -/// TODO(weny): remove it -#[allow(unused)] pub(crate) mod entry_reader; -/// TODO(weny): remove it -#[allow(unused)] pub(crate) mod raw_entry_reader; use std::collections::HashMap; diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs index bad80fa9d312..e869e5ee1a5c 100644 --- a/src/mito2/src/wal/entry_distributor.rs +++ b/src/mito2/src/wal/entry_distributor.rs @@ -12,19 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::min; use std::collections::HashMap; use std::sync::Arc; -use api::v1::WalEntry; use async_stream::stream; use common_telemetry::{debug, error}; use futures::future::join_all; -use snafu::{ensure, OptionExt}; +use snafu::OptionExt; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; use store_api::storage::RegionId; -use tokio::sync::mpsc::{self, Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::sync::oneshot; use tokio_stream::StreamExt; @@ -99,7 +97,6 @@ impl WalEntryDistributor { /// Receives the Wal entries from [WalEntryDistributor]. #[derive(Debug)] pub(crate) struct WalEntryReceiver { - region_id: RegionId, /// Receives the [Entry] from the [WalEntryDistributor]. entry_receiver: Option>, /// Sends the `start_id` to the [WalEntryDistributor]. @@ -107,13 +104,8 @@ pub(crate) struct WalEntryReceiver { } impl WalEntryReceiver { - pub fn new( - region_id: RegionId, - entry_receiver: Receiver, - arg_sender: oneshot::Sender, - ) -> Self { + pub fn new(entry_receiver: Receiver, arg_sender: oneshot::Sender) -> Self { Self { - region_id, entry_receiver: Some(entry_receiver), arg_sender: Some(arg_sender), } @@ -121,8 +113,8 @@ impl WalEntryReceiver { } impl WalEntryReader for WalEntryReceiver { - fn read(&mut self, provider: &Provider, start_id: EntryId) -> Result> { - let mut arg_sender = + fn read(&mut self, _provider: &Provider, start_id: EntryId) -> Result> { + let arg_sender = self.arg_sender .take() .with_context(|| error::InvalidWalReadRequestSnafu { @@ -205,7 +197,7 @@ pub fn build_wal_entry_distributor_and_receivers( senders.insert(region_id, entry_sender); arg_receivers.push((region_id, arg_receiver)); - readers.push(WalEntryReceiver::new(region_id, entry_receiver, arg_sender)); + readers.push(WalEntryReceiver::new(entry_receiver, arg_sender)); } ( @@ -223,7 +215,7 @@ pub fn build_wal_entry_distributor_and_receivers( mod tests { use std::assert_matches::assert_matches; - use api::v1::{Mutation, OpType}; + use api::v1::{Mutation, OpType, WalEntry}; use futures::{stream, TryStreamExt}; use prost::Message; use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader, NaiveEntry}; @@ -244,7 +236,7 @@ mod tests { } impl RawEntryReader for MockRawEntryReader { - fn read(&self, provider: &Provider, _start_id: EntryId) -> Result> { + fn read(&self, _provider: &Provider, _start_id: EntryId) -> Result> { let stream = stream::iter(self.entries.clone().into_iter().map(Ok)); Ok(Box::pin(stream)) } diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs index 5db4eb9efe5f..59a8fd8d46b3 100644 --- a/src/mito2/src/wal/entry_reader.rs +++ b/src/mito2/src/wal/entry_reader.rs @@ -14,13 +14,11 @@ use api::v1::WalEntry; use async_stream::stream; -use common_telemetry::info; use futures::StreamExt; use prost::Message; use snafu::{ensure, ResultExt}; use store_api::logstore::entry::Entry; use store_api::logstore::provider::Provider; -use store_api::storage::RegionId; use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result}; use crate::wal::raw_entry_reader::RawEntryReader; @@ -90,17 +88,15 @@ mod tests { use std::assert_matches::assert_matches; use api::v1::{Mutation, OpType, WalEntry}; - use futures::{stream, TryStreamExt}; + use futures::TryStreamExt; use prost::Message; use store_api::logstore::entry::{Entry, MultiplePartEntry, MultiplePartHeader}; use store_api::logstore::provider::Provider; use store_api::storage::RegionId; - use crate::error::{self, Result}; + use crate::error; use crate::test_util::wal_util::MockRawEntryStream; use crate::wal::entry_reader::{LogStoreEntryReader, WalEntryReader}; - use crate::wal::raw_entry_reader::{EntryStream, RawEntryReader}; - use crate::wal::EntryId; #[tokio::test] async fn test_tail_corrupted_stream() { diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs index d8afc7915119..6dd11c2c8f64 100644 --- a/src/mito2/src/wal/raw_entry_reader.rs +++ b/src/mito2/src/wal/raw_entry_reader.rs @@ -16,12 +16,10 @@ use std::sync::Arc; use async_stream::try_stream; use common_error::ext::BoxedError; -use common_wal::options::{KafkaWalOptions, WalOptions}; use futures::stream::BoxStream; -use futures::TryStreamExt; use snafu::ResultExt; use store_api::logstore::entry::Entry; -use store_api::logstore::provider::{KafkaProvider, Provider, RaftEngineProvider}; +use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; use store_api::storage::RegionId; use tokio_stream::StreamExt; @@ -119,12 +117,9 @@ where mod tests { use std::sync::Arc; - use common_wal::options::WalOptions; - use futures::stream; + use futures::{stream, TryStreamExt}; use store_api::logstore::entry::{Entry, NaiveEntry}; - use store_api::logstore::{ - AppendBatchResponse, AppendResponse, EntryId, LogStore, SendableEntryStream, - }; + use store_api::logstore::{AppendBatchResponse, EntryId, LogStore, SendableEntryStream}; use store_api::storage::RegionId; use super::*; @@ -145,24 +140,24 @@ mod tests { async fn append_batch( &self, - entries: Vec, + _entries: Vec, ) -> Result { unreachable!() } async fn read( &self, - provider: &Provider, - id: EntryId, + _provider: &Provider, + _id: EntryId, ) -> Result, Self::Error> { Ok(Box::pin(stream::iter(vec![Ok(self.entries.clone())]))) } - async fn create_namespace(&self, ns: &Provider) -> Result<(), Self::Error> { + async fn create_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> { unreachable!() } - async fn delete_namespace(&self, ns: &Provider) -> Result<(), Self::Error> { + async fn delete_namespace(&self, _ns: &Provider) -> Result<(), Self::Error> { unreachable!() } @@ -172,18 +167,18 @@ mod tests { async fn obsolete( &self, - provider: &Provider, - entry_id: EntryId, + _provider: &Provider, + _entry_id: EntryId, ) -> Result<(), Self::Error> { unreachable!() } fn entry( &self, - data: &mut Vec, - entry_id: EntryId, - region_id: RegionId, - provider: &Provider, + _data: &mut Vec, + _entry_id: EntryId, + _region_id: RegionId, + _provider: &Provider, ) -> Result { unreachable!() }