Skip to content

Commit

Permalink
safekeeper: add wal_last_modified to debug_dump.
Browse files Browse the repository at this point in the history
Adds to debug_dump option to include highest modified time among all WAL
segments. In passing replace some str with OsStr to have less unwraps.
  • Loading branch information
arssher committed Sep 13, 2024
1 parent fcab61b commit e98dbe9
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 33 deletions.
34 changes: 25 additions & 9 deletions libs/postgres_ffi/src/xlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use bytes::{Buf, Bytes};
use log::*;

use serde::Serialize;
use std::ffi::OsStr;
use std::fs::File;
use std::io::prelude::*;
use std::io::ErrorKind;
Expand Down Expand Up @@ -78,19 +79,34 @@ pub fn XLogFileName(tli: TimeLineID, logSegNo: XLogSegNo, wal_segsz_bytes: usize
)
}

pub fn XLogFromFileName(fname: &str, wal_seg_size: usize) -> (XLogSegNo, TimeLineID) {
let tli = u32::from_str_radix(&fname[0..8], 16).unwrap();
let log = u32::from_str_radix(&fname[8..16], 16).unwrap() as XLogSegNo;
let seg = u32::from_str_radix(&fname[16..24], 16).unwrap() as XLogSegNo;
(log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli)
pub fn XLogFromFileName(
fname: &OsStr,
wal_seg_size: usize,
) -> anyhow::Result<(XLogSegNo, TimeLineID)> {
if let Some(fname_str) = fname.to_str() {
let tli = u32::from_str_radix(&fname_str[0..8], 16)?;
let log = u32::from_str_radix(&fname_str[8..16], 16)? as XLogSegNo;
let seg = u32::from_str_radix(&fname_str[16..24], 16)? as XLogSegNo;
Ok((log * XLogSegmentsPerXLogId(wal_seg_size) + seg, tli))
} else {
anyhow::bail!("non-ut8 filename: {:?}", fname);
}
}

pub fn IsXLogFileName(fname: &str) -> bool {
return fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit());
pub fn IsXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.len() == XLOG_FNAME_LEN && fname.chars().all(|c| c.is_ascii_hexdigit())
} else {
false
}
}

