diff --git a/src/wal/src/local_storage_impl/config.rs b/src/wal/src/local_storage_impl/config.rs index 59a0c81b67..8d018896e1 100644 --- a/src/wal/src/local_storage_impl/config.rs +++ b/src/wal/src/local_storage_impl/config.rs @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LocalStorageConfig { pub path: String, - pub max_segment_size: u64, + pub max_segment_size: usize, pub cache_size: usize, } diff --git a/src/wal/src/local_storage_impl/record_encoding.rs b/src/wal/src/local_storage_impl/record_encoding.rs index f7c55b292f..6e8011c2e1 100644 --- a/src/wal/src/local_storage_impl/record_encoding.rs +++ b/src/wal/src/local_storage_impl/record_encoding.rs @@ -62,6 +62,7 @@ define_result!(Error); /// | (u8) | (u32) | (u32) | (u64) | (u64) | (u32) | | /// +---------+--------+--------+------------+--------------+--------------+-------+ /// ``` +#[derive(Debug)] pub struct Record<'a> { /// The version number of the record. pub version: u8, @@ -183,6 +184,13 @@ impl RecordEncoding { // Read length let length = buf.try_get_u32().context(Decoding)?; + ensure!( + length > 0, + LengthMismatch { + expected: 1usize, + actual: 0usize + } + ); // Ensure the buf is long enough ensure!( diff --git a/src/wal/src/local_storage_impl/segment.rs b/src/wal/src/local_storage_impl/segment.rs index f44ea131ba..80b3718228 100644 --- a/src/wal/src/local_storage_impl/segment.rs +++ b/src/wal/src/local_storage_impl/segment.rs @@ -18,10 +18,8 @@ use std::{ collections::{HashMap, VecDeque}, fmt::Debug, - fs, - fs::{File, OpenOptions}, - io, - io::Write, + fs::{self, File, OpenOptions}, + io::{self, Write}, path::Path, sync::{ atomic::{AtomicU64, Ordering}, @@ -107,6 +105,16 @@ pub enum Error { #[snafu(display("Checksum mismatch: expected {}, but got {}", expected, actual))] ChecksumMismatch { expected: u32, actual: u32 }, + + #[snafu(display( + "Failed to write log entries, err:{}.\nBacktrace:\n{}", + source, + backtrace + ))] + 
WriteExec { + source: GenericError, + backtrace: Backtrace, + }, } define_result!(Error); @@ -115,9 +123,7 @@ const SEGMENT_HEADER: &[u8] = b"HoraeDBWAL"; const WAL_SEGMENT_V0: u8 = 0; const NEWEST_WAL_SEGMENT_VERSION: u8 = WAL_SEGMENT_V0; const VERSION_SIZE: usize = 1; - -// todo: make MAX_FILE_SIZE configurable -const MAX_FILE_SIZE: u64 = 64 * 1024 * 1024; +const FLUSH_INTERVAL: usize = 10; /// Segment file format: /// @@ -140,8 +146,11 @@ pub struct Segment { /// A unique identifier for the segment. id: u64, - /// The size of the segment in bytes. - size: u64, + /// The current size in use. + current_size: usize, + + /// The size of the segment file. + segment_size: usize, /// The minimum sequence number of records within this segment. min_seq: SequenceNumber, @@ -152,59 +161,75 @@ pub struct Segment { /// The encoding format used for records within this segment. record_encoding: RecordEncoding, - /// An optional file handle for the segment. - /// This may be `None` if the file is not currently open. - file: Option, - /// An optional memory-mapped mutable buffer of the segment. /// This may be `None` if the segment is not memory-mapped. mmap: Option, /// An optional vector of positions within the segment. - /// This may be `None` if the segment is not memory-mapped. - record_position: Option>, + record_position: Vec, + + /// A counter to keep track of write operations before flushing. + write_count: usize, + + /// The last flushed position in the memory-mapped file. 
+ last_flushed_position: usize, } #[derive(Debug, Clone)] pub struct Position { - start: u64, - end: u64, + start: usize, + end: usize, } impl Segment { - pub fn new(path: String, segment_id: u64) -> Result { - if !Path::new(&path).exists() { - let mut file = File::create(&path).context(FileOpen)?; - file.write_all(&[NEWEST_WAL_SEGMENT_VERSION]) - .context(FileOpen)?; - file.write_all(SEGMENT_HEADER).context(FileOpen)?; - } - Ok(Segment { + pub fn new(path: String, segment_id: u64, segment_size: usize) -> Result { + let mut segment = Segment { version: NEWEST_WAL_SEGMENT_VERSION, - path, + path: path.clone(), id: segment_id, - size: SEGMENT_HEADER.len() as u64, + current_size: SEGMENT_HEADER.len(), + segment_size, min_seq: MAX_SEQUENCE_NUMBER, max_seq: MIN_SEQUENCE_NUMBER, record_encoding: RecordEncoding::newest(), - file: None, mmap: None, - record_position: None, - }) + record_position: Vec::new(), + write_count: 0, + last_flushed_position: SEGMENT_HEADER.len(), + }; + + if !Path::new(&path).exists() { + // If the file does not exist, create a new one + let mut file = File::create(&path).context(FileOpen)?; + file.write_all(&[NEWEST_WAL_SEGMENT_VERSION]) + .context(FileOpen)?; + file.write_all(SEGMENT_HEADER).context(FileOpen)?; + file.set_len(segment_size as u64).context(FileOpen)?; + return Ok(segment); + } + + // Open the segment file to update min and max sequence number and file size + segment.open()?; + + // Close the segment file. If the segment is to be used for read or write, it + // will be opened again + segment.close()?; + + Ok(segment) } pub fn open(&mut self) -> Result<()> { // Open the segment file let file = OpenOptions::new() .read(true) - .append(true) + .write(true) .open(&self.path) .context(FileOpen)?; let metadata = file.metadata().context(FileOpen)?; - let size = metadata.len(); + let file_size = metadata.len(); - // Map the file in memory + // Map the file to memory let mmap = unsafe { MmapOptions::new().map_mut(&file).context(Mmap)? 
}; // Validate segment version @@ -213,113 +238,148 @@ impl Segment { // Validate segment header let header_len = SEGMENT_HEADER.len(); - ensure!(size >= header_len as u64, InvalidHeader); + ensure!( + file_size >= (VERSION_SIZE + header_len) as u64, + InvalidHeader + ); let header = &mmap[VERSION_SIZE..VERSION_SIZE + header_len]; ensure!(header == SEGMENT_HEADER, InvalidHeader); // Read and validate all records let mut pos = VERSION_SIZE + header_len; let mut record_position = Vec::new(); - while pos < size as usize { - let data = &mmap[pos..]; - let record = self - .record_encoding - .decode(data) - .box_err() - .context(InvalidRecord)?; + // Update min and max sequence number + let mut min_seq = MAX_SEQUENCE_NUMBER; + let mut max_seq = MIN_SEQUENCE_NUMBER; - record_position.push(Position { - start: pos as u64, - end: (pos + record.len()) as u64, - }); + while pos < file_size as usize { + let data = &mmap[pos..]; - // Move to the next record - pos += record.len(); + match self.record_encoding.decode(data).box_err() { + Ok(record) => { + record_position.push(Position { + start: pos, + end: pos + record.len(), + }); + min_seq = min_seq.min(record.sequence_num); + max_seq = max_seq.max(record.sequence_num); + pos += record.len(); + } + Err(_) => { + // If decoding fails, we've reached the end of valid data + // TODO: too tricky, refactor later + break; + } + } } - self.file = Some(file); self.mmap = Some(mmap); - self.record_position = Some(record_position); - self.size = size; + self.record_position = record_position; + self.current_size = pos; + self.write_count = 0; + self.last_flushed_position = pos; + self.min_seq = min_seq; + self.max_seq = max_seq; Ok(()) } pub fn close(&mut self) -> Result<()> { - self.file.take(); + if let Some(ref mut mmap) = self.mmap { + // Flush before closing + mmap.flush_range(self.last_flushed_position, self.current_size) + .context(Flush)?; + // Reset the write count + self.write_count = 0; + // Update the last flushed position + 
self.last_flushed_position = self.current_size; + } self.mmap.take(); - self.record_position.take(); Ok(()) } + pub fn is_open(&self) -> bool { + self.mmap.is_some() + } + /// Append a slice to the segment file. - pub fn append(&mut self, data: &[u8]) -> Result<()> { - ensure!(self.size + data.len() as u64 <= MAX_FILE_SIZE, SegmentFull); + fn append(&mut self, data: &[u8]) -> Result<()> { + ensure!( + self.current_size + data.len() <= self.segment_size, + SegmentFull + ); // Ensure the segment file is open - let Some(file) = &mut self.file else { + let Some(mmap) = &mut self.mmap else { return SegmentNotOpen { id: self.id }.fail(); }; - // Append to the file - file.write_all(data).context(SegmentAppend)?; - file.flush().context(Flush)?; + // Append to mmap + let start = self.current_size; + let end = start + data.len(); + mmap[start..end].copy_from_slice(data); + + // Increment the write count + self.write_count += 1; + + // Only flush if the write_count reaches FLUSH_INTERVAL + if self.write_count >= FLUSH_INTERVAL { + mmap.flush_range(self.last_flushed_position, self.current_size + data.len()) + .context(Flush)?; + // Reset the write count + self.write_count = 0; + // Update the last flushed position + self.last_flushed_position = self.current_size + data.len(); + } - // Remap - // todo: Do not remap every time you append; instead, create a large enough file - // at the beginning. - let mmap = unsafe { MmapOptions::new().map_mut(&*file).context(Mmap)? 
}; - self.mmap = Some(mmap); - self.size += data.len() as u64; + // Update the current size + self.current_size += data.len(); Ok(()) } - pub fn read(&self, offset: u64, size: u64) -> Result> { + pub fn read(&self, offset: usize, size: usize) -> Result> { // Ensure that the reading range is within the file ensure!( - offset + size <= self.size, + offset + size <= self.current_size, LengthMismatch { - expected: (offset + size) as usize, - actual: self.size as usize + expected: (offset + size), + actual: self.current_size } ); - let start = offset as usize; - let end = start + size as usize; + let start = offset; + let end = start + size; match &self.mmap { Some(mmap) => Ok(mmap[start..end].to_vec()), None => SegmentNotOpen { id: self.id }.fail(), } } - pub fn append_record_position(&mut self, pos: &mut Vec) -> Result<()> { - match self.record_position.as_mut() { - Some(record_position) => { - record_position.append(pos); - Ok(()) - } - None => SegmentNotOpen { id: self.id }.fail(), - } - } + pub fn append_records( + &mut self, + data: &[u8], + positions: &mut Vec, + prev_sequence_num: SequenceNumber, + next_sequence_num: SequenceNumber, + ) -> Result<()> { + // Append logs to segment file + self.append(data)?; + + // Update record position + self.record_position.append(positions); + + // Update min and max sequence number + self.min_seq = self.min_seq.min(prev_sequence_num); + self.max_seq = self.max_seq.max(next_sequence_num - 1); - pub fn update_seq(&mut self, min_seq: u64, max_seq: u64) -> Result<()> { - if min_seq < self.min_seq { - self.min_seq = min_seq; - } - if max_seq > self.max_seq { - self.max_seq = max_seq; - } Ok(()) } } -pub struct Region { - /// Identifier for regions. - _region_id: u64, - +#[derive(Debug)] +pub struct SegmentManager { /// All segments protected by a mutex - /// todo: maybe use a RWLock? 
all_segments: Mutex>>>, /// Cache for opened segments @@ -327,12 +387,83 @@ pub struct Region { /// Maximum size of the cache cache_size: usize, +} + +impl SegmentManager { + fn add_segment(&self, id: u64, segment: Arc>) -> Result<()> { + let mut all_segments = self.all_segments.lock().unwrap(); + all_segments.insert(id, segment); + Ok(()) + } + + /// Obtain the target segment + fn get_segment(&self, segment_id: u64) -> Result>> { + let all_segments = self.all_segments.lock().unwrap(); - /// Directory for segment storage - _segment_dir: String, + let segment = all_segments.get(&segment_id); + + let segment = match segment { + Some(segment) => segment, + None => return SegmentNotFound { id: segment_id }.fail(), + }; - /// Index of the latest segment for appending logs - current: Mutex, + Ok(segment.clone()) + } + + /// Open segment if it is not in cache, need to acquire the lock outside + fn open_segment(&self, segment: &mut Segment) -> Result<()> { + let mut cache = self.cache.lock().unwrap(); + + // Check if segment is already in cache + if cache.iter().any(|id| *id == segment.id) { + return Ok(()); + } + + // If not in cache, load from disk + segment.open()?; + + // Add to cache + if cache.len() == self.cache_size { + let evicted_segment_id = cache.pop_front(); + if let Some(evicted_segment_id) = evicted_segment_id { + // The evicted segment should be closed first + let evicted_segment = self.get_segment(evicted_segment_id)?; + let mut evicted_segment = evicted_segment.lock().unwrap(); + evicted_segment.close()?; + } + } + cache.push_back(segment.id); + + Ok(()) + } + + pub fn mark_delete_entries_up_to( + &self, + _location: WalLocation, + _sequence_num: SequenceNumber, + ) -> Result<()> { + todo!() + } + + pub fn close_all(&self) -> Result<()> { + let mut cache = self.cache.lock().unwrap(); + cache.clear(); + let all_segments = self.all_segments.lock().unwrap(); + for segment in all_segments.values() { + segment.lock().unwrap().close()?; + } + Ok(()) + } +} + 
+#[derive(Debug)] +pub struct Region { + /// Identifier for regions. + _region_id: u64, + + /// Contains all segments and manages the opening and closing of each + /// segment + segment_manager: Arc, /// Encoding method for logs log_encoding: CommonLogEncoding, @@ -340,27 +471,38 @@ pub struct Region { /// Encoding method for records record_encoding: RecordEncoding, + /// Runtime for handling write requests + runtime: Arc, + + /// All segments are fixed size + segment_size: usize, + + /// Directory of segment files + region_dir: String, + /// Sequence number for the next log next_sequence_num: AtomicU64, - /// Runtime for handling write requests - runtime: Arc, + /// The latest segment for appending logs + current_segment: Mutex>>, } impl Region { pub fn new( region_id: u64, cache_size: usize, - segment_dir: String, + segment_size: usize, + region_dir: String, runtime: Arc, ) -> Result { let mut all_segments = HashMap::new(); // Scan the directory for existing WAL files let mut max_segment_id: i32 = -1; + let mut next_sequence_num: u64 = MIN_SEQUENCE_NUMBER + 1; // Segment file naming convention: segment_.wal - for entry in fs::read_dir(&segment_dir).context(FileOpen)? { + for entry in fs::read_dir(®ion_dir).context(FileOpen)? 
{ let entry = entry.context(FileOpen)?; let path = entry.path(); @@ -389,7 +531,9 @@ impl Region { None => continue, }; - let segment = Segment::new(path.to_string_lossy().to_string(), segment_id)?; + let segment = + Segment::new(path.to_string_lossy().to_string(), segment_id, segment_size)?; + next_sequence_num = next_sequence_num.max(segment.max_seq + 1); let segment = Arc::new(Mutex::new(segment)); if segment_id as i32 > max_segment_id { @@ -401,78 +545,40 @@ impl Region { // If no existing segments, create a new one if max_segment_id == -1 { max_segment_id = 0; - let path = format!("{}/segment_{}.wal", segment_dir, max_segment_id); - let new_segment = Segment::new(path, max_segment_id as u64)?; + let path = format!("{}/segment_{}.wal", region_dir, max_segment_id); + let new_segment = Segment::new(path, max_segment_id as u64, segment_size)?; let new_segment = Arc::new(Mutex::new(new_segment)); all_segments.insert(0, new_segment); } - Ok(Self { - _region_id: region_id, + let latest_segment = all_segments.get(&(max_segment_id as u64)).unwrap().clone(); + + let segment_manager = SegmentManager { all_segments: Mutex::new(all_segments), cache: Mutex::new(VecDeque::new()), cache_size, - _segment_dir: segment_dir, - current: Mutex::new(max_segment_id as u64), + }; + + Ok(Self { + _region_id: region_id, + segment_manager: Arc::new(segment_manager), log_encoding: CommonLogEncoding::newest(), record_encoding: RecordEncoding::newest(), - // todo: do not use MIN_SEQUENCE_NUMBER, read from the latest record - next_sequence_num: AtomicU64::new(MIN_SEQUENCE_NUMBER + 1), + segment_size, + region_dir, + next_sequence_num: AtomicU64::new(next_sequence_num), runtime, + current_segment: Mutex::new(latest_segment), }) } - /// Obtain the target segment. If it is not open, then open it and put it to - /// the cache. 
- fn get_segment(&self, segment_id: u64) -> Result>> { - let mut cache = self.cache.lock().unwrap(); - let all_segments = self.all_segments.lock().unwrap(); - - let segment = all_segments.get(&segment_id); - - let segment = match segment { - Some(segment) => segment, - None => return SegmentNotFound { id: segment_id }.fail(), - }; - - // Check if segment is already in cache - if cache.iter().any(|id| *id == segment_id) { - let segment = all_segments.get(&segment_id); - return match segment { - Some(segment) => Ok(segment.clone()), - None => SegmentNotFound { id: segment_id }.fail(), - }; - } - - // If not in cache, load from disk - segment.lock().unwrap().open()?; - - // Add to cache - if cache.len() == self.cache_size { - let evicted_segment_id = cache.pop_front(); - // TODO: if the evicted segment is being read or written, wait for it to finish - if let Some(evicted_segment_id) = evicted_segment_id { - let evicted_segment = all_segments.get(&evicted_segment_id); - if let Some(evicted_segment) = evicted_segment { - evicted_segment.lock().unwrap().close()?; - } else { - return SegmentNotFound { - id: evicted_segment_id, - } - .fail(); - } - } - } - cache.push_back(segment_id); - - Ok(segment.clone()) - } - pub fn write(&self, _ctx: &WriteContext, batch: &LogWriteBatch) -> Result { - // Lock - let current = self.current.lock().unwrap(); - let segment = self.get_segment(*current)?; - let mut segment = segment.lock().unwrap(); + // In the WAL based on local storage, we need to ensure the sequence number in + // segment is monotonically increasing. So we need to acquire a lock here. + // Perhaps we could avoid acquiring the lock here and instead allocate the + // position that needs to be written in the segment, then fill it within + // spawn_blocking. However, I’m not sure about the correctness of this approach. 
+ let mut current_segment = self.current_segment.lock().unwrap(); let entries_num = batch.len() as u64; let table_id = batch.location.table_id; @@ -495,29 +601,54 @@ impl Region { .context(Encoding)?; record_position.push(Position { - start: (data.len() - record.len()) as u64, - end: data.len() as u64, + start: data.len() - record.len(), + end: data.len(), }); next_sequence_num += 1; } - // TODO: spawn a new task to write to segment - // TODO: maybe need a write mutex? + let guard = current_segment.lock().unwrap(); - for pos in record_position.iter_mut() { - pos.start += segment.size; - pos.end += segment.size; + // Check if the current segment has enough space for the new data + // If not, create a new segment and update current_segment + if guard.current_size + data.len() > guard.segment_size { + let new_segment_id = guard.id + 1; + // We need to drop guard to allow the update of current_segment + drop(guard); + + // Create a new segment + let new_segment = Segment::new( + format!("{}/segment_{}.wal", self.region_dir, new_segment_id), + new_segment_id, + self.segment_size, + )?; + let new_segment = Arc::new(Mutex::new(new_segment)); + self.segment_manager + .add_segment(new_segment_id, new_segment.clone())?; + + // Update current segment + *current_segment = new_segment; + } else { + drop(guard); } - // Update the record position - segment.append_record_position(&mut record_position)?; + let mut guard = current_segment.lock().unwrap(); - // Update the min and max sequence numbers - segment.update_seq(prev_sequence_num, next_sequence_num - 1)?; + // Open the segment if not opened + self.segment_manager.open_segment(&mut guard)?; + for pos in record_position.iter_mut() { + pos.start += guard.current_size; + pos.end += guard.current_size; + } // Append logs to segment file - segment.append(&data)?; + guard.append_records( + &data, + &mut record_position, + prev_sequence_num, + next_sequence_num - 1, + )?; Ok(next_sequence_num - 1) } @@ -536,14 +667,16 @@ impl Region 
{ if start > end { return Ok(BatchLogIteratorAdapter::empty()); } - let iter = SegmentLogIterator::new( + + let iter = MultiSegmentLogIterator::new( + self.segment_manager.clone(), self.log_encoding.clone(), self.record_encoding.clone(), - self.get_segment(0)?, Some(req.location.table_id), start, end, - ); + )?; + Ok(BatchLogIteratorAdapter::new_with_sync( Box::new(iter), self.runtime.clone(), @@ -552,14 +685,14 @@ impl Region { } pub fn scan(&self, ctx: &ScanContext, _req: &ScanRequest) -> Result { - let iter = SegmentLogIterator::new( + let iter = MultiSegmentLogIterator::new( + self.segment_manager.clone(), self.log_encoding.clone(), self.record_encoding.clone(), - self.get_segment(0)?, None, MIN_SEQUENCE_NUMBER, MAX_SEQUENCE_NUMBER, - ); + )?; Ok(BatchLogIteratorAdapter::new_with_sync( Box::new(iter), self.runtime.clone(), @@ -569,10 +702,15 @@ impl Region { pub fn mark_delete_entries_up_to( &self, - _location: WalLocation, - _sequence_num: SequenceNumber, + location: WalLocation, + sequence_num: SequenceNumber, ) -> Result<()> { - todo!() + self.segment_manager + .mark_delete_entries_up_to(location, sequence_num) + } + + pub fn close(&self) -> Result<()> { + self.segment_manager.close_all() } #[inline] @@ -591,13 +729,19 @@ pub struct RegionManager { root_dir: String, regions: Mutex>>, cache_size: usize, + segment_size: usize, runtime: Arc, } impl RegionManager { // Create a RegionManager, and scans all the region folders located under // root_dir. 
- pub fn new(root_dir: String, cache_size: usize, runtime: Arc) -> Result { + pub fn new( + root_dir: String, + cache_size: usize, + segment_size: usize, + runtime: Arc, + ) -> Result { let mut regions = HashMap::new(); // Naming conversion: / @@ -623,6 +767,7 @@ impl RegionManager { let region = Region::new( region_id, cache_size, + segment_size, path.to_string_lossy().to_string(), runtime.clone(), )?; @@ -633,6 +778,7 @@ impl RegionManager { root_dir, regions: Mutex::new(regions), cache_size, + segment_size, runtime, }) } @@ -651,6 +797,7 @@ impl RegionManager { let region = Region::new( region_id, self.cache_size, + self.segment_size, region_dir.to_string_lossy().to_string(), self.runtime.clone(), )?; @@ -682,23 +829,37 @@ impl RegionManager { region.mark_delete_entries_up_to(location, sequence_num) } + pub fn close(&self, region_id: RegionId) -> Result<()> { + let region = self.get_region(region_id)?; + region.close() + } + + pub fn close_all(&self) -> Result<()> { + for region in self.regions.lock().unwrap().values() { + region.close()?; + } + Ok(()) + } + pub fn sequence_num(&self, location: WalLocation) -> Result { let region = self.get_region(location.region_id)?; region.sequence_num(location) } } -// TODO: handle the case when read requests involving multiple segments #[derive(Debug)] -pub struct SegmentLogIterator { +struct SegmentLogIterator { /// Encoding method for common log. log_encoding: CommonLogEncoding, /// Encoding method for records. record_encoding: RecordEncoding, - /// Thread-safe, shared reference to the log segment. - segment: Arc>, + /// Raw content of the segment. + segment_content: Vec, + + /// Positions of records within the segment content. + record_positions: Vec, /// Optional identifier for the table, which is used to filter logs. table_id: Option, @@ -712,9 +873,6 @@ pub struct SegmentLogIterator { /// Index of the current record within the segment. current_record_idx: usize, - /// The raw payload data of the current record. 
- current_payload: Vec, - /// Flag indicating whether there is no more data to read. no_more_data: bool, } @@ -724,49 +882,57 @@ impl SegmentLogIterator { log_encoding: CommonLogEncoding, record_encoding: RecordEncoding, segment: Arc>, + segment_manager: Arc, table_id: Option, start: SequenceNumber, end: SequenceNumber, - ) -> Self { - SegmentLogIterator { + ) -> Result { + // Open the segment if it is not open + let mut segment = segment.lock().unwrap(); + if !segment.is_open() { + segment_manager.open_segment(&mut segment)?; + } + + // Read the entire content of the segment + let segment_content = segment.read(0, segment.current_size)?; + + // Get record positions + let record_positions = segment.record_position.clone(); + + Ok(Self { log_encoding, record_encoding, - segment, + segment_content, + record_positions, table_id, start, end, current_record_idx: 0, - current_payload: Vec::new(), no_more_data: false, - } + }) } - fn next(&mut self) -> Result>> { + fn next(&mut self) -> Result>>> { if self.no_more_data { return Ok(None); } - // todo: ensure that this segment is not evicted from the cache during the read - // process, or that it is reloaded into the cache as needed - let segment = self.segment.lock().unwrap(); - loop { - let Some(record_position) = &segment.record_position else { - self.no_more_data = true; - return Ok(None); - }; - let Some(pos) = record_position.get(self.current_record_idx) else { + // Get the next record position + let Some(pos) = self.record_positions.get(self.current_record_idx) else { self.no_more_data = true; return Ok(None); }; self.current_record_idx += 1; - let record = segment.read(pos.start, pos.end - pos.start)?; - // Decode record + // Extract the record data from the segment content + let record_data = &self.segment_content[pos.start..pos.end]; + + // Decode the record let record = self .record_encoding - .decode(record.as_slice()) + .decode(record_data) .box_err() .context(InvalidRecord)?; @@ -786,25 +952,145 @@ impl 
SegmentLogIterator { } } - // Decode value + // Decode the value let value = self .log_encoding .decode_value(record.value) .box_err() .context(InvalidRecord)?; - self.current_payload = value.to_owned(); - return Ok(Some(LogEntry { table_id: record.table_id, sequence: record.sequence_num, - payload: self.current_payload.as_slice(), + payload: value.to_owned(), })); } } } -impl SyncLogIterator for SegmentLogIterator { +#[derive(Debug)] +pub struct MultiSegmentLogIterator { + /// Segment manager that contains all segments involved in this read + /// operation. + segment_manager: Arc, + + /// All segments involved in this read operation. + segments: Vec, + + /// Current segment index. + current_segment_idx: usize, + + /// Current segment iterator. + current_iterator: Option, + + /// Encoding method for common log. + log_encoding: CommonLogEncoding, + + /// Encoding method for records. + record_encoding: RecordEncoding, + + /// Optional identifier for the table, which is used to filter logs. + table_id: Option, + + /// Starting sequence number for log iteration. + start: SequenceNumber, + + /// Ending sequence number for log iteration. + end: SequenceNumber, + + /// The raw payload data of the current record. 
+ current_payload: Vec, +} + +impl MultiSegmentLogIterator { + pub fn new( + segment_manager: Arc, + log_encoding: CommonLogEncoding, + record_encoding: RecordEncoding, + table_id: Option, + start: SequenceNumber, + end: SequenceNumber, + ) -> Result { + // Find all segments that contain the requested sequence numbers + let mut relevant_segments = Vec::new(); + + { + let all_segments = segment_manager.all_segments.lock().unwrap(); + + for (_, segment) in all_segments.iter() { + let segment = segment.lock().unwrap(); + if segment.min_seq <= end && segment.max_seq >= start { + relevant_segments.push(segment.id); + } + } + } + + // Sort by segment id + relevant_segments.sort_unstable(); + + let mut iter = Self { + segment_manager, + segments: relevant_segments, + current_segment_idx: 0, + current_iterator: None, + log_encoding, + record_encoding, + table_id, + start, + end, + current_payload: Vec::new(), + }; + + // Load the first segment iterator + iter.load_next_segment_iterator()?; + + Ok(iter) + } + + fn load_next_segment_iterator(&mut self) -> Result { + if self.current_segment_idx >= self.segments.len() { + self.current_iterator = None; + return Ok(false); + } + + let segment = self.segments[self.current_segment_idx]; + let segment = self.segment_manager.get_segment(segment)?; + let iterator = SegmentLogIterator::new( + self.log_encoding.clone(), + self.record_encoding.clone(), + segment, + self.segment_manager.clone(), + self.table_id, + self.start, + self.end, + )?; + + self.current_iterator = Some(iterator); + self.current_segment_idx += 1; + + Ok(true) + } + + fn next(&mut self) -> Result>> { + loop { + if let Some(ref mut iterator) = self.current_iterator { + if let Some(entry) = iterator.next()? { + self.current_payload = entry.payload.to_owned(); + return Ok(Some(LogEntry { + table_id: entry.table_id, + sequence: entry.sequence, + payload: &self.current_payload, + })); + } + } + if !self.load_next_segment_iterator()? 
{ + return Ok(None); + } + } + } +} + +impl SyncLogIterator for MultiSegmentLogIterator { fn next_log_entry(&mut self) -> crate::manager::Result>> { self.next().box_err().context(Read) } @@ -812,12 +1098,19 @@ impl SyncLogIterator for SegmentLogIterator { #[cfg(test)] mod tests { - use std::sync::Arc; + use std::{sync::Arc, time::Duration}; use runtime::Builder; use tempfile::tempdir; use super::*; + use crate::{ + kv_encoder::LogBatchEncoder, + log_batch::{MemoryPayload, MemoryPayloadDecoder}, + manager::ReadBoundary, + }; + + const SEGMENT_SIZE: usize = 64 * 1024 * 1024; #[test] fn test_segment_creation() { @@ -829,14 +1122,14 @@ mod tests { .unwrap() .to_string(); - let segment = Segment::new(path.clone(), 0); + let segment = Segment::new(path.clone(), 0, SEGMENT_SIZE); assert!(segment.is_ok()); let segment = segment.unwrap(); assert_eq!(segment.version, NEWEST_WAL_SEGMENT_VERSION); assert_eq!(segment.path, path); assert_eq!(segment.id, 0); - assert_eq!(segment.size, SEGMENT_HEADER.len() as u64); + assert_eq!(segment.current_size, SEGMENT_HEADER.len()); let segment_content = fs::read(path).unwrap(); assert_eq!(segment_content[0], NEWEST_WAL_SEGMENT_VERSION); @@ -855,10 +1148,11 @@ mod tests { .to_str() .unwrap() .to_string(); - let mut segment = Segment::new(path.clone(), 0).unwrap(); + let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); - let result = segment.open(); - assert!(result.is_ok()); + segment + .open() + .expect("Expected to open segment successfully"); } #[test] @@ -870,31 +1164,140 @@ mod tests { .to_str() .unwrap() .to_string(); - let mut segment = Segment::new(path.clone(), 0).unwrap(); + let mut segment = Segment::new(path.clone(), 0, SEGMENT_SIZE).unwrap(); segment.open().unwrap(); let data = b"test_data"; let append_result = segment.append(data); assert!(append_result.is_ok()); - let read_result = segment.read( - (VERSION_SIZE + SEGMENT_HEADER.len()) as u64, - data.len() as u64, - ); + let read_result = 
             segment.read(VERSION_SIZE + SEGMENT_HEADER.len(), data.len());
         assert!(read_result.is_ok());
         assert_eq!(read_result.unwrap(), data);
     }
 
     #[test]
-    fn test_region_creation() {
+    fn test_region_create_and_close() {
         let dir = tempdir().unwrap();
         let runtime = Arc::new(Builder::default().build().unwrap());
-        let segment_manager = Region::new(1, 1, dir.path().to_str().unwrap().to_string(), runtime);
-        assert!(segment_manager.is_ok());
+        let region = Region::new(
+            1,
+            1,
+            SEGMENT_SIZE,
+            dir.path().to_str().unwrap().to_string(),
+            runtime,
+        )
+        .unwrap();
 
-        let segment_manager = segment_manager.unwrap();
-        let segment = segment_manager.get_segment(0);
-        assert!(segment.is_ok());
+        let _segment = region.segment_manager.get_segment(0).unwrap();
+
+        region.close().unwrap()
+    }
+
+    #[test]
+    fn test_region_manager_create_and_close() {
+        let dir = tempdir().unwrap();
+        let runtime = Arc::new(Builder::default().build().unwrap());
+
+        let region_manager = RegionManager::new(
+            dir.path().to_str().unwrap().to_string(),
+            1,
+            SEGMENT_SIZE,
+            runtime,
+        )
+        .unwrap();
+
+        region_manager.close_all().unwrap();
+    }
+
+    async fn test_multi_segment_write_and_read_inner(runtime: Arc<Runtime>) {
+        // Set a small max segment size
+        const SEGMENT_SIZE: usize = 4096;
+
+        // Create a temporary directory
+        let dir = tempdir().unwrap();
+
+        // Create a new Region
+        let region = Region::new(
+            1,
+            2,
+            SEGMENT_SIZE,
+            dir.path().to_str().unwrap().to_string(),
+            runtime.clone(),
+        )
+        .unwrap();
+        let region = Arc::new(region);
+
+        // Write data
+        let mut expected_entries = Vec::new();
+        let mut sequence = MIN_SEQUENCE_NUMBER + 1;
+        let location = WalLocation::new(1, 1);
+        for _i in 0..10 {
+            // Write 10 batches, 100 entries each
+            let log_entries = 0..100;
+            let log_batch_encoder = LogBatchEncoder::create(location);
+            let log_batch = log_batch_encoder
+                .encode_batch(log_entries.clone().map(|v| MemoryPayload { val: v }))
+                .expect("should succeed to encode payloads");
+            for j in log_entries {
+                let payload = MemoryPayload { val: j };
+                expected_entries.push(LogEntry {
+                    table_id: 1,
+                    sequence,
+                    payload,
+                });
+                sequence += 1;
+            }
+
+            let write_ctx = WriteContext::default();
+            region.write(&write_ctx, &log_batch).unwrap();
+        }
+
+        // Read data
+        let read_ctx = ReadContext {
+            timeout: Duration::from_secs(5),
+            batch_size: 1000,
+        };
+        let read_req = ReadRequest {
+            location: WalLocation::new(1, 1),
+            start: ReadBoundary::Min,
+            end: ReadBoundary::Max,
+        };
+        let mut iterator = region.read(&read_ctx, &read_req).unwrap();
+
+        // Collect read entries
+        let dec = MemoryPayloadDecoder;
+        let read_entries = iterator
+            .next_log_entries(dec, |_| true, VecDeque::new())
+            .await
+            .unwrap();
+
+        // Verify that read data matches written data
+        assert_eq!(expected_entries.len(), read_entries.len());
+
+        for (expected, actual) in expected_entries.iter().zip(read_entries.iter()) {
+            assert_eq!(expected.table_id, actual.table_id);
+            assert_eq!(expected.sequence, actual.sequence);
+            assert_eq!(expected.payload, actual.payload);
+        }
+
+        {
+            // Verify that multiple segments were created
+            let all_segments = region.segment_manager.all_segments.lock().unwrap();
+            assert!(
+                all_segments.len() > 1,
+                "Expected multiple segments, but got {}",
+                all_segments.len()
+            );
+        }
+
+        region.close().unwrap()
+    }
+
+    #[test]
+    fn test_multi_segment_write_and_read() {
+        let runtime = Arc::new(Builder::default().build().unwrap());
+        runtime.block_on(test_multi_segment_write_and_read_inner(runtime.clone()));
     }
 }
diff --git a/src/wal/src/local_storage_impl/wal_manager.rs b/src/wal/src/local_storage_impl/wal_manager.rs
index d05e90d886..91c69fcd5a 100644
--- a/src/wal/src/local_storage_impl/wal_manager.rs
+++ b/src/wal/src/local_storage_impl/wal_manager.rs
@@ -43,7 +43,7 @@ use crate::{
 pub struct LocalStorageImpl {
     config: LocalStorageConfig,
     _runtime: Arc<Runtime>,
-    segment_manager: RegionManager,
+    region_manager: RegionManager,
 }
 
 impl LocalStorageImpl {
@@ -52,17 +52,26 @@ impl LocalStorageImpl {
         config: LocalStorageConfig,
         runtime: Arc<Runtime>,
     ) -> Result<Self> {
-        let LocalStorageConfig { cache_size, .. } = config.clone();
+        let LocalStorageConfig {
+            cache_size,
+            max_segment_size,
+            ..
+        } = config.clone();
         let wal_path_str = wal_path.to_str().unwrap().to_string();
-        let segment_manager = RegionManager::new(wal_path_str.clone(), cache_size, runtime.clone())
-            .box_err()
-            .context(Open {
-                wal_path: wal_path_str,
-            })?;
+        let region_manager = RegionManager::new(
+            wal_path_str.clone(),
+            cache_size,
+            max_segment_size,
+            runtime.clone(),
+        )
+        .box_err()
+        .context(Open {
+            wal_path: wal_path_str,
+        })?;
         Ok(Self {
             config,
             _runtime: runtime,
-            segment_manager,
+            region_manager,
         })
     }
 }
@@ -84,7 +93,7 @@ impl Debug for LocalStorageImpl {
 #[async_trait]
 impl WalManager for LocalStorageImpl {
     async fn sequence_num(&self, location: WalLocation) -> Result<SequenceNumber> {
-        self.segment_manager
+        self.region_manager
             .sequence_num(location)
             .box_err()
             .context(Read)
@@ -95,7 +104,7 @@
         location: WalLocation,
         sequence_num: SequenceNumber,
     ) -> Result<()> {
-        self.segment_manager
+        self.region_manager
             .mark_delete_entries_up_to(location, sequence_num)
             .box_err()
             .context(Delete)
@@ -106,13 +115,16 @@
             "Close region for LocalStorage based WAL is noop operation, region_id:{}",
             region_id
         );
-
+        self.region_manager
+            .close(region_id)
+            .box_err()
+            .context(Close)?;
         Ok(())
     }
 
     async fn close_gracefully(&self) -> Result<()> {
         info!("Close local storage wal gracefully");
-        // todo: close all opened files
+        self.region_manager.close_all().box_err().context(Close)?;
         Ok(())
     }
 
@@ -121,18 +133,18 @@
         ctx: &ReadContext,
         req: &ReadRequest,
     ) -> Result<BatchLogIteratorAdapter> {
-        self.segment_manager.read(ctx, req).box_err().context(Read)
+        self.region_manager.read(ctx, req).box_err().context(Read)
     }
 
     async fn write(&self, ctx: &WriteContext, batch: &LogWriteBatch) -> Result<SequenceNumber> {
-        self.segment_manager
+        self.region_manager
             .write(ctx, batch)
             .box_err()
             .context(Write)
     }
 
     async fn scan(&self, ctx: &ScanContext, req: &ScanRequest) -> Result<BatchLogIteratorAdapter> {
-        self.segment_manager.scan(ctx, req).box_err().context(Read)
+        self.region_manager.scan(ctx, req).box_err().context(Read)
     }
 
     async fn get_statistics(&self) -> Option<String> {