
Commit

Merge pull request #18598 from redpanda-data/stephan/truncate-plus-allocate

segment_appender: truncate in addition to allocate
piyushredpanda authored Jun 7, 2024
2 parents 18fa688 + 7d0d283 commit 4b8135e
Showing 8 changed files with 151 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/go/rpk/pkg/cli/debug/bundle/bundle.go
@@ -163,7 +163,7 @@ func NewCommand(fs afero.Fs, p *config.Params) *cobra.Command {
f.StringVar(&logsSince, "logs-since", "", "Include log entries on or newer than the specified date (journalctl date format, e.g. YYYY-MM-DD")
f.StringVar(&logsUntil, "logs-until", "", "Include log entries on or older than the specified date (journalctl date format, e.g. YYYY-MM-DD")
f.StringVar(&logsSizeLimit, "logs-size-limit", "100MiB", "Read the logs until the given size is reached (e.g. 3MB, 1GiB)")
-	f.StringVar(&controllerLogsSizeLimit, "controller-logs-size-limit", "20MB", "The size limit of the controller logs that can be stored in the bundle (e.g. 3MB, 1GiB)")
+	f.StringVar(&controllerLogsSizeLimit, "controller-logs-size-limit", "132MB", "The size limit of the controller logs that can be stored in the bundle (e.g. 3MB, 1GiB)")
f.StringVar(&uploadURL, "upload-url", "", "If provided, where to upload the bundle in addition to creating a copy on disk")
f.StringVarP(&namespace, "namespace", "n", "redpanda", "The namespace to use to collect the resources from (k8s only)")
f.StringArrayVarP(&labelSelector, "label-selector", "l", []string{"app.kubernetes.io/name=redpanda"}, "Comma-separated label selectors to filter your resources. e.g: <label>=<value>,<label>=<value> (k8s only)")
7 changes: 7 additions & 0 deletions src/v/storage/segment_appender.cc
@@ -358,6 +358,13 @@ ss::future<> segment_appender::do_next_adaptive_fallocation() {
_fallocation_offset,
_committed_offset);
return _out.allocate(_fallocation_offset, step)
+      .then([this, step] {
+          // ss::file::allocate does not adjust logical file size
+          // hence we need to do that explicitly with an extra
+          // truncate. This allows for more efficient writes.
+          // https://github.com/redpanda-data/redpanda/pull/18598.
+          return _out.truncate(_fallocation_offset + step);
+      })
.then([this, step] { _fallocation_offset += step; });
})
.handle_exception([this](std::exception_ptr e) {
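A hedged, Linux-only sketch of the allocate-then-truncate pattern the hunk above adds, using raw syscalls from Python for illustration. Redpanda itself goes through Seastar's ss::file API, and FALLOC_FL_KEEP_SIZE here is only a stand-in for the keep-size behaviour the comment describes:

```python
import ctypes
import ctypes.util
import os

FALLOC_FL_KEEP_SIZE = 0x01  # reserve blocks without changing st_size

libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
libc.fallocate.argtypes = [ctypes.c_int, ctypes.c_int,
                           ctypes.c_long, ctypes.c_long]


def allocate_and_truncate(fd: int, fallocation_offset: int, step: int) -> int:
    """Reserve the next fallocation step, then extend the logical size."""
    if libc.fallocate(fd, FALLOC_FL_KEEP_SIZE, fallocation_offset, step) != 0:
        raise OSError(ctypes.get_errno(), "fallocate failed")
    # Blocks are now reserved, but stat() still reports the old size; the
    # commit adds the equivalent of this explicit truncate after allocate.
    os.ftruncate(fd, fallocation_offset + step)
    return fallocation_offset + step
```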
45 changes: 27 additions & 18 deletions src/v/storage/tests/log_retention_tests.cc
@@ -133,14 +133,16 @@ FIXTURE_TEST(retention_test_size_time, gc_fixture) {
offset += model::offset(num_records);
}

-    BOOST_CHECK_LT(
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().reclaim.retention
        - 2_MiB),
-      20_KiB);
-    BOOST_CHECK_LT(
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().usage.total()
        - 2_MiB),
-      20_KiB);
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());

// second segment
builder | storage::add_segment(offset);
@@ -159,14 +161,16 @@ FIXTURE_TEST(retention_test_size_time, gc_fixture) {
}

// the first segment is now eligible for reclaim
-    BOOST_CHECK_LT(
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().reclaim.retention
        - 3_MiB),
-      20_KiB);
-    BOOST_CHECK_LT(
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().usage.total()
        - 3_MiB),
-      20_KiB);
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());

// third segment
builder | storage::add_segment(offset);
@@ -185,14 +189,16 @@ FIXTURE_TEST(retention_test_size_time, gc_fixture) {
}

// the first,second segment is now eligible for reclaim
-    BOOST_CHECK_LT(
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().reclaim.retention
        - 4_MiB),
-      20_KiB);
-    BOOST_CHECK_LT(
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().usage.total()
        - 4_MiB),
-      20_KiB);
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());

// active segment
builder | storage::add_segment(offset);
@@ -211,14 +217,16 @@ FIXTURE_TEST(retention_test_size_time, gc_fixture) {
}

// the first,second segment is now eligible for reclaim
-    BOOST_CHECK_LT(
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().reclaim.retention
        - 5_MiB),
-      20_KiB);
-    BOOST_CHECK_LT(
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().usage.total()
        - 5_MiB),
-      20_KiB);
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());

builder | storage::garbage_collect(model::timestamp::now(), 4_MiB);

@@ -436,10 +444,11 @@ FIXTURE_TEST(non_collectible_disk_usage_test, gc_fixture) {
BOOST_CHECK_EQUAL(
builder.disk_usage(model::timestamp::now(), 0).get().reclaim.retention,
0);
-    BOOST_CHECK_LT(
+    BOOST_CHECK_LE(
       (builder.disk_usage(model::timestamp::now(), 0).get().usage.total()
        - 3_MiB),
-      20_KiB);
+      builder.storage().resources().get_falloc_step({})
+        * builder.get_disk_log_impl().segments().size());

builder | storage::stop();
}
8 changes: 6 additions & 2 deletions src/v/storage/tests/log_segment_appender_test.cc
@@ -571,9 +571,10 @@ SEASTAR_THREAD_TEST_CASE(test_can_append_little_data) {
}

static void run_test_fallocate_size(size_t fallocate_size) {
-    auto f = open_file("test_segment_appender.log");
+    auto filename = "test_segment_appender.log";
+    auto f = open_file(filename);
     storage::storage_resources resources(
-      config::mock_binding<size_t>(std::move(fallocate_size)));
+      config::mock_binding<size_t>(fallocate_size));
auto appender = make_segment_appender(f, resources);
auto close = ss::defer([&appender] { appender.close().get(); });

@@ -602,6 +603,9 @@ static void run_test_fallocate_size(size_t fallocate_size) {
BOOST_CHECK_EQUAL(original, result);
in.close().get();
}

+    // test that logical file size got updated as well (truncate called)
+    BOOST_CHECK_EQUAL(ss::file_size(filename).get() % fallocate_size, 0);
}

SEASTAR_THREAD_TEST_CASE(test_fallocate_size) {
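The added check above verifies the observable effect: once the appender closes, the segment's logical size lands exactly on a fallocation-step boundary. The same post-condition sketched outside the test, with an illustrative path and step value:

```python
import os


def logical_size_is_step_aligned(path: str, fallocate_step: int) -> bool:
    # Every adaptive fallocation now ends with a truncate to the fallocated
    # offset, so the size reported by stat() is a whole multiple of the step.
    return os.stat(path).st_size % fallocate_step == 0


# e.g. logical_size_is_step_aligned("test_segment_appender.log", 32 * 1024)
```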
107 changes: 103 additions & 4 deletions tests/rptest/remote_scripts/compute_storage.py
@@ -7,6 +7,49 @@
from pathlib import Path
import sys
import json
import io
import struct
import collections
import hashlib
import subprocess
from typing import Iterator


# NB: SegmentReader is duplicated in si_utils.py for deployment reasons. If
# making changes please adapt both.
class SegmentReader:
HDR_FMT_RP = "<IiqbIhiqqqhii"
HEADER_SIZE = struct.calcsize(HDR_FMT_RP)
Header = collections.namedtuple(
'Header', ('header_crc', 'batch_size', 'base_offset', 'type', 'crc',
'attrs', 'delta', 'first_ts', 'max_ts', 'producer_id',
'producer_epoch', 'base_seq', 'record_count'))

def __init__(self, stream):
self.stream = stream

def read_batch(self):
data = self.stream.read(self.HEADER_SIZE)
if len(data) == self.HEADER_SIZE:
header = self.Header(*struct.unpack(self.HDR_FMT_RP, data))
if all(map(lambda v: v == 0, header)):
return None
records_size = header.batch_size - self.HEADER_SIZE
data = self.stream.read(records_size)
if len(data) < records_size:
return None
assert len(
data
) == records_size, f"data len is {len(data)} but the expected records size is {records_size}"
return header
return None

def __iter__(self) -> Iterator[Header]:
while True:
it = self.read_batch()
if it is None:
return
yield it


def safe_isdir(p: Path) -> bool:
@@ -33,7 +76,48 @@ def safe_listdir(p: Path) -> list[Path]:
return []


def compute_size(data_dir: Path, sizes: bool):
def md5_for_bytes(calculate_md5: bool, data: bytes) -> str:
return hashlib.md5(data).hexdigest() if calculate_md5 else ''


def md5_for_filename(calculate_md5: bool, file: Path) -> str:
return subprocess.check_output([
'md5sum', file.absolute()
]).decode('utf-8').split(' ')[0] if calculate_md5 else ''


def compute_size_for_file(file: Path, calc_md5: bool):
file_size = file.stat().st_size
if file.suffix == '.log':
page_size = 4096

# just read segments for small files
if file_size < 4 * page_size:
data = file.read_bytes()
reader = SegmentReader(io.BytesIO(data))
return md5_for_bytes(calc_md5,
data), sum(h.batch_size for h in reader)
else:
# if the large page is not a null page this is a properly closed and
# truncated segment and hence we can just use filesize otherwise
# compute the size of the segment
with file.open('rb') as f:
f.seek(-page_size, io.SEEK_END)
end_page = f.read(page_size)
if end_page != b'\x00' * page_size:
return md5_for_filename(calc_md5, file), file_size

f.seek(0)
data = f.read()
reader = SegmentReader(io.BytesIO(data))
return md5_for_bytes(calc_md5,
data), sum(h.batch_size for h in reader)
else:
return md5_for_filename(calc_md5, file), file_size


def compute_size(data_dir: Path, sizes: bool, calculate_md5: bool,
print_flat: bool):
output = {}
for ns in safe_listdir(data_dir):
if not safe_isdir(ns):
@@ -49,7 +133,13 @@ def compute_size(data_dir: Path, sizes: bool):
seg_output = {}
if sizes:
try:
seg_output["size"] = segment.stat().st_size
md5, size = compute_size_for_file(
segment, calculate_md5)
seg_output["size"] = size
if calculate_md5:
seg_output["md5"] = md5
if print_flat:
print(f"{segment.absolute()} {size} {md5}")
except FileNotFoundError:
# It's valid to have a segment deleted
# at anytime
@@ -71,8 +161,17 @@ def compute_size(data_dir: Path, sizes: bool):
parser.add_argument('--sizes',
action="store_true",
help='Also compute sizes of files')
+    parser.add_argument('--md5',
+                        action="store_true",
+                        help='Also compute md5 checksums')
+    parser.add_argument(
+        '--print-flat',
+        action="store_true",
+        help='Print output for each file instead of returning as json')
args = parser.parse_args()

data_dir = Path(args.data_dir)
assert data_dir.exists(), f"{data_dir} must exist"
-    output = compute_size(data_dir, args.sizes)
-    json.dump(output, sys.stdout)
+    output = compute_size(data_dir, args.sizes, args.md5, args.print_flat)
+    if not args.print_flat:
+        json.dump(output, sys.stdout)
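With the additions above, compute_storage.py can report both a physical and a logical view of each segment. A hedged sketch of using the new SegmentReader for the logical size, summing batch_size over parsed headers, which is what compute_size_for_file falls back to when a segment still ends in preallocated zero pages (the import path and segment path are illustrative, not part of the test suite):

```python
import io
from pathlib import Path

from compute_storage import SegmentReader  # assumes the script is importable

segment = Path("kafka/topic-a/0_1/0-1-v1.log")  # made-up example path
data = segment.read_bytes()
# Iteration stops at the first all-zero header (the preallocated tail), so
# the sum is the logical size of the data actually written to the segment.
logical_size = sum(h.batch_size for h in SegmentReader(io.BytesIO(data)))
print(f"{segment} physical={segment.stat().st_size} logical={logical_size}")
```

On a node the script is driven the way data_checksum now invokes it, e.g. python3 compute_storage.py --data-dir <data_dir> --sizes --md5 --print-flat.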
5 changes: 3 additions & 2 deletions tests/rptest/services/redpanda.py
@@ -4289,7 +4289,8 @@ def data_checksum(self, node: ClusterNode) -> FileToChecksumSize:
"""Run command that computes MD5 hash of every file in redpanda data
directory. The results of the command are turned into a map from path
to hash-size tuples."""
cmd = f"find {RedpandaService.DATA_DIR} -type f -exec md5sum -z '{{}}' \; -exec stat -c ' %s' '{{}}' \;"
script_path = inject_remote_script(node, "compute_storage.py")
cmd = f"python3 {script_path} --sizes --md5 --print-flat --data-dir {RedpandaService.DATA_DIR}"
lines = node.account.ssh_output(cmd, timeout_sec=120)
lines = lines.decode().split("\n")

@@ -4310,7 +4311,7 @@ def data_checksum(self, node: ClusterNode) -> FileToChecksumSize:
lines.pop()

return {
-            tokens[1].rstrip("\x00"): (tokens[0], int(tokens[2]))
+            tokens[0]: (tokens[2], int(tokens[1]))
for tokens in map(lambda l: l.split(), lines)
}

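With --print-flat the script emits one line per file in the form "absolute path, size, md5" separated by spaces; the previous md5sum/stat pipeline emitted the hash first, which is why the comprehension's token order changes. A small sketch of the mapping, using a fabricated sample line:

```python
# Fabricated example of one --print-flat output line.
line = ("/var/lib/redpanda/data/kafka/t/0_1/0-1-v1.log"
        " 1048576 0123456789abcdef0123456789abcdef")
tokens = line.split()
entry = {tokens[0]: (tokens[2], int(tokens[1]))}  # path -> (md5, size)
assert entry == {
    "/var/lib/redpanda/data/kafka/t/0_1/0-1-v1.log":
    ("0123456789abcdef0123456789abcdef", 1048576)
}
```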
3 changes: 2 additions & 1 deletion tests/rptest/tests/full_disk_test.py
@@ -283,7 +283,8 @@ def test_full_disk_triggers_gc(self):

def observed_data_size(pred):
observed = self.redpanda.data_stat(node)
-            observed_total = sum(s for _, s in observed)
+            observed_total = sum(s for path, s in observed
+                                 if path.parts[0] == 'kafka')
return pred(observed_total)

# write around 30 megabytes into the topic
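The test now sums only files under the kafka namespace directory, so internal logs such as the controller log no longer count toward the observed topic size. A minimal sketch of the filter, assuming data_stat yields (relative path, size) pairs rooted at the data directory; the sample entries are made up:

```python
from pathlib import Path

observed = [
    (Path("kafka/topic-a/0_1/0-1-v1.log"), 4 * 1024 * 1024),        # user data
    (Path("redpanda/controller/0_0/0-1-v1.log"), 1 * 1024 * 1024),  # internal
]
observed_total = sum(s for path, s in observed if path.parts[0] == "kafka")
assert observed_total == 4 * 1024 * 1024  # controller log excluded
```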
2 changes: 2 additions & 0 deletions tests/rptest/utils/si_utils.py
@@ -176,6 +176,8 @@ class SegmentSummary(NamedTuple):
size_bytes: int


+# NB: SegmentReader is duplicated compute_storage.py for deployment reasons. If
+# making changes please adapt both.
class SegmentReader:
HDR_FMT_RP = "<IiqbIhiqqqhii"
HEADER_SIZE = struct.calcsize(HDR_FMT_RP)