pub fn IsPartialXLogFileName(fname: &str) -> bool {
fname.ends_with(".partial") && IsXLogFileName(&fname[0..fname.len() - 8])
pub fn IsPartialXLogFileName(fname: &OsStr) -> bool {
if let Some(fname) = fname.to_str() {
fname.ends_with(".partial") && IsXLogFileName(OsStr::new(&fname[0..fname.len() - 8]))
} else {
false
}
}

/// If LSN points to the beginning of the page, then shift it to first record,
Expand Down
5 changes: 3 additions & 2 deletions libs/postgres_ffi/wal_craft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -136,8 +137,8 @@ impl Conf {

pub fn pg_waldump(
&self,
first_segment_name: &str,
last_segment_name: &str,
first_segment_name: &OsStr,
last_segment_name: &OsStr,
) -> anyhow::Result<std::process::Output> {
let first_segment_file = self.datadir.join(first_segment_name);
let last_segment_file = self.datadir.join(last_segment_name);
Expand Down
19 changes: 11 additions & 8 deletions libs/postgres_ffi/wal_craft/src/xlog_utils_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::*;
use crate::{error, info};
use regex::Regex;
use std::cmp::min;
use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::Write;
use std::{env, str::FromStr};
Expand Down Expand Up @@ -54,7 +55,7 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
.wal_dir()
.read_dir()
.unwrap()
.map(|f| f.unwrap().file_name().into_string().unwrap())
.map(|f| f.unwrap().file_name())
.filter(|fname| IsXLogFileName(fname))
.max()
.unwrap();
Expand All @@ -70,11 +71,11 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
start_lsn
);
for file in fs::read_dir(cfg.wal_dir()).unwrap().flatten() {
let fname = file.file_name().into_string().unwrap();
let fname = file.file_name();
if !IsXLogFileName(&fname) {
continue;
}
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE);
let (segno, _) = XLogFromFileName(&fname, WAL_SEGMENT_SIZE).unwrap();
let seg_start_lsn = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
if seg_start_lsn > u64::from(*start_lsn) {
continue;
Expand All @@ -93,10 +94,10 @@ fn test_end_of_wal<C: crate::Crafter>(test_name: &str) {
}
}

fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {
fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &OsStr) -> Lsn {
// Get the actual end of WAL by pg_waldump
let waldump_output = cfg
.pg_waldump("000000010000000000000001", last_segment)
.pg_waldump(OsStr::new("000000010000000000000001"), last_segment)
.unwrap()
.stderr;
let waldump_output = std::str::from_utf8(&waldump_output).unwrap();
Expand All @@ -117,7 +118,7 @@ fn find_pg_waldump_end_of_wal(cfg: &crate::Conf, last_segment: &str) -> Lsn {

fn check_end_of_wal(
cfg: &crate::Conf,
last_segment: &str,
last_segment: &OsStr,
start_lsn: Lsn,
expected_end_of_wal: Lsn,
) {
Expand All @@ -132,7 +133,8 @@ fn check_end_of_wal(
// Rename file to partial to actually find last valid lsn, then rename it back.
fs::rename(
cfg.wal_dir().join(last_segment),
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
)
.unwrap();
let wal_end = find_end_of_wal(&cfg.wal_dir(), WAL_SEGMENT_SIZE, start_lsn).unwrap();
Expand All @@ -142,7 +144,8 @@ fn check_end_of_wal(
);
assert_eq!(wal_end, expected_end_of_wal);
fs::rename(
cfg.wal_dir().join(format!("{}.partial", last_segment)),
cfg.wal_dir()
.join(format!("{}.partial", last_segment.to_str().unwrap())),
cfg.wal_dir().join(last_segment),
)
.unwrap();
Expand Down
33 changes: 33 additions & 0 deletions safekeeper/src/debug_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use postgres_ffi::MAX_SEND_SIZE;
use serde::Deserialize;
use serde::Serialize;

use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName};
use sha2::{Digest, Sha256};
use utils::id::NodeId;
use utils::id::TenantTimelineId;
Expand Down Expand Up @@ -51,6 +52,9 @@ pub struct Args {
/// Dump full term history. True by default.
pub dump_term_history: bool,

/// Dump last modified time of WAL segments. Uses value of `dump_all` by default.
pub dump_wal_last_modified: bool,

/// Filter timelines by tenant_id.
pub tenant_id: Option<TenantId>,

Expand Down Expand Up @@ -128,12 +132,19 @@ async fn build_from_tli_dump(
None
};

let wal_last_modified = if args.dump_wal_last_modified {
get_wal_last_modified(timeline_dir).ok().flatten()
} else {
None
};

Timeline {
tenant_id: timeline.ttid.tenant_id,
timeline_id: timeline.ttid.timeline_id,
control_file,
memory,
disk_content,
wal_last_modified,
}
}

Expand All @@ -156,6 +167,7 @@ pub struct Timeline {
pub control_file: Option<TimelinePersistentState>,
pub memory: Option<Memory>,
pub disk_content: Option<DiskContent>,
pub wal_last_modified: Option<DateTime<Utc>>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -302,6 +314,27 @@ fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
})
}

/// Get highest modified time of WAL segments in the directory.
fn get_wal_last_modified(path: &Utf8Path) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for entry in fs::read_dir(path)? {
if entry.is_err() {
continue;
}
let entry = entry?;
/* Ignore files that are not XLOG segments */
let fname = entry.file_name();
if !IsXLogFileName(&fname) && !IsPartialXLogFileName(&fname) {
continue;
}

let metadata = entry.metadata()?;
let modified: DateTime<Utc> = DateTime::from(metadata.modified()?);
res = std::cmp::max(res, Some(modified));
}
Ok(res)
}

/// Converts SafeKeeperConf to Config, filtering out the fields that are not
/// supposed to be exposed.
fn build_config(config: SafeKeeperConf) -> Config {
Expand Down
4 changes: 4 additions & 0 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let mut dump_memory: Option<bool> = None;
let mut dump_disk_content: Option<bool> = None;
let mut dump_term_history: Option<bool> = None;
let mut dump_wal_last_modified: Option<bool> = None;
let mut tenant_id: Option<TenantId> = None;
let mut timeline_id: Option<TimelineId> = None;

Expand All @@ -494,6 +495,7 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
"dump_memory" => dump_memory = Some(parse_kv_str(&k, &v)?),
"dump_disk_content" => dump_disk_content = Some(parse_kv_str(&k, &v)?),
"dump_term_history" => dump_term_history = Some(parse_kv_str(&k, &v)?),
"dump_wal_last_modified" => dump_wal_last_modified = Some(parse_kv_str(&k, &v)?),
"tenant_id" => tenant_id = Some(parse_kv_str(&k, &v)?),
"timeline_id" => timeline_id = Some(parse_kv_str(&k, &v)?),
_ => Err(ApiError::BadRequest(anyhow::anyhow!(
Expand All @@ -508,13 +510,15 @@ async fn dump_debug_handler(mut request: Request<Body>) -> Result<Response<Body>
let dump_memory = dump_memory.unwrap_or(dump_all);
let dump_disk_content = dump_disk_content.unwrap_or(dump_all);
let dump_term_history = dump_term_history.unwrap_or(true);
let dump_wal_last_modified = dump_wal_last_modified.unwrap_or(dump_all);

let args = debug_dump::Args {
dump_all,
dump_control_file,
dump_memory,
dump_disk_content,
dump_term_history,
dump_wal_last_modified,
tenant_id,
timeline_id,
};
Expand Down
25 changes: 11 additions & 14 deletions safekeeper/src/wal_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,20 +539,17 @@ async fn remove_segments_from_disk(
while let Some(entry) = entries.next_entry().await? {
let entry_path = entry.path();
let fname = entry_path.file_name().unwrap();

if let Some(fname_str) = fname.to_str() {
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
continue;
}
let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
}
/* Ignore files that are not XLOG segments */
if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
continue;
}
let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
if remove_predicate(segno) {
remove_file(entry_path).await?;
n_removed += 1;
min_removed = min(min_removed, segno);
max_removed = max(max_removed, segno);
REMOVED_WAL_SEGMENTS.inc();
}
}

Expand Down
1 change: 1 addition & 0 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
log.info(f"debug_dump before reboot {debug_dump_0}")
assert debug_dump_0["timelines_count"] == 1
assert debug_dump_0["timelines"][0]["timeline_id"] == str(timeline_id)
assert debug_dump_0["timelines"][0]["wal_last_modified"] != ""

endpoint.safe_psql("create table t(i int)")

Expand Down

0 comments on commit e98dbe9

Please sign in to comment.