Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Lsn type #64

Merged
merged 8 commits into from
Apr 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 67 additions & 83 deletions pageserver/src/page_cache.rs

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use tokio::runtime;
use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio::task;
use zenith_utils::lsn::Lsn;

use crate::basebackup;
use crate::page_cache;
Expand Down Expand Up @@ -84,7 +85,7 @@ struct ZenithRequest {
relnode: u32,
forknum: u8,
blkno: u32,
lsn: u64,
lsn: Lsn,
}

#[derive(Debug)]
Expand Down Expand Up @@ -373,7 +374,7 @@ impl FeMessage {
relnode: body.get_u32(),
forknum: body.get_u8(),
blkno: body.get_u32(),
lsn: body.get_u64(),
lsn: Lsn::from(body.get_u64()),
};

// TODO: consider using protobuf or serde bincode for less error prone
Expand Down
33 changes: 16 additions & 17 deletions pageserver/src/restore_local_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use crate::ZTimelineId;
use postgres_ffi::xlog_utils::*;
use zenith_utils::lsn::Lsn;

// From pg_tablespace_d.h
//
Expand All @@ -60,20 +61,21 @@ pub fn restore_timeline(
.join(timeline.to_string())
.join("snapshots");

let mut last_snapshot_lsn: u64 = 0;
let mut last_snapshot_lsn: Lsn = Lsn(0);

for direntry in fs::read_dir(&snapshotspath).unwrap() {
let direntry = direntry?;
let filename = direntry.file_name().to_str().unwrap().to_owned();

let lsn = u64::from_str_radix(&filename, 16)?;
let filename = direntry.file_name();
let lsn = Lsn::from_filename(&filename)?;
last_snapshot_lsn = max(lsn, last_snapshot_lsn);

restore_snapshot(conf, pcache, timeline, &filename)?;
info!("restored snapshot at {}", filename);
// FIXME: pass filename as Path instead of str?
let filename_str = filename.into_string().unwrap();
restore_snapshot(conf, pcache, timeline, &filename_str)?;
info!("restored snapshot at {:?}", filename_str);
}

if last_snapshot_lsn == 0 {
if last_snapshot_lsn == Lsn(0) {
error!(
"could not find valid snapshot in {}",
snapshotspath.display()
Expand Down Expand Up @@ -183,7 +185,7 @@ fn restore_relfile(
dboid: u32,
path: &Path,
) -> Result<()> {
let lsn = u64::from_str_radix(snapshot, 16)?;
let lsn = Lsn::from_hex(snapshot)?;

// Does it look like a relation file?

Expand Down Expand Up @@ -245,15 +247,16 @@ fn restore_wal(
_conf: &PageServerConf,
pcache: &PageCache,
timeline: ZTimelineId,
startpoint: u64,
startpoint: Lsn,
) -> Result<()> {
let walpath = format!("timelines/{}/wal", timeline);

let mut waldecoder = WalStreamDecoder::new(startpoint);

let mut segno = XLByteToSeg(startpoint, 16 * 1024 * 1024);
let mut offset = XLogSegmentOffset(startpoint, 16 * 1024 * 1024);
let mut last_lsn = 0;
const SEG_SIZE: u64 = 16 * 1024 * 1024;
let mut segno = startpoint.segment_number(SEG_SIZE);
let mut offset = startpoint.segment_offset(SEG_SIZE);
let mut last_lsn = Lsn(0);
loop {
// FIXME: assume postgresql tli 1 for now
let filename = XLogFileName(1, segno, 16 * 1024 * 1024);
Expand Down Expand Up @@ -336,11 +339,7 @@ fn restore_wal(
segno += 1;
offset = 0;
}
info!(
"reached end of WAL at {:X}/{:X}",
last_lsn >> 32,
last_lsn & 0xffffffff
);
info!("reached end of WAL at {}", last_lsn);

Ok(())
}
Expand Down
22 changes: 11 additions & 11 deletions pageserver/src/tui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,21 +240,16 @@ fn get_metric_u64(title: &str, value: u64) -> Spans {
])
}

fn get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> {
// This is not used since LSNs were removed from page cache stats.
// Maybe it will be used in the future?
fn _get_metric_str<'a>(title: &str, value: &'a str) -> Spans<'a> {
Spans::from(vec![
Span::styled(format!("{:<20}", title), Style::default()),
Span::raw(": "),
Span::styled(value, Style::default().add_modifier(Modifier::BOLD)),
])
}

// FIXME: We really should define a datatype for LSNs, with Display trait and
// helper functions. There's one in tokio-postgres, but I don't think we want
// to rely on that.
fn format_lsn(lsn: u64) -> String {
return format!("{:X}/{:X}", lsn >> 32, lsn & 0xffff_ffff);
}

impl tui::widgets::Widget for MetricsWidget {
fn render(self, area: Rect, buf: &mut Buffer) {
let block = Block::default()
Expand All @@ -268,14 +263,19 @@ impl tui::widgets::Widget for MetricsWidget {
let mut lines: Vec<Spans> = Vec::new();

let page_cache_stats = crate::page_cache::get_stats();

// This is not used since LSNs were removed from page cache stats.
// Maybe it will be used in the future?
/*
let lsnrange = format!(
"{} - {}",
format_lsn(page_cache_stats.first_valid_lsn),
format_lsn(page_cache_stats.last_valid_lsn)
page_cache_stats.first_valid_lsn, page_cache_stats.last_valid_lsn
);
let last_valid_recordlsn_str = format_lsn(page_cache_stats.last_record_lsn);
let last_valid_recordlsn_str = page_cache_stats.last_record_lsn.to_string();
lines.push(get_metric_str("Valid LSN range", &lsnrange));
lines.push(get_metric_str("Last record LSN", &last_valid_recordlsn_str));
*/

lines.push(get_metric_u64(
"# of cache entries",
page_cache_stats.num_entries,
Expand Down
42 changes: 17 additions & 25 deletions pageserver/src/waldecoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use log::*;
use std::cmp::min;
use std::str;
use thiserror::Error;

const XLOG_BLCKSZ: u32 = 8192;
use zenith_utils::lsn::Lsn;

// FIXME: this is configurable in PostgreSQL, 16 MB is the default
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
Expand Down Expand Up @@ -41,9 +40,9 @@ const SizeOfXLogLongPHD: usize = (2 + 2 + 4 + 8 + 4) + 4 + 8 + 4 + 4;

#[allow(dead_code)]
pub struct WalStreamDecoder {
lsn: u64,
lsn: Lsn,

startlsn: u64, // LSN where this record starts
startlsn: Lsn, // LSN where this record starts
contlen: u32,
padlen: u32,

Expand All @@ -56,19 +55,19 @@ pub struct WalStreamDecoder {
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
lsn: u64,
lsn: Lsn,
}

//
// WalRecordStream is a Stream that returns a stream of WAL records
// FIXME: This isn't a proper rust stream
//
impl WalStreamDecoder {
pub fn new(lsn: u64) -> WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,

startlsn: 0,
startlsn: Lsn(0),
contlen: 0,
padlen: 0,

Expand All @@ -89,18 +88,18 @@ impl WalStreamDecoder {
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occured while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(u64, Bytes)>, WalDecodeError> {
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
loop {
// parse and verify page boundaries as we go
if self.lsn % WAL_SEGMENT_SIZE == 0 {
if self.lsn.segment_offset(WAL_SEGMENT_SIZE) == 0 {
// parse long header

if self.inputbuf.remaining() < SizeOfXLogLongPHD {
return Ok(None);
}

let hdr = self.decode_XLogLongPageHeaderData();
if hdr.std.xlp_pageaddr != self.lsn {
if hdr.std.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog segment header".into(),
lsn: self.lsn,
Expand All @@ -110,15 +109,13 @@ impl WalStreamDecoder {

self.lsn += SizeOfXLogLongPHD as u64;
continue;
} else if self.lsn % (XLOG_BLCKSZ as u64) == 0 {
// parse page header

} else if self.lsn.block_offset() == 0 {
if self.inputbuf.remaining() < SizeOfXLogShortPHD {
return Ok(None);
}

let hdr = self.decode_XLogPageHeaderData();
if hdr.xlp_pageaddr != self.lsn {
if hdr.xlp_pageaddr != self.lsn.0 {
return Err(WalDecodeError {
msg: "invalid xlog page header".into(),
lsn: self.lsn,
Expand Down Expand Up @@ -163,7 +160,7 @@ impl WalStreamDecoder {
continue;
} else {
// we're continuing a record, possibly from previous page.
let pageleft: u32 = XLOG_BLCKSZ - (self.lsn % (XLOG_BLCKSZ as u64)) as u32;
let pageleft = self.lsn.remaining_in_block() as u32;

// read the rest of the record, or as much as fits on this page.
let n = min(self.contlen, pageleft) as usize;
Expand All @@ -184,16 +181,11 @@ impl WalStreamDecoder {
// XLOG_SWITCH records are special. If we see one, we need to skip
// to the next WAL segment.
if is_xlog_switch_record(&recordbuf) {
trace!(
"saw xlog switch record at {:X}/{:X}",
(self.lsn >> 32),
self.lsn & 0xffffffff
);
self.padlen = (WAL_SEGMENT_SIZE - (self.lsn % WAL_SEGMENT_SIZE)) as u32;
}

if self.lsn % 8 != 0 {
self.padlen = 8 - (self.lsn % 8) as u32;
trace!("saw xlog switch record at {}", self.lsn);
self.padlen = self.lsn.calc_padding(WAL_SEGMENT_SIZE) as u32;
} else {
// Pad to an 8-byte boundary
self.padlen = self.lsn.calc_padding(8u32) as u32;
}

let result = (self.lsn, recordbuf);
Expand Down
Loading