-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] feat: use buffered fwrite to write binlog instead of Mmap #2778
[WIP] feat: use buffered fwrite to write binlog instead of Mmap #2778
Conversation
WalkthroughThe update enhances the Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant Binlog
participant BufferedWritableFile
participant TimerTaskThread
User->>Binlog: Initialize Binlog with Buffer
Binlog->>BufferedWritableFile: Create BufferedWritableFile
Binlog->>TimerTaskThread: Start Timer Task for Flush
loop Periodically
TimerTaskThread->>Binlog: Invoke FlushBufferedFile
Binlog->>BufferedWritableFile: Flush Data to Disk
end
User->>Binlog: Write Data
Binlog->>BufferedWritableFile: Append Data to Buffer
User->>Binlog: Destroy Binlog
Binlog->>TimerTaskThread: Stop Timer Task
Binlog->>BufferedWritableFile: Final Flush and Close
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (5)
- include/pika_binlog.h (3 hunks)
- include/pika_define.h (1 hunks)
- src/pika_binlog.cc (5 hunks)
- src/pstd/include/env.h (2 hunks)
- src/pstd/src/env.cc (3 hunks)
Additional comments not posted (21)
include/pika_binlog.h (3)
16-16
: Header inclusion looks good.The inclusion of the
dispatch_thread.h
header is appropriate for supporting theTimerTaskThread
class.
80-80
: Method declaration looks good.The
FlushBufferedFile
method declaration is appropriate for theBinlog
class.
113-114
: Member addition looks good.The addition of the
timer_task_thread_
member to theBinlog
class is appropriate for managing periodic tasks.src/pstd/include/env.h (1)
71-76
: Function additions look good.The functions
NewBufferedWritableFile
andBufferedAppendableFile
are appropriate for handling buffered writable files with specified buffer sizes and offsets.include/pika_define.h (1)
35-35
: Constant addition looks good.The addition of the constant
FWRITE_USER_SPACE_BUF_SIZE
with a value of 4KB is appropriate for defining the buffer size for buffered writable files.src/pika_binlog.cc (8)
84-84
: Buffered writable file creation looks good.The use of
NewBufferedWritableFile
to create a new buffered writable file with the specified buffer size is appropriate and aligns with the PR objectives.
115-115
: Buffered appendable file creation looks good.The use of
BufferedAppendableFile
to create a buffered appendable file with the specified buffer size and offset is appropriate and aligns with the PR objectives.
125-127
: Periodic flush timer task addition looks good.The addition of a timer task to flush the buffered file every 500 milliseconds is appropriate and aligns with the PR objectives.
132-132
: Stopping the timer task thread in the destructor looks good.Calling
StopThread
on thetimer_task_thread_
member in theBinlog
destructor ensures proper cleanup.
136-144
: FlushBufferedFile method implementation looks good.The
FlushBufferedFile
method locks the mutex, checks if the file is open, and flushes the buffer if it is, ensuring thread safety and proper flushing.
227-227
: Buffered writable file creation during log file rollover looks good.The use of
NewBufferedWritableFile
to create a new buffered writable file with the specified buffer size during log file rollover is appropriate and aligns with the PR objectives.
404-404
: Buffered writable file creation in SetProducerStatus method looks good.The use of
NewBufferedWritableFile
to create a new buffered writable file with the specified buffer size in theSetProducerStatus
method is appropriate and aligns with the PR objectives.
443-443
: Buffered appendable file creation in Truncate method looks good.The use of
BufferedAppendableFile
to create a buffered appendable file with the specified buffer size and offset in theTruncate
method is appropriate and aligns with the PR objectives.src/pstd/src/env.cc (8)
494-501
: Destructor should ensure resources are released.The destructor should ensure that the file is closed and the buffer is freed.
535-541
: Handle fclose error properly in Close method.The
Close
method should handle the case wherefclose
fails and ensure the file pointer is set tonullptr
.
543-548
: Ensure proper handling of fflush errors in Flush method.The
Flush
method should handle the case wherefflush
fails and return an appropriate error status.
550-560
: Ensure proper handling of fsync errors in Sync method.The
Sync
method should handle the case wherefsync
fails and return an appropriate error status.
562-564
: Trim method implementation is currently a no-op.The
Trim
method is currently a no-op. If trimming functionality is required, implement the method; otherwise, document why it's a no-op.
564-565
: Filesize method looks good.The
Filesize
method correctly returns the current file size.
743-785
: Handle file errors and buffer allocation failures properly.The function correctly handles file errors and buffer allocation failures. Ensure that resources are properly released in case of errors.
787-824
: Handle file errors and buffer allocation failures properly.The function correctly handles file errors and buffer allocation failures. Ensure that resources are properly released in case of errors.
Status Append(const Slice& data) override { | ||
if (!file_) { | ||
return IOError("fwrite target: " + filename_ + " is not opened", errno); | ||
} | ||
const char* src = data.data(); | ||
size_t left = data.size(); | ||
int32_t max_retries = 4; | ||
int retry_count = 0; | ||
|
||
while (left > 0) { | ||
size_t written = fwrite(src, sizeof(char), left, file_); | ||
if (written == 0) { | ||
if (ferror(file_)) { | ||
int err_num = errno; | ||
clearerr(file_); | ||
return IOError("fwrite error with " + filename_, err_num); | ||
} | ||
if (errno == ENOSPC || ++retry_count > max_retries) { | ||
return IOError(filename_, errno); | ||
} | ||
} | ||
src += written; | ||
left -= written; | ||
curr_file_size_ += written; | ||
retry_count = 0; | ||
} | ||
|
||
return Status::OK(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle retry logic and error conditions in Append method.
The Append
method includes retry logic but does not handle the case where fwrite
fails due to reasons other than ENOSPC
. Consider adding a retry limit and handling other error scenarios.
Status Append(const Slice& data) override {
if (!file_) {
return IOError("fwrite target: " + filename_ + " is not opened", errno);
}
const char* src = data.data();
size_t left = data.size();
int32_t max_retries = 4;
int retry_count = 0;
while (left > 0) {
size_t written = fwrite(src, sizeof(char), left, file_);
if (written == 0) {
if (ferror(file_)) {
int err_num = errno;
clearerr(file_);
return IOError("fwrite error with " + filename_, err_num);
}
if (errno == ENOSPC || ++retry_count > max_retries) {
return IOError(filename_, errno);
}
}
src += written;
left -= written;
curr_file_size_ += written;
retry_count = 0;
}
return Status::OK();
}
Committable suggestion was skipped due to low confidence.
class BufferedWritableFile : public WritableFile { | ||
private: | ||
std::string filename_; | ||
FILE* file_; | ||
int32_t user_space_buf_size_; | ||
uint64_t curr_file_size_; | ||
char* buffer_; | ||
|
||
public: | ||
BufferedWritableFile() = delete; | ||
BufferedWritableFile(const BufferedWritableFile&) = delete; | ||
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete; | ||
|
||
// caller must ensure passing-in 'file' is not nullptr | ||
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf) | ||
: filename_(std::move(file_name)), | ||
file_(file), | ||
user_space_buf_size_(user_space_buf_size), | ||
curr_file_size_(curr_file_size), | ||
buffer_(buf) { | ||
assert(file_ && "file_ can not be nullptr"); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure proper buffer management in the constructor.
The constructor takes ownership of the buffer but does not handle the case where setvbuf
fails after buffer allocation.
- BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
+ BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf)
: filename_(std::move(file_name)),
file_(file),
user_space_buf_size_(user_space_buf_size),
curr_file_size_(curr_file_size),
buffer_(buf) {
assert(file_ && "file_ can not be nullptr");
if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) {
free(buffer_);
buffer_ = nullptr;
user_space_buf_size_ = 0;
// Handle error appropriately, e.g., throw an exception or set an error status
}
}
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
class BufferedWritableFile : public WritableFile { | |
private: | |
std::string filename_; | |
FILE* file_; | |
int32_t user_space_buf_size_; | |
uint64_t curr_file_size_; | |
char* buffer_; | |
public: | |
BufferedWritableFile() = delete; | |
BufferedWritableFile(const BufferedWritableFile&) = delete; | |
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete; | |
// caller must ensure passing-in 'file' is not nullptr | |
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf) | |
: filename_(std::move(file_name)), | |
file_(file), | |
user_space_buf_size_(user_space_buf_size), | |
curr_file_size_(curr_file_size), | |
buffer_(buf) { | |
assert(file_ && "file_ can not be nullptr"); | |
} | |
class BufferedWritableFile : public WritableFile { | |
private: | |
std::string filename_; | |
FILE* file_; | |
int32_t user_space_buf_size_; | |
uint64_t curr_file_size_; | |
char* buffer_; | |
public: | |
BufferedWritableFile() = delete; | |
BufferedWritableFile(const BufferedWritableFile&) = delete; | |
BufferedWritableFile& operator=(const BufferedWritableFile&) = delete; | |
// caller must ensure passing-in 'file' is not nullptr | |
BufferedWritableFile(std::string file_name, FILE* file, int32_t user_space_buf_size, int32_t curr_file_size, char* buf) | |
: filename_(std::move(file_name)), | |
file_(file), | |
user_space_buf_size_(user_space_buf_size), | |
curr_file_size_(curr_file_size), | |
buffer_(buf) { | |
assert(file_ && "file_ can not be nullptr"); | |
if (user_space_buf_size_ > 0 && setvbuf(file_, buffer_, _IOFBF, user_space_buf_size_) != 0) { | |
free(buffer_); | |
buffer_ = nullptr; | |
user_space_buf_size_ = 0; | |
// Handle error appropriately, e.g., throw an exception or set an error status | |
} | |
} |
目前增加了定时任务,每500ms主动flush一次。调研一下这种使用方式。 |
Currently, a scheduled task has been added, which actively flushes every 500ms. Research this usage. |
2 remove auto Flush() for every write
db8848f
to
009952c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- include/pika_define.h (1 hunks)
- src/pika_binlog.cc (6 hunks)
Additional comments not posted (9)
include/pika_define.h (1)
35-35
: LGTM!The new constant
FWRITE_USER_SPACE_BUF_SIZE
is correctly defined as512LL << 10
(512KB).src/pika_binlog.cc (8)
84-84
: LGTM!The use of
pstd::NewBufferedWritableFile
with the buffer sizeFWRITE_USER_SPACE_BUF_SIZE
is correct.
115-115
: LGTM!The use of
pstd::BufferedAppendableFile
with the buffer sizeFWRITE_USER_SPACE_BUF_SIZE
is correct.
124-127
: LGTM!The addition of the timer task to flush the buffered file every 500 milliseconds is correctly implemented.
132-132
: LGTM!The timer task thread is correctly stopped in the destructor to prevent resource leaks.
136-144
: LGTM!The
FlushBufferedFile
method is correctly implemented and thread-safe.
227-227
: LGTM!The use of
pstd::NewBufferedWritableFile
with the buffer sizeFWRITE_USER_SPACE_BUF_SIZE
in thePut
method is correct.
280-282
: Verify the commenting out of the flush operation.The flush operation in the
EmitPhysicalRecord
method is commented out. Ensure this change is intentional and does not affect functionality.
404-404
: LGTM!The use of
pstd::NewBufferedWritableFile
with the buffer sizeFWRITE_USER_SPACE_BUF_SIZE
in theSetProducerStatus
method is correct.
因为需要将崩溃场景以及无锁优化考虑进来,所以重写了方案,更换了PR:#2848