From 70feb7e2ad598220f3f57ca0ff96a1d579f94598 Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Thu, 22 Apr 2021 18:13:48 -0700
Subject: [PATCH 1/8] add Lsn type

This type is a zero-cost wrapper for a u64, meant to let code
communicate with precision what that value means. It implements
Display and Debug. Display "{}" will format as "1234ABCD/5678CDEF",
while Debug will format as Lsn(1234567890).

---
 zenith_utils/src/lib.rs |   2 +
 zenith_utils/src/lsn.rs | 137 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 139 insertions(+)
 create mode 100644 zenith_utils/src/lsn.rs

diff --git a/zenith_utils/src/lib.rs b/zenith_utils/src/lib.rs
index 9de98202c67a..fb4d415f22b1 100644
--- a/zenith_utils/src/lib.rs
+++ b/zenith_utils/src/lib.rs
@@ -1,5 +1,7 @@
 //! zenith_utils is intended to be a place to put code that is shared
 //! between other crates in this repository.
 
+/// `Lsn` type implements common tasks on Log Sequence Numbers
+pub mod lsn;
 /// SeqWait allows waiting for a future sequence number to arrive
 pub mod seqwait;
diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs
new file mode 100644
index 000000000000..a67b17b1e4ff
--- /dev/null
+++ b/zenith_utils/src/lsn.rs
@@ -0,0 +1,137 @@
+#![warn(missing_docs)]
+
+use std::fmt;
+use std::ops::{Add, AddAssign, Sub};
+use std::path::Path;
+use std::str::FromStr;
+
+/// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr
+#[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)]
+pub struct Lsn(pub u64);
+
+/// We tried to parse an LSN from a string, but failed
+#[derive(Debug, PartialEq, thiserror::Error)]
+#[error("LsnParseError")]
+pub struct LsnParseError;
+
+impl Lsn {
+    /// Maximum possible value for an LSN
+    pub const MAX: Lsn = Lsn(u64::MAX);
+
+    /// Parse an LSN from a filename in the form `0000000000000000`
+    pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
+    where
+        F: AsRef<Path>,
+    {
+        let filename: &Path = filename.as_ref();
+        let filename = filename.to_str().ok_or(LsnParseError)?;
+        Lsn::from_hex(filename)
+    }
+
+    /// Parse an LSN from a string in the form `0000000000000000`
+    pub fn from_hex<S>(s: S) -> Result<Self, LsnParseError>
+    where
+        S: AsRef<str>,
+    {
+        let s: &str = s.as_ref();
+        let n = u64::from_str_radix(s, 16).or(Err(LsnParseError))?;
+        Ok(Lsn(n))
+    }
+
+    /// Compute the offset into a segment
+    pub fn segment_offset(self, seg_sz: u64) -> u64 {
+        self.0 % seg_sz
+    }
+
+    /// Compute the segment number
+    pub fn segment_number(self, seg_sz: u64) -> u64 {
+        self.0 / seg_sz
+    }
+}
+
+impl From<u64> for Lsn {
+    fn from(n: u64) -> Self {
+        Lsn(n)
+    }
+}
+
+impl From<Lsn> for u64 {
+    fn from(lsn: Lsn) -> u64 {
+        lsn.0
+    }
+}
+
+impl FromStr for Lsn {
+    type Err = LsnParseError;
+
+    /// Parse an LSN from a string in the form `00000000/00000000`
+    ///
+    /// If the input string is missing the '/' character, then use `Lsn::from_hex`
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let mut splitter = s.split('/');
+        if let (Some(left), Some(right), None) = (splitter.next(), splitter.next(), splitter.next())
+        {
+            let left_num = u32::from_str_radix(left, 16).map_err(|_| LsnParseError)?;
+            let right_num = u32::from_str_radix(right, 16).map_err(|_| LsnParseError)?;
+            Ok(Lsn((left_num as u64) << 32 | right_num as u64))
+        } else {
+            Err(LsnParseError)
+        }
+    }
+}
+
+impl fmt::Display for Lsn {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{:X}/{:X}", self.0 >> 32, self.0 & 0xffffffff)
+    }
+}
+
+impl Sub<u64> for Lsn {
+    type Output = Lsn;
+
+    fn sub(self, other: u64) -> Self::Output {
+        // panic if the subtraction overflows/underflows.
+        Lsn(self.0.checked_sub(other).unwrap())
+    }
+}
+
+impl Add<u64> for Lsn {
+    type Output = Lsn;
+
+    fn add(self, other: u64) -> Self::Output {
+        // panic if the addition overflows.
+        Lsn(self.0.checked_add(other).unwrap())
+    }
+}
+
+impl AddAssign<u64> for Lsn {
+    fn add_assign(&mut self, other: u64) {
+        // panic if the addition overflows.
+        self.0 = self.0.checked_add(other).unwrap();
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_lsn_strings() {
+        assert_eq!("12345678/AAAA5555".parse(), Ok(Lsn(0x12345678AAAA5555)));
+        assert_eq!("aaaa/bbbb".parse(), Ok(Lsn(0x0000AAAA0000BBBB)));
+        assert_eq!("1/A".parse(), Ok(Lsn(0x000000010000000A)));
+        assert_eq!("0/0".parse(), Ok(Lsn(0)));
+        "ABCDEFG/12345678".parse::<Lsn>().unwrap_err();
+        "123456789/AAAA5555".parse::<Lsn>().unwrap_err();
+        "12345678/AAAA55550".parse::<Lsn>().unwrap_err();
+        "-1/0".parse::<Lsn>().unwrap_err();
+        "1/-1".parse::<Lsn>().unwrap_err();
+
+        assert_eq!(format!("{}", Lsn(0x12345678AAAA5555)), "12345678/AAAA5555");
+        assert_eq!(format!("{}", Lsn(0x000000010000000A)), "1/A");
+
+        assert_eq!(Lsn::from_hex("12345678AAAA5555"), Ok(Lsn(0x12345678AAAA5555)));
+        assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0)));
+        assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError));
+    }
+}

From e94e12a17367dfe0470ce40f05cdc8b93d0f8942 Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Fri, 23 Apr 2021 13:55:42 -0700
Subject: [PATCH 2/8] make seqwait generic

SeqWait can use any type that is Ord + Debug + Copy. Debug is not
strictly necessary, but it lets us keep a useful panic message for the
case where a caller tries to move the sequence number backwards.

---
 zenith_utils/src/seqwait.rs | 45 +++++++++++++++++++++++++------------
 1 file changed, 31 insertions(+), 14 deletions(-)

diff --git a/zenith_utils/src/seqwait.rs b/zenith_utils/src/seqwait.rs
index c30304ab6ab4..b4f3cdd45424 100644
--- a/zenith_utils/src/seqwait.rs
+++ b/zenith_utils/src/seqwait.rs
@@ -1,6 +1,7 @@
 #![warn(missing_docs)]
 
 use std::collections::BTreeMap;
+use std::fmt::Debug;
 use std::mem;
 use std::sync::Mutex;
 use std::time::Duration;
@@ -18,9 +19,12 @@ pub enum SeqWaitError {
 }
 
 /// Internal components of a `SeqWait`
-struct SeqWaitInt {
-    waiters: BTreeMap<u64, (Sender<()>, Receiver<()>)>,
-    current: u64,
+struct SeqWaitInt<T>
+where
+    T: Ord,
+{
+    waiters: BTreeMap<T, (Sender<()>, Receiver<()>)>,
+    current: T,
     shutdown: bool,
 }
 
@@ -38,13 +42,19 @@ struct SeqWaitInt {
 /// [`wait_for`]: SeqWait::wait_for
 /// [`advance`]: SeqWait::advance
 ///
-pub struct SeqWait {
-    internal: Mutex<SeqWaitInt>,
+pub struct SeqWait<T>
+where
+    T: Ord,
+{
+    internal: Mutex<SeqWaitInt<T>>,
 }
 
-impl SeqWait {
+impl<T> SeqWait<T>
+where
+    T: Ord + Debug + Copy,
+{
     /// Create a new `SeqWait`, initialized to a particular number
-    pub fn new(starting_num: u64) -> Self {
+    pub fn new(starting_num: T) -> Self {
         let internal = SeqWaitInt {
             waiters: BTreeMap::new(),
             current: starting_num,
@@ -82,7 +92,7 @@ impl SeqWait {
     ///
     /// This call won't complete until someone has called `advance`
     /// with a number greater than or equal to the one we're waiting for.
-    pub async fn wait_for(&self, num: u64) -> Result<(), SeqWaitError> {
+    pub async fn wait_for(&self, num: T) -> Result<(), SeqWaitError> {
         let mut rx = {
             let mut internal = self.internal.lock().unwrap();
             if internal.current >= num {
@@ -116,7 +126,7 @@
     /// [`SeqWaitError::Timeout`] will be returned.
    pub async fn wait_for_timeout(
         &self,
-        num: u64,
+        num: T,
         timeout_duration: Duration,
     ) -> Result<(), SeqWaitError> {
         timeout(timeout_duration, self.wait_for(num))
@@ -130,23 +140,30 @@ impl SeqWait {
     ///
     /// `advance` will panic if you send it a lower number than
     /// a previous call.
-    pub fn advance(&self, num: u64) {
+    pub fn advance(&self, num: T) {
         let wake_these = {
             let mut internal = self.internal.lock().unwrap();
 
             if internal.current > num {
                 panic!(
-                    "tried to advance backwards, from {} to {}",
+                    "tried to advance backwards, from {:?} to {:?}",
                     internal.current, num
                 );
             }
             internal.current = num;
 
             // split_off will give me all the high-numbered waiters,
-            // so split and then swap. Everything at or above (num + 1)
-            // gets to stay.
-            let mut split = internal.waiters.split_off(&(num + 1));
+            // so split and then swap. Everything strictly above `num`
+            // stays; the entry at `num` itself is handled below.
+            let mut split = internal.waiters.split_off(&num);
             std::mem::swap(&mut split, &mut internal.waiters);
+
+            // `split_off` left the waiter at `num` on the keep side;
+            // if it's there, move it over to the wake list too.
+            if let Some(sleeper) = internal.waiters.remove(&num) {
+                split.insert(num, sleeper);
+            }
+
+            split
         };

From 78d11aeedbabc6a7f153e6059077fb17257cb74d Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Fri, 23 Apr 2021 13:58:03 -0700
Subject: [PATCH 3/8] apply Lsn type everywhere

Use the `Lsn` type everywhere that I can find u64 being used to
represent an LSN.

---
 pageserver/src/page_cache.rs         | 105 +++++++++++++--------------
 pageserver/src/page_service.rs       |   5 +-
 pageserver/src/restore_local_repo.rs |  33 ++++-----
 pageserver/src/waldecoder.rs         |  38 +++++-----
 pageserver/src/walreceiver.rs        |  53 +++++---------
 pageserver/src/walredo.rs            |  32 ++++----
 6 files changed, 122 insertions(+), 144 deletions(-)

diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 9d478d3a616b..0f1863935485 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -47,6 +47,7 @@ use std::sync::{Arc, Mutex};
 use std::thread;
 use std::time::{Duration, Instant};
 use std::{convert::TryInto, ops::AddAssign};
+use zenith_utils::lsn::Lsn;
 use zenith_utils::seqwait::SeqWait;
 
 // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -62,7 +63,7 @@ pub struct PageCache {
     walredo_mgr: WalRedoManager,
 
     // Allows .await on the arrival of a particular LSN.
-    seqwait_lsn: SeqWait,
+    seqwait_lsn: SeqWait<Lsn>,
 
     // Counters, for metrics collection.
     pub num_entries: AtomicU64,
@@ -120,9 +121,9 @@ struct PageCacheShared {
     // walreceiver.rs instead of here, but it seems convenient to keep all three
     // values together.
     //
-    first_valid_lsn: u64,
-    last_valid_lsn: u64,
-    last_record_lsn: u64,
+    first_valid_lsn: Lsn,
+    last_valid_lsn: Lsn,
+    last_record_lsn: Lsn,
 }
 
 lazy_static!
{ @@ -204,16 +205,16 @@ fn open_rocksdb(_conf: &PageServerConf, timelineid: ZTimelineId) -> rocksdb::DB fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache { PageCache { shared: Mutex::new(PageCacheShared { - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, + first_valid_lsn: Lsn(0), + last_valid_lsn: Lsn(0), + last_record_lsn: Lsn(0), }), db: open_rocksdb(&conf, timelineid), walredo_mgr: WalRedoManager::new(conf, timelineid), - seqwait_lsn: SeqWait::new(0), + seqwait_lsn: SeqWait::new(Lsn(0)), num_entries: AtomicU64::new(0), num_page_images: AtomicU64::new(0), @@ -242,18 +243,18 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct CacheKey { pub tag: BufferTag, - pub lsn: u64, + pub lsn: Lsn, } impl CacheKey { pub fn pack(&self, buf: &mut BytesMut) { self.tag.pack(buf); - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); } pub fn unpack(buf: &mut BytesMut) -> CacheKey { CacheKey { tag: BufferTag::unpack(buf), - lsn: buf.get_u64(), + lsn: Lsn::from(buf.get_u64()), } } } @@ -343,7 +344,7 @@ impl BufferTag { #[derive(Debug, Clone)] pub struct WALRecord { - pub lsn: u64, // LSN at the *end* of the record + pub lsn: Lsn, // LSN at the *end* of the record pub will_init: bool, pub truncate: bool, pub rec: Bytes, @@ -355,7 +356,7 @@ pub struct WALRecord { impl WALRecord { pub fn pack(&self, buf: &mut BytesMut) { - buf.put_u64(self.lsn); + buf.put_u64(self.lsn.0); buf.put_u8(self.will_init as u8); buf.put_u8(self.truncate as u8); buf.put_u32(self.main_data_offset); @@ -363,7 +364,7 @@ impl WALRecord { buf.put_slice(&self.rec[..]); } pub fn unpack(buf: &mut BytesMut) -> WALRecord { - let lsn = buf.get_u64(); + let lsn = Lsn::from(buf.get_u64()); let will_init = buf.get_u8() != 0; let truncate = buf.get_u8() != 0; let main_data_offset = buf.get_u32(); @@ -387,7 +388,7 @@ impl PageCache { /// /// Returns an 8k page image /// - pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: u64) -> anyhow::Result { + pub async fn get_page_at_lsn(&self, tag: BufferTag, req_lsn: Lsn) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); let lsn = self.wait_lsn(req_lsn).await?; @@ -448,7 +449,7 @@ impl PageCache { /// /// Get size of relation at given LSN. /// - pub async fn relsize_get(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { + pub async fn relsize_get(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { self.wait_lsn(lsn).await?; return self.relsize_get_nowait(rel, lsn); } @@ -456,7 +457,7 @@ impl PageCache { /// /// Does relation exist at given LSN? /// - pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: u64) -> anyhow::Result { + pub async fn relsize_exist(&self, rel: &RelTag, req_lsn: Lsn) -> anyhow::Result { let lsn = self.wait_lsn(req_lsn).await?; let key = CacheKey { @@ -497,7 +498,7 @@ impl PageCache { pub fn collect_records_for_apply( &self, tag: BufferTag, - lsn: u64, + lsn: Lsn, ) -> (Option, Vec) { let mut buf = BytesMut::new(); let key = CacheKey { tag, lsn }; @@ -576,7 +577,7 @@ impl PageCache { let mut key = CacheKey { tag, lsn: rec.lsn }; // What was the size of the relation before this record? 
- let last_lsn = self.last_valid_lsn.load(Ordering::Acquire); + let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; let content = CacheEntryContent { @@ -606,7 +607,7 @@ impl PageCache { /// /// Memorize a full image of a page version /// - pub fn put_page_image(&self, tag: BufferTag, lsn: u64, img: Bytes) { + pub fn put_page_image(&self, tag: BufferTag, lsn: Lsn, img: Bytes) { let key = CacheKey { tag, lsn }; let content = CacheEntryContent { page_image: Some(img), @@ -629,7 +630,7 @@ impl PageCache { pub fn create_database( &self, - lsn: u64, + lsn: Lsn, db_id: Oid, tablespace_id: Oid, src_db_id: Oid, @@ -646,7 +647,7 @@ impl PageCache { }, blknum: 0, }, - lsn: 0, + lsn: Lsn(0), }; key.pack(&mut buf); let mut iter = self.db.raw_iterator(); @@ -679,22 +680,19 @@ impl PageCache { } /// Remember that WAL has been received and added to the page cache up to the given LSN - pub fn advance_last_valid_lsn(&self, lsn: u64) { + pub fn advance_last_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { shared.last_valid_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); self.seqwait_lsn.advance(lsn); } else { warn!( - "attempted to move last valid LSN backwards (was {:X}/{:X}, new {:X}/{:X})", - oldlsn >> 32, - oldlsn & 0xffffffff, - lsn >> 32, - lsn & 0xffffffff + "attempted to move last valid LSN backwards (was {}, new {})", + oldlsn, lsn ); } } @@ -704,7 +702,7 @@ impl PageCache { /// /// NOTE: this updates last_valid_lsn as well. /// - pub fn advance_last_record_lsn(&self, lsn: u64) { + pub fn advance_last_record_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. @@ -713,8 +711,8 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_record_lsn.store(lsn.0, Ordering::Relaxed); self.seqwait_lsn.advance(lsn); } @@ -723,7 +721,7 @@ impl PageCache { /// /// TODO: This should be called by garbage collection, so that if an older /// page is requested, we will return an error to the requestor. - pub fn _advance_first_valid_lsn(&self, lsn: u64) { + pub fn _advance_first_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); // Can't move backwards. @@ -731,29 +729,29 @@ impl PageCache { // Can't overtake last_valid_lsn (except when we're // initializing the system and last_valid_lsn hasn't been set yet. 
- assert!(shared.last_valid_lsn == 0 || lsn < shared.last_valid_lsn); + assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); } - pub fn init_valid_lsn(&self, lsn: u64) { + pub fn init_valid_lsn(&self, lsn: Lsn) { let mut shared = self.shared.lock().unwrap(); - assert!(shared.first_valid_lsn == 0); - assert!(shared.last_valid_lsn == 0); - assert!(shared.last_record_lsn == 0); + assert!(shared.first_valid_lsn == Lsn(0)); + assert!(shared.last_valid_lsn == Lsn(0)); + assert!(shared.last_record_lsn == Lsn(0)); shared.first_valid_lsn = lsn; shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.first_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_valid_lsn.store(lsn, Ordering::Relaxed); - self.last_record_lsn.store(lsn, Ordering::Relaxed); + self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_record_lsn.store(lsn.0, Ordering::Relaxed); } - pub fn get_last_valid_lsn(&self) -> u64 { + pub fn get_last_valid_lsn(&self) -> Lsn { let shared = self.shared.lock().unwrap(); shared.last_record_lsn @@ -781,8 +779,8 @@ impl PageCache { // // The caller must ensure that WAL has been received up to 'lsn'. // - fn relsize_get_nowait(&self, rel: &RelTag, lsn: u64) -> anyhow::Result { - assert!(lsn <= self.last_valid_lsn.load(Ordering::Acquire)); + fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { + assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire)); let mut key = CacheKey { tag: BufferTag { @@ -833,7 +831,7 @@ impl PageCache { loop { thread::sleep(conf.gc_period); let last_lsn = self.get_last_valid_lsn(); - if last_lsn > conf.gc_horizon { + if last_lsn.0 > conf.gc_horizon { let horizon = last_lsn - conf.gc_horizon; let mut maxkey = CacheKey { tag: BufferTag { @@ -845,7 +843,7 @@ impl PageCache { }, blknum: u32::MAX, }, - lsn: u64::MAX, + lsn: Lsn::MAX, }; let now = Instant::now(); let mut reconstructed = 0u64; @@ -873,7 +871,7 @@ impl PageCache { maxkey.lsn = min(horizon, last_lsn); // do not remove last version let mut minkey = maxkey.clone(); - minkey.lsn = 0; // first version + minkey.lsn = Lsn(0); // first version // reconstruct most recent page version if (v[0] & PAGE_IMAGE_FLAG) == 0 { @@ -942,12 +940,12 @@ impl PageCache { // // Wait until WAL has been received up to the given LSN. // - async fn wait_lsn(&self, req_lsn: u64) -> anyhow::Result { + async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result { let mut lsn = req_lsn; //When invalid LSN is requested, it means "don't wait, return latest version of the page" //This is necessary for bootstrap. 
- if lsn == 0 { - lsn = self.last_valid_lsn.load(Ordering::Acquire); + if lsn == Lsn(0) { + lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); trace!( "walreceiver doesn't work yet last_valid_lsn {}, requested {}", self.last_valid_lsn.load(Ordering::Acquire), @@ -960,9 +958,8 @@ impl PageCache { .await .with_context(|| { format!( - "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", - lsn >> 32, - lsn & 0xffff_ffff + "Timed out while waiting for WAL record at LSN {} to arrive", + lsn ) })?; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 37790b556198..72f97aaaa7c3 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -25,6 +25,7 @@ use tokio::runtime; use tokio::runtime::Runtime; use tokio::sync::mpsc; use tokio::task; +use zenith_utils::lsn::Lsn; use crate::basebackup; use crate::page_cache; @@ -84,7 +85,7 @@ struct ZenithRequest { relnode: u32, forknum: u8, blkno: u32, - lsn: u64, + lsn: Lsn, } #[derive(Debug)] @@ -373,7 +374,7 @@ impl FeMessage { relnode: body.get_u32(), forknum: body.get_u8(), blkno: body.get_u32(), - lsn: body.get_u64(), + lsn: Lsn::from(body.get_u64()), }; // TODO: consider using protobuf or serde bincode for less error prone diff --git a/pageserver/src/restore_local_repo.rs b/pageserver/src/restore_local_repo.rs index d090419b0f13..c38d7adaa9f5 100644 --- a/pageserver/src/restore_local_repo.rs +++ b/pageserver/src/restore_local_repo.rs @@ -34,6 +34,7 @@ use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; use crate::ZTimelineId; use postgres_ffi::xlog_utils::*; +use zenith_utils::lsn::Lsn; // From pg_tablespace_d.h // @@ -60,20 +61,21 @@ pub fn restore_timeline( .join(timeline.to_string()) .join("snapshots"); - let mut last_snapshot_lsn: u64 = 0; + let mut last_snapshot_lsn: Lsn = Lsn(0); for direntry in fs::read_dir(&snapshotspath).unwrap() { let direntry = direntry?; - let filename = direntry.file_name().to_str().unwrap().to_owned(); - - let lsn = u64::from_str_radix(&filename, 16)?; + let filename = direntry.file_name(); + let lsn = Lsn::from_filename(&filename)?; last_snapshot_lsn = max(lsn, last_snapshot_lsn); - restore_snapshot(conf, pcache, timeline, &filename)?; - info!("restored snapshot at {}", filename); + // FIXME: pass filename as Path instead of str? + let filename_str = filename.into_string().unwrap(); + restore_snapshot(conf, pcache, timeline, &filename_str)?; + info!("restored snapshot at {:?}", filename_str); } - if last_snapshot_lsn == 0 { + if last_snapshot_lsn == Lsn(0) { error!( "could not find valid snapshot in {}", snapshotspath.display() @@ -183,7 +185,7 @@ fn restore_relfile( dboid: u32, path: &Path, ) -> Result<()> { - let lsn = u64::from_str_radix(snapshot, 16)?; + let lsn = Lsn::from_hex(snapshot)?; // Does it look like a relation file? 
@@ -245,15 +247,16 @@ fn restore_wal(
     _conf: &PageServerConf,
     pcache: &PageCache,
     timeline: ZTimelineId,
-    startpoint: u64,
+    startpoint: Lsn,
 ) -> Result<()> {
     let walpath = format!("timelines/{}/wal", timeline);
 
     let mut waldecoder = WalStreamDecoder::new(startpoint);
 
-    let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024);
-    let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024);
-    let mut last_lsn = 0;
+    const SEG_SIZE: u64 = 16 * 1024 * 1024;
+    let mut segno = startpoint.segment_number(SEG_SIZE);
+    let mut offset = startpoint.segment_offset(SEG_SIZE);
+    let mut last_lsn = Lsn(0);
     loop {
         // FIXME: assume postgresql tli 1 for now
         let filename = XLogFileName(1, segno, 16 * 1024 * 1024);
@@ -336,11 +339,7 @@
         segno += 1;
         offset = 0;
     }
-    info!(
-        "reached end of WAL at {:X}/{:X}",
-        last_lsn >> 32,
-        last_lsn & 0xffffffff
-    );
+    info!("reached end of WAL at {}", last_lsn);
 
     Ok(())
 }
diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs
index 6cfd446a7f46..29cd8109ded4 100644
--- a/pageserver/src/waldecoder.rs
+++ b/pageserver/src/waldecoder.rs
@@ -4,6 +4,7 @@ use log::*;
 use std::cmp::min;
 use std::str;
 use thiserror::Error;
+use zenith_utils::lsn::Lsn;
 
 const XLOG_BLCKSZ: u32 = 8192;
@@ -41,9 +42,9 @@ const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;
 
 #[allow(dead_code)]
 pub struct WalStreamDecoder {
-    lsn: u64,
+    lsn: Lsn,
 
-    startlsn: u64, // LSN where this record starts
+    startlsn: Lsn, // LSN where this record starts
     contlen: u32,
     padlen: u32,
@@ -56,7 +57,7 @@ pub struct WalStreamDecoder {
 #[error("{msg} at {lsn}")]
 pub struct WalDecodeError {
     msg: String,
-    lsn: u64,
+    lsn: Lsn,
 }
 
 //
@@ -64,11 +65,11 @@ pub struct WalDecodeError {
 // FIXME: This isn't a proper rust stream
 //
 impl WalStreamDecoder {
-    pub fn new(lsn: u64) -> WalStreamDecoder {
+    pub fn new(lsn: Lsn) -> WalStreamDecoder {
         WalStreamDecoder {
             lsn,
 
-            startlsn: 0,
+            startlsn: Lsn(0),
             contlen: 0,
             padlen: 0,
@@ -89,10 +90,10 @@ impl WalStreamDecoder {
     /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
     /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
     ///
-    pub fn poll_decode(&mut self) -> Result<Option<(u64, Bytes)>, WalDecodeError> {
+    pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
         loop {
             // parse and verify page boundaries as we go
-            if self.lsn % WAL_SEGMENT_SIZE == 0 {
+            if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
                 // parse long header
 
                 if self.inputbuf.remaining() < SizeOfXLogLongPHD {
                     return Ok(None);
                 }
 
                 let hdr = self.decode_XLogLongPageHeaderData();
-                if hdr.std.xlp_pageaddr != self.lsn {
+                if hdr.std.xlp_pageaddr != self.lsn.0 {
                     return Err(WalDecodeError {
                         msg: "invalid xlog segment header".into(),
                         lsn: self.lsn,
                     });
 
                 self.lsn += SizeOfXLogLongPHD as u64;
                 continue;
-            } else if self.lsn % (XLOG_BLCKSZ as u64) == 0 {
+            } else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 {
+                // FIXME: make this a member of Lsn, but what should it be called?
                 // parse page header
 
                 if self.inputbuf.remaining() < SizeOfXLogShortPHD {
                     return Ok(None);
                 }
 
                 let hdr = self.decode_XLogPageHeaderData();
-                if hdr.xlp_pageaddr != self.lsn {
+                if hdr.xlp_pageaddr != self.lsn.0 {
                     return Err(WalDecodeError {
                         msg: "invalid xlog page header".into(),
                         lsn: self.lsn,
                     });
@@ -163,7 +165,8 @@ impl WalStreamDecoder {
                 continue;
             } else {
                 // we're continuing a record, possibly from previous page.
- let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32; + // FIXME: Should any of this math be captured into Lsn or a related type? + let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32; // read the rest of the record, or as much as fits on this page. let n = min(self.contlen, pageleft) as usize; @@ -184,16 +187,13 @@ impl WalStreamDecoder { // XLOG_SWITCH records are special. If we see one, we need to skip // to the next WAL segment. if is_xlog_switch_record(&recordbuf) { - trace!( - "saw xlog switch record at {:X}/{:X}", - (self.lsn >> 32), - self.lsn & 0xffffffff - ); - self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32; + trace!("saw xlog switch record at {}", self.lsn); + self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32; } - if self.lsn % 8 != 0 { - self.padlen = 8 - (self.lsn % 8) as u32; + // FIXME: what does this code do? + if self.lsn.0 % 8 != 0 { + self.padlen = 8 - (self.lsn.0 % 8) as u32; } let result = (self.lsn, recordbuf); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 6671c13d2563..18d7aa842e81 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -31,6 +31,7 @@ use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; use tokio_stream::StreamExt; +use zenith_utils::lsn::Lsn; // // We keep one WAL Receiver active per timeline. @@ -138,7 +139,7 @@ async fn walreceiver_main( let identify = identify_system(&rclient).await?; info!("{:?}", identify); - let end_of_wal = u64::from(identify.xlogpos); + let end_of_wal = Lsn::from(u64::from(identify.xlogpos)); let mut caught_up = false; let pcache = page_cache::get_pagecache(&conf, timelineid).unwrap(); @@ -148,7 +149,7 @@ async fn walreceiver_main( // let mut startpoint = pcache.get_last_valid_lsn(); let last_valid_lsn = pcache.get_last_valid_lsn(); - if startpoint == 0 { + if startpoint == Lsn(0) { // If we start here with identify.xlogpos we will have race condition with // postgres start: insert into postgres may request page that was modified with lsn // smaller than identify.xlogpos. @@ -157,37 +158,31 @@ async fn walreceiver_main( // different like having 'initdb' method on a pageserver (or importing some shared // empty database snapshot), so for now I just put start of first segment which // seems to be a valid record. - pcache.init_valid_lsn(0x_1_000_000_u64); - startpoint = 0x_1_000_000_u64; + pcache.init_valid_lsn(Lsn(0x_1_000_000)); + startpoint = Lsn(0x_1_000_000); } else { // There might be some padding after the last full record, skip it. // // FIXME: It probably would be better to always start streaming from the beginning // of the page, or the segment, so that we could check the page/segment headers // too. Just for the sake of paranoia. - if startpoint % 8 != 0 { - startpoint += 8 - (startpoint % 8); + // FIXME: should any of this logic move inside the Lsn type? 
+ if startpoint.0 % 8 != 0 { + startpoint += 8 - (startpoint.0 % 8); } } debug!( - "last_valid_lsn {:X}/{:X} starting replication from {:X}/{:X} for timeline {}, server is at {:X}/{:X}...", - (last_valid_lsn >> 32), - (last_valid_lsn & 0xffffffff), - (startpoint >> 32), - (startpoint & 0xffffffff), - timelineid, - (end_of_wal >> 32), - (end_of_wal & 0xffffffff) + "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", + last_valid_lsn, startpoint, timelineid, end_of_wal ); - let startpoint = PgLsn::from(startpoint); let query = format!("START_REPLICATION PHYSICAL {}", startpoint); let copy_stream = rclient.copy_both_simple::(&query).await?; let physical_stream = ReplicationStream::new(copy_stream); tokio::pin!(physical_stream); - let mut waldecoder = WalStreamDecoder::new(u64::from(startpoint)); + let mut waldecoder = WalStreamDecoder::new(startpoint); while let Some(replication_message) = physical_stream.next().await { match replication_message? { @@ -195,7 +190,7 @@ async fn walreceiver_main( // Pass the WAL data to the decoder, and see if we can decode // more records as a result. let data = xlog_data.data(); - let startlsn = xlog_data.wal_start(); + let startlsn = Lsn::from(xlog_data.wal_start()); let endlsn = startlsn + data.len() as u64; write_wal_file( @@ -205,13 +200,7 @@ async fn walreceiver_main( data, )?; - trace!( - "received XLogData between {:X}/{:X} and {:X}/{:X}", - (startlsn >> 32), - (startlsn & 0xffffffff), - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + trace!("received XLogData between {} and {}", startlsn, endlsn); waldecoder.feed_bytes(data); @@ -298,11 +287,7 @@ async fn walreceiver_main( pcache.advance_last_valid_lsn(endlsn); if !caught_up && endlsn >= end_of_wal { - info!( - "caught up at LSN {:X}/{:X}", - (endlsn >> 32), - (endlsn & 0xffffffff) - ); + info!("caught up at LSN {}", endlsn); caught_up = true; } } @@ -320,7 +305,7 @@ async fn walreceiver_main( ); if reply_requested { // TODO: More thought should go into what values are sent here. - let last_lsn = PgLsn::from(pcache.get_last_valid_lsn()); + let last_lsn = PgLsn::from(u64::from(pcache.get_last_valid_lsn())); let write_lsn = last_lsn; let flush_lsn = last_lsn; let apply_lsn = PgLsn::INVALID; @@ -387,7 +372,7 @@ pub async fn identify_system(client: &tokio_postgres::Client) -> Result>, } @@ -138,7 +138,7 @@ impl WalRedoManager { /// Request the WAL redo manager to apply WAL records, to reconstruct the page image /// of the given page version. /// - pub async fn request_redo(&self, tag: BufferTag, lsn: u64) -> Result { + pub async fn request_redo(&self, tag: BufferTag, lsn: Lsn) -> Result { // Create a channel where to receive the response let (tx, rx) = oneshot::channel::>(); @@ -225,18 +225,16 @@ impl WalRedoManagerInternal { } else if info == pg_constants::XLOG_XACT_ABORT { status = pg_constants::TRANSACTION_STATUS_ABORTED; } else { - trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. lsn {:X}/{:X} main_data_offset {}, rec.len {}", + trace!("handle_apply_request for RM_XACT_ID-{} NOT SUPPORTED YET. RETURN. 
lsn {} main_data_offset {}, rec.len {}",
                 status,
-                record.lsn >> 32,
-                record.lsn & 0xffffffff,
+                record.lsn,
                 record.main_data_offset, record.rec.len());
             return;
         }
-        trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {:X}/{:X} main_data_offset {}, rec.len {}",
+        trace!("handle_apply_request for RM_XACT_ID-{} (1-commit, 2-abort) lsn {} main_data_offset {}, rec.len {}",
             status,
-            record.lsn >> 32,
-            record.lsn & 0xffffffff,
+            record.lsn,
             record.main_data_offset, record.rec.len());
         let byteno: usize = ((xl_rmid as u32 % pg_constants::CLOG_XACTS_PER_PAGE as u32)
@@ -305,9 +303,8 @@ impl WalRedoManagerInternal {
         let info = xl_info & !pg_constants::XLR_INFO_MASK;
         if info == pg_constants::CLOG_ZEROPAGE {
             page.clone_from_slice(zero_page_bytes);
-            trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {:X}/{:X} main_data_offset {}, rec.len {}",
-                record.lsn >> 32,
-                record.lsn & 0xffffffff,
+            trace!("handle_apply_request for RM_CLOG_ID-CLOG_ZEROPAGE lsn {} main_data_offset {}, rec.len {}",
+                record.lsn,
                 record.main_data_offset, record.rec.len());
         }
     } else if xl_rmid == pg_constants::RM_XACT_ID {
@@ -325,11 +322,10 @@ impl WalRedoManagerInternal {
         let result: Result;
 
         trace!(
-            "applied {} WAL records in {} ms to reconstruct page image at LSN {:X}/{:X}",
+            "applied {} WAL records in {} ms to reconstruct page image at LSN {}",
             nrecords,
             duration.as_millis(),
-            lsn >> 32,
-            lsn & 0xffff_ffff
+            lsn
         );
 
         if let Err(e) = apply_result {
@@ -536,13 +532,13 @@ fn build_push_page_msg(tag: BufferTag, base_img: Bytes) -> Bytes {
     buf.freeze()
 }
 
-fn build_apply_record_msg(endlsn: u64, rec: Bytes) -> Bytes {
+fn build_apply_record_msg(endlsn: Lsn, rec: Bytes) -> Bytes {
     let len = 4 + 8 + rec.len();
     let mut buf = BytesMut::with_capacity(1 + len);
 
     buf.put_u8(b'A');
     buf.put_u32(len as u32);
-    buf.put_u64(endlsn);
+    buf.put_u64(endlsn.0);
     buf.put(rec);
 
     assert!(buf.len() == 1 + len);

From 74594214a5400e264338b53600c61009e4866a6f Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Sat, 24 Apr 2021 14:01:11 -0700
Subject: [PATCH 4/8] remove Lsn::sub in favor of checked_sub

There is only one place doing subtraction, and it had a manually
implemented check.

---
 pageserver/src/page_cache.rs |  5 +++--
 zenith_utils/src/lsn.rs      | 17 +++++++----------
 2 files changed, 10 insertions(+), 12 deletions(-)

diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 0f1863935485..82e03b8e51bb 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -831,8 +831,9 @@ impl PageCache {
         loop {
             thread::sleep(conf.gc_period);
             let last_lsn = self.get_last_valid_lsn();
-            if last_lsn.0 > conf.gc_horizon {
-                let horizon = last_lsn - conf.gc_horizon;
+
+            // checked_sub() returns None on overflow.
+            if let Some(horizon) = last_lsn.checked_sub(conf.gc_horizon) {
                 let mut maxkey = CacheKey {
                     tag: BufferTag {
                         rel: RelTag {
diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs
index a67b17b1e4ff..a1f48ddb84d5 100644
--- a/zenith_utils/src/lsn.rs
+++ b/zenith_utils/src/lsn.rs
@@ -1,7 +1,7 @@
 #![warn(missing_docs)]
 
 use std::fmt;
-use std::ops::{Add, AddAssign, Sub};
+use std::ops::{Add, AddAssign};
 use std::path::Path;
 use std::str::FromStr;
 
@@ -18,6 +18,12 @@ impl Lsn {
     /// Maximum possible value for an LSN
     pub const MAX: Lsn = Lsn(u64::MAX);
 
+    /// Subtract a number, returning None on overflow.
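+    ///
+    /// For example (illustrative, not part of the original patch):
+    /// `Lsn(10).checked_sub(3u64)` is `Some(Lsn(7))`, and
+    /// `Lsn(0).checked_sub(1u64)` is `None`.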
+    pub fn checked_sub<T: Into<u64>>(self, other: T) -> Option<Lsn> {
+        let other: u64 = other.into();
+        self.0.checked_sub(other).map(Lsn)
+    }
+
     /// Parse an LSN from a filename in the form `0000000000000000`
     pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
     where
         F: AsRef<Path>,
@@ -86,15 +92,6 @@ impl fmt::Display for Lsn {
     }
 }
 
-impl Sub<u64> for Lsn {
-    type Output = Lsn;
-
-    fn sub(self, other: u64) -> Self::Output {
-        // panic if the subtraction overflows/underflows.
-        Lsn(self.0.checked_sub(other).unwrap())
-    }
-}
-
 impl Add<u64> for Lsn {
     type Output = Lsn;
 
From 3062c0e205f44082c7e29c86828fb300be64fa7c Mon Sep 17 00:00:00 2001
From: Eric Seppanen
Date: Sat, 24 Apr 2021 15:00:09 -0700
Subject: [PATCH 5/8] add AtomicLsn

AtomicLsn is a wrapper around AtomicU64 that has load() and store()
members that are cheap (on x86, anyway) and can be safely used in any
context.

This commit uses AtomicLsn in the page cache, and fixes up some
downstream code that manually implemented LSN formatting.

There's also a bugfix to the logging in wait_lsn, which printed the
wrong lsn value.

---
 pageserver/src/page_cache.rs | 73 +++++++++++++++++++-----------------
 pageserver/src/tui.rs        | 12 +-----
 zenith_utils/src/lsn.rs      | 36 ++++++++++++++++++
 3 files changed, 76 insertions(+), 45 deletions(-)

diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs
index 82e03b8e51bb..55e4491c4c34 100644
--- a/pageserver/src/page_cache.rs
+++ b/pageserver/src/page_cache.rs
@@ -39,7 +39,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
 use lazy_static::lazy_static;
 use log::*;
 use rocksdb;
-use std::cmp::min;
+use std::cmp::{max, min};
 use std::collections::HashMap;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
@@ -47,7 +47,7 @@ use std::sync::{Arc, Mutex};
 use std::thread;
 use std::time::{Duration, Instant};
 use std::{convert::TryInto, ops::AddAssign};
-use zenith_utils::lsn::Lsn;
+use zenith_utils::lsn::{AtomicLsn, Lsn};
 use zenith_utils::seqwait::SeqWait;
 
 // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
@@ -73,9 +73,9 @@ pub struct PageCache {
 
     // copies of shared.first/last_valid_lsn fields (copied here so
     // that they can be read without acquiring the mutex).
-    pub first_valid_lsn: AtomicU64,
-    pub last_valid_lsn: AtomicU64,
-    pub last_record_lsn: AtomicU64,
+    first_valid_lsn: AtomicLsn,
+    last_valid_lsn: AtomicLsn,
+    last_record_lsn: AtomicLsn,
 }
 
 #[derive(Clone)]
@@ -84,9 +84,9 @@ pub struct PageCacheStats {
     pub num_page_images: u64,
     pub num_wal_records: u64,
     pub num_getpage_requests: u64,
-    pub first_valid_lsn: u64,
-    pub last_valid_lsn: u64,
-    pub last_record_lsn: u64,
+    pub first_valid_lsn: Lsn,
+    pub last_valid_lsn: Lsn,
+    pub last_record_lsn: Lsn,
 }
 
 impl AddAssign for PageCacheStats {
@@ -96,9 +96,12 @@
             num_entries: self.num_entries + other.num_entries,
             num_page_images: self.num_page_images + other.num_page_images,
             num_wal_records: self.num_wal_records + other.num_wal_records,
             num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests,
-            first_valid_lsn: self.first_valid_lsn + other.first_valid_lsn,
-            last_valid_lsn: self.last_valid_lsn + other.last_valid_lsn,
-            last_record_lsn: self.last_record_lsn + other.last_record_lsn,
+
+            // FIXME: needs review
+            // What should be happening here? I'm not sure what is the desired result.
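+            // As written, the merge unions the two LSN ranges:
+            // min of the first_valid bounds, max of the last bounds.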
+ first_valid_lsn: min(self.first_valid_lsn, other.first_valid_lsn), + last_valid_lsn: max(self.last_valid_lsn, other.last_valid_lsn), + last_record_lsn: max(self.last_record_lsn, other.last_record_lsn), } } } @@ -221,9 +224,9 @@ fn init_page_cache(conf: &PageServerConf, timelineid: ZTimelineId) -> PageCache num_wal_records: AtomicU64::new(0), num_getpage_requests: AtomicU64::new(0), - first_valid_lsn: AtomicU64::new(0), - last_valid_lsn: AtomicU64::new(0), - last_record_lsn: AtomicU64::new(0), + first_valid_lsn: AtomicLsn::new(0), + last_valid_lsn: AtomicLsn::new(0), + last_record_lsn: AtomicLsn::new(0), } } @@ -577,7 +580,7 @@ impl PageCache { let mut key = CacheKey { tag, lsn: rec.lsn }; // What was the size of the relation before this record? - let last_lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); + let last_lsn = self.last_valid_lsn.load(); let old_rel_size = self.relsize_get_nowait(&tag.rel, last_lsn)?; let content = CacheEntryContent { @@ -687,7 +690,7 @@ impl PageCache { let oldlsn = shared.last_valid_lsn; if lsn >= oldlsn { shared.last_valid_lsn = lsn; - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } else { warn!( @@ -711,8 +714,8 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_record_lsn.store(lsn.0, Ordering::Relaxed); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); self.seqwait_lsn.advance(lsn); } @@ -732,7 +735,7 @@ impl PageCache { assert!(shared.last_valid_lsn == Lsn(0) || lsn < shared.last_valid_lsn); shared.first_valid_lsn = lsn; - self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); } pub fn init_valid_lsn(&self, lsn: Lsn) { @@ -746,9 +749,9 @@ impl PageCache { shared.last_valid_lsn = lsn; shared.last_record_lsn = lsn; - self.first_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_valid_lsn.store(lsn.0, Ordering::Relaxed); - self.last_record_lsn.store(lsn.0, Ordering::Relaxed); + self.first_valid_lsn.store(lsn); + self.last_valid_lsn.store(lsn); + self.last_record_lsn.store(lsn); } pub fn get_last_valid_lsn(&self) -> Lsn { @@ -766,9 +769,9 @@ impl PageCache { num_page_images: self.num_page_images.load(Ordering::Relaxed), num_wal_records: self.num_wal_records.load(Ordering::Relaxed), num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(Ordering::Relaxed), - last_valid_lsn: self.last_valid_lsn.load(Ordering::Relaxed), - last_record_lsn: self.last_record_lsn.load(Ordering::Relaxed), + first_valid_lsn: self.first_valid_lsn.load(), + last_valid_lsn: self.last_valid_lsn.load(), + last_record_lsn: self.last_record_lsn.load(), } } @@ -780,7 +783,7 @@ impl PageCache { // The caller must ensure that WAL has been received up to 'lsn'. // fn relsize_get_nowait(&self, rel: &RelTag, lsn: Lsn) -> anyhow::Result { - assert!(lsn.0 <= self.last_valid_lsn.load(Ordering::Acquire)); + assert!(lsn <= self.last_valid_lsn.load()); let mut key = CacheKey { tag: BufferTag { @@ -941,17 +944,17 @@ impl PageCache { // // Wait until WAL has been received up to the given LSN. // - async fn wait_lsn(&self, req_lsn: Lsn) -> anyhow::Result { - let mut lsn = req_lsn; - //When invalid LSN is requested, it means "don't wait, return latest version of the page" - //This is necessary for bootstrap. 
+ async fn wait_lsn(&self, mut lsn: Lsn) -> anyhow::Result { + // When invalid LSN is requested, it means "don't wait, return latest version of the page" + // This is necessary for bootstrap. if lsn == Lsn(0) { - lsn = Lsn::from(self.last_valid_lsn.load(Ordering::Acquire)); + let last_valid_lsn = self.last_valid_lsn.load(); trace!( "walreceiver doesn't work yet last_valid_lsn {}, requested {}", - self.last_valid_lsn.load(Ordering::Acquire), + last_valid_lsn, lsn ); + lsn = last_valid_lsn; } self.seqwait_lsn @@ -981,9 +984,9 @@ pub fn get_stats() -> PageCacheStats { num_page_images: 0, num_wal_records: 0, num_getpage_requests: 0, - first_valid_lsn: 0, - last_valid_lsn: 0, - last_record_lsn: 0, + first_valid_lsn: Lsn(0), + last_valid_lsn: Lsn(0), + last_record_lsn: Lsn(0), }; pcaches.iter().for_each(|(_sys_id, pcache)| { diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs index 46cd76b9cea5..188ee54e117c 100644 --- a/pageserver/src/tui.rs +++ b/pageserver/src/tui.rs @@ -248,13 +248,6 @@ fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { ]) } -// FIXME: We really should define a datatype for LSNs, with Display trait and -// helper functions. There's one in tokio-postgres, but I don't think we want -// to rely on that. -fn format_lsn(lsn: u64) -> String { - return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff); -} - impl tui::widgets::Widget for MetricsWidget { fn render(self, area: Rect, buf: &mut Buffer) { let block = Block::default() @@ -270,10 +263,9 @@ impl tui::widgets::Widget for MetricsWidget { let page_cache_stats = crate::page_cache::get_stats(); let lsnrange = format!( "{} - {}", - format_lsn(page_cache_stats.first_valid_lsn), - format_lsn(page_cache_stats.last_valid_lsn) + page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn ); - let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn); + let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string(); lines.push(get_metric_str("Valid LSN range", &lsnrange)); lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str)); lines.push(get_metric_u64( diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index a1f48ddb84d5..e16fb678a3a3 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -4,6 +4,7 @@ use std::fmt; use std::ops::{Add, AddAssign}; use std::path::Path; use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] @@ -108,6 +109,41 @@ impl AddAssign for Lsn { } } +/// An [`Lsn`] that can be accessed atomically. +pub struct AtomicLsn { + inner: AtomicU64, +} + +impl AtomicLsn { + /// Creates a new atomic `Lsn`. + pub fn new(val: u64) -> Self { + AtomicLsn { + inner: AtomicU64::new(val), + } + } + + /// Atomically retrieve the `Lsn` value from memory. + pub fn load(&self) -> Lsn { + Lsn(self.inner.load(Ordering::Acquire)) + } + + /// Atomically store a new `Lsn` value to memory. + pub fn store(&self, lsn: Lsn) { + self.inner.store(lsn.0, Ordering::Release); + } + + /// Adds to the current value, returning the previous value. + /// + /// This operation will panic on overflow. 
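+    ///
+    /// Illustrative example (not part of the original patch): starting
+    /// from `AtomicLsn::new(0)`, a call to `fetch_add(16)` returns the
+    /// previous value `Lsn(0)` and leaves `Lsn(16)` stored.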
+ pub fn fetch_add(&self, val: u64) -> Lsn { + let prev = self.inner.fetch_add(val, Ordering::AcqRel); + if prev.checked_add(val).is_none() { + panic!("AtomicLsn overflow"); + } + Lsn(prev) + } +} + #[cfg(test)] mod tests { use super::*; From 74594214a5400e264338b53600c61009e4866a6f Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 25 Apr 2021 10:43:06 -0700 Subject: [PATCH 6/8] Drop LSNs from PageCacheStats There's no clear way to sum LSNs across timelines, so just remove them for now. --- pageserver/src/page_cache.rs | 27 +++++---------------------- pageserver/src/tui.rs | 10 +++++++++- 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index 55e4491c4c34..667b16fb9062 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -39,7 +39,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use lazy_static::lazy_static; use log::*; use rocksdb; -use std::cmp::{max, min}; +use std::cmp::min; use std::collections::HashMap; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; @@ -84,25 +84,14 @@ pub struct PageCacheStats { pub num_page_images: u64, pub num_wal_records: u64, pub num_getpage_requests: u64, - pub first_valid_lsn: Lsn, - pub last_valid_lsn: Lsn, - pub last_record_lsn: Lsn, } impl AddAssign for PageCacheStats { fn add_assign(&mut self, other: Self) { - *self = Self { - num_entries: self.num_entries + other.num_entries, - num_page_images: self.num_page_images + other.num_page_images, - num_wal_records: self.num_wal_records + other.num_wal_records, - num_getpage_requests: self.num_getpage_requests + other.num_getpage_requests, - - // FIXME: needs review - // What should be happening here? I'm not sure what is the desired result. - first_valid_lsn: min(self.first_valid_lsn, other.first_valid_lsn), - last_valid_lsn: max(self.last_valid_lsn, other.last_valid_lsn), - last_record_lsn: max(self.last_record_lsn, other.last_record_lsn), - } + self.num_entries += other.num_entries; + self.num_page_images += other.num_page_images; + self.num_wal_records += other.num_wal_records; + self.num_getpage_requests += other.num_getpage_requests; } } @@ -769,9 +758,6 @@ impl PageCache { num_page_images: self.num_page_images.load(Ordering::Relaxed), num_wal_records: self.num_wal_records.load(Ordering::Relaxed), num_getpage_requests: self.num_getpage_requests.load(Ordering::Relaxed), - first_valid_lsn: self.first_valid_lsn.load(), - last_valid_lsn: self.last_valid_lsn.load(), - last_record_lsn: self.last_record_lsn.load(), } } @@ -984,9 +970,6 @@ pub fn get_stats() -> PageCacheStats { num_page_images: 0, num_wal_records: 0, num_getpage_requests: 0, - first_valid_lsn: Lsn(0), - last_valid_lsn: Lsn(0), - last_record_lsn: Lsn(0), }; pcaches.iter().for_each(|(_sys_id, pcache)| { diff --git a/pageserver/src/tui.rs b/pageserver/src/tui.rs index 188ee54e117c..c0e1d657b38b 100644 --- a/pageserver/src/tui.rs +++ b/pageserver/src/tui.rs @@ -240,7 +240,9 @@ fn get_metric_u64(title: &str, value: u64) -> Spans { ]) } -fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { +// This is not used since LSNs were removed from page cache stats. +// Maybe it will be used in the future? 
+fn _get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> { Spans::from(vec![ Span::styled(format!("{:<20}", title), Style::default()), Span::raw(": "), @@ -261,6 +263,10 @@ impl tui::widgets::Widget for MetricsWidget { let mut lines: Vec = Vec::new(); let page_cache_stats = crate::page_cache::get_stats(); + + // This is not used since LSNs were removed from page cache stats. + // Maybe it will be used in the future? + /* let lsnrange = format!( "{} - {}", page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn @@ -268,6 +274,8 @@ impl tui::widgets::Widget for MetricsWidget { let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string(); lines.push(get_metric_str("Valid LSN range", &lsnrange)); lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str)); + */ + lines.push(get_metric_u64( "# of cache entries", page_cache_stats.num_entries, From b4ec229a038d70e383c74b4be933205802d85986 Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 25 Apr 2021 10:57:33 -0700 Subject: [PATCH 7/8] add Lsn::block_offset, remaining_in_block, calc_padding Replace open-coded math with member fns. --- pageserver/src/waldecoder.rs | 20 ++++++-------------- pageserver/src/walreceiver.rs | 5 +---- zenith_utils/src/lsn.rs | 29 +++++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/pageserver/src/waldecoder.rs b/pageserver/src/waldecoder.rs index 29cd8109ded4..33e16ca336ea 100644 --- a/pageserver/src/waldecoder.rs +++ b/pageserver/src/waldecoder.rs @@ -6,8 +6,6 @@ use std::str; use thiserror::Error; use zenith_utils::lsn::Lsn; -const XLOG_BLCKSZ: u32 = 8192; - // FIXME: this is configurable in PostgreSQL, 16 MB is the default const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; @@ -111,10 +109,7 @@ impl WalStreamDecoder { self.lsn += SizeOfXLogLongPHD as u64; continue; - } else if self.lsn.0 % (XLOG_BLCKSZ as u64) == 0 { - // FIXME: make this a member of Lsn, but what should it be called? - // parse page header - + } else if self.lsn.block_offset() == 0 { if self.inputbuf.remaining() < SizeOfXLogShortPHD { return Ok(None); } @@ -165,8 +160,7 @@ impl WalStreamDecoder { continue; } else { // we're continuing a record, possibly from previous page. - // FIXME: Should any of this math be captured into Lsn or a related type? - let pageleft: u32 = XLOG_BLCKSZ - (self.lsn.0 % (XLOG_BLCKSZ as u64)) as u32; + let pageleft = self.lsn.remaining_in_block() as u32; // read the rest of the record, or as much as fits on this page. let n = min(self.contlen, pageleft) as usize; @@ -188,12 +182,10 @@ impl WalStreamDecoder { // to the next WAL segment. if is_xlog_switch_record(&recordbuf) { trace!("saw xlog switch record at {}", self.lsn); - self.padlen = (WAL_SEGMENT_SIZE - (self.lsn.0 % WAL_SEGMENT_SIZE)) as u32; - } - - // FIXME: what does this code do? - if self.lsn.0 % 8 != 0 { - self.padlen = 8 - (self.lsn.0 % 8) as u32; + self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32; + } else { + // Pad to an 8-byte boundary + self.padlen = self.lsn.calc_padding(8u32) as u32; } let result = (self.lsn, recordbuf); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 18d7aa842e81..5ef5f1cf02e1 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -166,10 +166,7 @@ async fn walreceiver_main( // FIXME: It probably would be better to always start streaming from the beginning // of the page, or the segment, so that we could check the page/segment headers // too. Just for the sake of paranoia. 
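+            // calc_padding(8) returns 0 when startpoint is already
+            // 8-byte aligned, so this is a no-op for an aligned LSN.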
- // FIXME: should any of this logic move inside the Lsn type? - if startpoint.0 % 8 != 0 { - startpoint += 8 - (startpoint.0 % 8); - } + startpoint += startpoint.calc_padding(8u32); } debug!( "last_valid_lsn {} starting replication from {} for timeline {}, server is at {}...", diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index e16fb678a3a3..e4fb3dc0184d 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -6,6 +6,9 @@ use std::path::Path; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; +/// Transaction log block size in bytes +pub const XLOG_BLCKSZ: u32 = 8192; + /// A Postgres LSN (Log Sequence Number), also known as an XLogRecPtr #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] pub struct Lsn(pub u64); @@ -54,6 +57,32 @@ impl Lsn { pub fn segment_number(self, seg_sz: u64) -> u64 { self.0 / seg_sz } + + /// Compute the offset into a block + pub fn block_offset(self) -> u64 { + const BLCKSZ: u64 = XLOG_BLCKSZ as u64; + self.0 % BLCKSZ + } + + /// Compute the bytes remaining in this block + /// + /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`. + pub fn remaining_in_block(self) -> u64 { + const BLCKSZ: u64 = XLOG_BLCKSZ as u64; + BLCKSZ - (self.0 % BLCKSZ) + } + + /// Compute the bytes remaining to fill a chunk of some size + /// + /// If the LSN is already at the chunk boundary, it will return 0. + pub fn calc_padding>(self, sz: T) -> u64 { + let sz: u64 = sz.into(); + // By using wrapping_sub, we can subtract first and then mod second. + // If it's done the other way around, then we would return a full + // chunk size if we're already at the chunk boundary. + // (Regular subtraction will panic on overflow in debug builds.) + (sz.wrapping_sub(self.0)) % sz + } } impl From for Lsn { From dc9e4730933d7ba044f3f14b5520d7e8aecd123e Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 25 Apr 2021 11:31:13 -0700 Subject: [PATCH 8/8] add test cases for Lsn math and AtomicLsn --- zenith_utils/src/lsn.rs | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/zenith_utils/src/lsn.rs b/zenith_utils/src/lsn.rs index e4fb3dc0184d..fa8bc6a54542 100644 --- a/zenith_utils/src/lsn.rs +++ b/zenith_utils/src/lsn.rs @@ -196,4 +196,44 @@ mod tests { assert_eq!(Lsn::from_hex("0"), Ok(Lsn(0))); assert_eq!(Lsn::from_hex("F12345678AAAA5555"), Err(LsnParseError)); } + + #[test] + fn test_lsn_math() { + assert_eq!(Lsn(1234) + 11u64, Lsn(1245)); + + assert_eq!( + { + let mut lsn = Lsn(1234); + lsn += 11u64; + lsn + }, + Lsn(1245) + ); + + assert_eq!(Lsn(1234).checked_sub(1233u64), Some(Lsn(1))); + assert_eq!(Lsn(1234).checked_sub(1235u64), None); + + let seg_sz = 16u64 * 1024 * 1024; + assert_eq!(Lsn(0x1000007).segment_offset(seg_sz), 7u64); + assert_eq!(Lsn(0x1000007).segment_number(seg_sz), 1u64); + + assert_eq!(Lsn(0x4007).block_offset(), 7u64); + assert_eq!(Lsn(0x4000).block_offset(), 0u64); + assert_eq!(Lsn(0x4007).remaining_in_block(), 8185u64); + assert_eq!(Lsn(0x4000).remaining_in_block(), 8192u64); + + assert_eq!(Lsn(0xffff01).calc_padding(seg_sz), 255u64); + assert_eq!(Lsn(0x2000000).calc_padding(seg_sz), 0u64); + assert_eq!(Lsn(0xffff01).calc_padding(8u32), 7u64); + assert_eq!(Lsn(0xffff00).calc_padding(8u32), 0u64); + } + + #[test] + fn test_atomic_lsn() { + let lsn = AtomicLsn::new(0); + assert_eq!(lsn.fetch_add(1234), Lsn(0)); + assert_eq!(lsn.load(), Lsn(1234)); + lsn.store(Lsn(5678)); + assert_eq!(lsn.load(), Lsn(5678)); + } }