diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index e6f3ded0bee..a14dd16d3ba 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -27,6 +27,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/kafka" _ "github.com/elastic/beats/v7/libbeat/outputs/logstash" _ "github.com/elastic/beats/v7/libbeat/outputs/redis" + _ "github.com/elastic/beats/v7/libbeat/publisher/queue/diskqueue" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" _ "github.com/elastic/beats/v7/libbeat/publisher/queue/spool" ) diff --git a/libbeat/publisher/queue/diskqueue/acks.go b/libbeat/publisher/queue/diskqueue/acks.go new file mode 100644 index 00000000000..ed9d7589db2 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/acks.go @@ -0,0 +1,146 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "os" + "sync" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// queuePosition represents a logical position within the queue buffer. +type queuePosition struct { + segmentID segmentID + offset segmentOffset +} + +type diskQueueACKs struct { + logger *logp.Logger + + // This lock must be held to access diskQueueACKs fields (except for + // diskQueueACKs.done, which is always safe). + lock sync.Mutex + + // The id and position of the first unacknowledged frame. + nextFrameID frameID + nextPosition queuePosition + + // If a frame has been ACKed, then frameSize[frameID] contains its size on + // disk. The size is used to track the queuePosition of the oldest + // remaining frame, which is written to disk as ACKs are received. (We do + // this to avoid duplicating events if the beat terminates without a clean + // shutdown.) + frameSize map[frameID]uint64 + + // segmentBoundaries maps the first frameID of each segment to its + // corresponding segment ID. + segmentBoundaries map[frameID]segmentID + + // When a segment has been completely acknowledged by a consumer, it sends + // the segment ID to this channel, where it is read by the core loop and + // scheduled for deletion. + segmentACKChan chan segmentID + + // An open writable file handle to the file that stores the queue position. + // This position is advanced as we receive ACKs, confirming it is safe + // to move forward, so the acking code is responsible for updating this + // file. + positionFile *os.File + + // When the queue is closed, diskQueueACKs.done is closed to signal that + // the core loop will not accept any more acked segments and any future + // ACKs should be ignored. + done chan struct{} +} + +func newDiskQueueACKs( + logger *logp.Logger, position queuePosition, positionFile *os.File, +) *diskQueueACKs { + return &diskQueueACKs{ + logger: logger, + nextFrameID: 0, + nextPosition: position, + frameSize: make(map[frameID]uint64), + segmentBoundaries: make(map[frameID]segmentID), + segmentACKChan: make(chan segmentID), + positionFile: positionFile, + done: make(chan struct{}), + } +} + +func (dqa *diskQueueACKs) addFrames(frames []*readFrame) { + dqa.lock.Lock() + defer dqa.lock.Unlock() + select { + case <-dqa.done: + // We are already done and should ignore any leftover ACKs we receive. + return + default: + } + for _, frame := range frames { + segment := frame.segment + if frame.id != 0 && frame.id == segment.firstFrameID { + // This is the first frame in its segment, mark it so we know when + // we're starting a new segment. + // + // Subtlety: we don't count the very first frame as a "boundary" even + // though it is the first frame we read from its segment. This prevents + // us from resetting our segment offset to zero, in case the initial + // offset was restored from a previous session instead of starting at + // the beginning of the first file. + dqa.segmentBoundaries[frame.id] = segment.id + } + dqa.frameSize[frame.id] = frame.bytesOnDisk + } + oldSegmentID := dqa.nextPosition.segmentID + if dqa.frameSize[dqa.nextFrameID] != 0 { + for ; dqa.frameSize[dqa.nextFrameID] != 0; dqa.nextFrameID++ { + newSegment, ok := dqa.segmentBoundaries[dqa.nextFrameID] + if ok { + // This is the start of a new segment. Remove this frame from the + // segment boundary list and set the position to the start of the + // new segment. + delete(dqa.segmentBoundaries, dqa.nextFrameID) + dqa.nextPosition = queuePosition{ + segmentID: newSegment, + offset: 0, + } + } + dqa.nextPosition.offset += segmentOffset(dqa.frameSize[dqa.nextFrameID]) + delete(dqa.frameSize, dqa.nextFrameID) + } + // We advanced the ACK position at least somewhat, so write its + // new value. + err := writeQueuePositionToHandle(dqa.positionFile, dqa.nextPosition) + if err != nil { + // TODO: Don't spam this warning on every ACK if it's a permanent error. + dqa.logger.Warnf("Couldn't save queue position: %v", err) + } + } + if oldSegmentID != dqa.nextPosition.segmentID { + // We crossed at least one segment boundary, inform the listener that + // everything before the current segment has been acknowledged (but bail + // out if our done channel has been closed, since that means there is no + // listener on the other end.) + select { + case dqa.segmentACKChan <- dqa.nextPosition.segmentID - 1: + case <-dqa.done: + } + } +} diff --git a/libbeat/publisher/queue/diskqueue/checksum.go b/libbeat/publisher/queue/diskqueue/checksum.go new file mode 100644 index 00000000000..87cdb7b1aef --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/checksum.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "encoding/binary" + "hash/crc32" +) + +// Computes the checksum that should be written / read in a frame footer +// based on the raw content of that frame (excluding header / footer). +func computeChecksum(data []byte) uint32 { + hash := crc32.NewIEEE() + frameLength := uint32(len(data) + frameMetadataSize) + binary.Write(hash, binary.LittleEndian, &frameLength) + hash.Write(data) + return hash.Sum32() +} diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go new file mode 100644 index 00000000000..f39f608361d --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -0,0 +1,158 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "errors" + "fmt" + "path/filepath" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/cfgtype" + "github.com/elastic/beats/v7/libbeat/paths" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +// Settings contains the configuration fields to create a new disk queue +// or open an existing one. +type Settings struct { + // The path on disk of the queue's containing directory, which will be + // created if it doesn't exist. Within the directory, the queue's state + // is stored in state.dat and each segment's data is stored in + // {segmentIndex}.seg + // If blank, the default directory is "diskqueue" within the beat's data + // directory. + Path string + + // MaxBufferSize is the maximum number of bytes that the queue should + // ever occupy on disk. A value of 0 means the queue can grow until the + // disk is full (this is not recommended on a primary system disk). + MaxBufferSize uint64 + + // MaxSegmentSize is the maximum number of bytes that should be written + // to a single segment file before creating a new one. + MaxSegmentSize uint64 + + // How many events will be read from disk while waiting for a consumer + // request. + ReadAheadLimit int + + // How many events will be queued in memory waiting to be written to disk. + // This setting should rarely matter in practice, but if data is coming + // in faster than it can be written to disk for an extended period, + // this limit can keep it from overflowing memory. + WriteAheadLimit int + + // A listener that should be sent ACKs when an event is successfully + // written to disk. + WriteToDiskListener queue.ACKListener +} + +// userConfig holds the parameters for a disk queue that are configurable +// by the end user in the beats yml file. +type userConfig struct { + Path string `config:"path"` + MaxSize cfgtype.ByteSize `config:"max_size" validate:"required"` + SegmentSize *cfgtype.ByteSize `config:"segment_size"` + ReadAheadLimit *int `config:"read_ahead"` + WriteAheadLimit *int `config:"write_ahead"` +} + +func (c *userConfig) Validate() error { + // If the segment size is explicitly specified, the total queue size must + // be at least twice as large. + if c.SegmentSize != nil && c.MaxSize != 0 && c.MaxSize < *c.SegmentSize*2 { + return errors.New( + "Disk queue max_size must be at least twice as big as segment_size") + } + + // We require a total queue size of at least 10MB, and a segment size of + // at least 1MB. The queue can support lower thresholds, but it will perform + // terribly, so we give an explicit error in that case. + // These bounds are still extremely low for Beats ingestion, but if all you + // need is for a low-volume stream on a tiny device to persist between + // restarts, it will work fine. + if c.MaxSize != 0 && c.MaxSize < 10*1000*1000 { + return fmt.Errorf( + "Disk queue max_size (%d) cannot be less than 10MB", c.MaxSize) + } + if c.SegmentSize != nil && *c.SegmentSize < 1000*1000 { + return fmt.Errorf( + "Disk queue segment_size (%d) cannot be less than 1MB", *c.SegmentSize) + } + + return nil +} + +// DefaultSettings returns a Settings object with reasonable default values +// for all important fields. +func DefaultSettings() Settings { + return Settings{ + MaxSegmentSize: 100 * (1 << 20), // 100MiB + MaxBufferSize: (1 << 30), // 1GiB + + ReadAheadLimit: 256, + WriteAheadLimit: 1024, + } +} + +// SettingsForUserConfig returns a Settings struct initialized with the +// end-user-configurable settings in the given config tree. +func SettingsForUserConfig(config *common.Config) (Settings, error) { + userConfig := userConfig{} + if err := config.Unpack(&userConfig); err != nil { + return Settings{}, fmt.Errorf("parsing user config: %w", err) + } + settings := DefaultSettings() + settings.Path = userConfig.Path + + settings.MaxBufferSize = uint64(userConfig.MaxSize) + if userConfig.SegmentSize != nil { + settings.MaxSegmentSize = uint64(*userConfig.SegmentSize) + } else { + // If no value is specified, default segment size is total queue size + // divided by 10. + settings.MaxSegmentSize = uint64(userConfig.MaxSize) / 10 + } + return settings, nil +} + +// +// bookkeeping helpers +// + +func (settings Settings) directoryPath() string { + if settings.Path == "" { + return paths.Resolve(paths.Data, "diskqueue") + } + return settings.Path +} + +func (settings Settings) stateFilePath() string { + return filepath.Join(settings.directoryPath(), "state.dat") +} + +func (settings Settings) segmentPath(segmentID segmentID) string { + return filepath.Join( + settings.directoryPath(), + fmt.Sprintf("%v.seg", segmentID)) +} + +func (settings Settings) maxSegmentOffset() segmentOffset { + return segmentOffset(settings.MaxSegmentSize - segmentHeaderSize) +} diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go new file mode 100644 index 00000000000..b2922778ea5 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -0,0 +1,114 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +type diskQueueConsumer struct { + queue *diskQueue + closed bool +} + +type diskQueueBatch struct { + queue *diskQueue + frames []*readFrame +} + +// +// diskQueueConsumer implementation of the queue.Consumer interface +// + +func (consumer *diskQueueConsumer) Get(eventCount int) (queue.Batch, error) { + if consumer.closed { + return nil, fmt.Errorf("Tried to read from a closed disk queue consumer") + } + + // Read at least one frame. This is guaranteed to eventually + // succeed unless the queue is closed. + frame, ok := <-consumer.queue.readerLoop.output + if !ok { + return nil, fmt.Errorf("Tried to read from a closed disk queue") + } + frames := []*readFrame{frame} +eventLoop: + for eventCount <= 0 || len(frames) < eventCount { + select { + case frame, ok := <-consumer.queue.readerLoop.output: + if !ok { + // The queue was closed while we were reading it, just send back + // what we have so far. + break eventLoop + } + frames = append(frames, frame) + default: + // We can't read any more frames without blocking, so send back + // what we have now. + break eventLoop + } + } + + // There is a mild race condition here based on queue closure: events + // written to readerLoop.output may have been buffered before the + // queue was closed, and we may be reading its leftovers afterwards. + // We could try to detect this case here by checking the + // consumer.queue.done channel, and return nothing if it's been closed. + // But this gives rise to another race: maybe the queue was + // closed _after_ we read those frames, and we _ought_ to return them + // to the reader. The queue interface doesn't specify the proper + // behavior in this case. + // + // Lacking formal requirements, we elect to be permissive: if we have + // managed to read frames, then the queue already knows and considers them + // "read," so we lose no consistency by returning them. If someone closes + // the queue while we are draining the channel, nothing changes functionally + // except that any ACKs after that point will be ignored. A well-behaved + // Beats shutdown will always ACK / close its consumers before closing the + // queue itself, so we expect this corner case not to arise in practice, but + // if it does it is innocuous. + + return &diskQueueBatch{ + queue: consumer.queue, + frames: frames, + }, nil +} + +func (consumer *diskQueueConsumer) Close() error { + consumer.closed = true + return nil +} + +// +// diskQueueBatch implementation of the queue.Batch interface +// + +func (batch *diskQueueBatch) Events() []publisher.Event { + events := make([]publisher.Event, len(batch.frames)) + for i, frame := range batch.frames { + events[i] = frame.event + } + return events +} + +func (batch *diskQueueBatch) ACK() { + batch.queue.acks.addFrames(batch.frames) +} diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go new file mode 100644 index 00000000000..56a50b5a422 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -0,0 +1,449 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import "fmt" + +// This file contains the queue's "core loop" -- the central goroutine +// that owns all queue state that is not encapsulated in one of the +// self-contained helper loops. This is the only file that is allowed to +// modify the queue state after its creation, and it contains the full +// logical "state transition diagram" for queue operation. + +func (dq *diskQueue) run() { + // Wake up the reader and deleter loops if there are segments to process + // from a previous instantiation of the queue. + dq.maybeReadPending() + dq.maybeDeleteACKed() + + for { + select { + // Endpoints used by the producer / consumer API implementation. + case producerWriteRequest := <-dq.producerWriteRequestChan: + dq.handleProducerWriteRequest(producerWriteRequest) + + // After a write request, there may be data ready to send to the + // writer loop. + dq.maybeWritePending() + + case ackedSegmentID := <-dq.acks.segmentACKChan: + dq.handleSegmentACK(ackedSegmentID) + + // After receiving new ACKs, a segment might be ready to delete. + dq.maybeDeleteACKed() + + case <-dq.done: + dq.handleShutdown() + return + + // Writer loop handling + case writerLoopResponse := <-dq.writerLoop.responseChan: + dq.handleWriterLoopResponse(writerLoopResponse) + + // The writer loop completed a request, so check if there is more + // data to be sent. + dq.maybeWritePending() + // We also check whether the reader loop is waiting for the data + // that was just written. + dq.maybeReadPending() + + // Reader loop handling + case readerLoopResponse := <-dq.readerLoop.responseChan: + dq.handleReaderLoopResponse(readerLoopResponse) + + // If there is more data to read, start a new read request. + dq.maybeReadPending() + + // Deleter loop handling + case deleterLoopResponse := <-dq.deleterLoop.responseChan: + dq.handleDeleterLoopResponse(deleterLoopResponse) + + // If there are still files waiting to be deleted, send another request. + dq.maybeDeleteACKed() + + // If there were blocked producers waiting for more queue space, + // we might be able to unblock them now. + dq.maybeUnblockProducers() + } + } +} + +func (dq *diskQueue) handleProducerWriteRequest(request producerWriteRequest) { + // Pathological case checking: make sure the incoming frame isn't bigger + // than an entire segment all by itself (as long as it isn't, it is + // guaranteed to eventually enter the queue assuming no disk errors). + frameSize := request.frame.sizeOnDisk() + if dq.settings.MaxSegmentSize < frameSize { + dq.logger.Warnf( + "Rejecting event with size %v because the maximum segment size is %v", + frameSize, dq.settings.MaxSegmentSize) + request.responseChan <- false + return + } + + // If no one else is blocked waiting for queue capacity, and there is + // enough space, then we add the new frame and report success. + // Otherwise, we either add to the end of blockedProducers to wait for + // the requested space or report immediate failure, depending on the + // producer settings. + if len(dq.blockedProducers) == 0 && dq.canAcceptFrameOfSize(frameSize) { + // There is enough space for the new frame! Add it to the + // pending list and report success, then dispatch it to the + // writer loop if no other requests are outstanding. + dq.enqueueWriteFrame(request.frame) + request.responseChan <- true + } else { + // The queue is too full. Either add the request to blockedProducers, + // or send an immediate reject. + if request.shouldBlock { + dq.blockedProducers = append(dq.blockedProducers, request) + } else { + request.responseChan <- false + } + } +} + +func (dq *diskQueue) handleWriterLoopResponse(response writerLoopResponse) { + dq.writing = false + + // The writer loop response contains the number of bytes written to + // each segment that appeared in the request. Entries always appear in + // the same sequence as (the beginning of) segments.writing. + for index, bytesWritten := range response.bytesWritten { + // Update the segment with its new size. + dq.segments.writing[index].endOffset += segmentOffset(bytesWritten) + } + + // If there is more than one segment in the response, then all but the + // last have been closed and are ready to move to the reading list. + closedCount := len(response.bytesWritten) - 1 + if closedCount > 0 { + // Remove the prefix of the writing array and append to to reading. + closedSegments := dq.segments.writing[:closedCount] + dq.segments.writing = dq.segments.writing[closedCount:] + dq.segments.reading = + append(dq.segments.reading, closedSegments...) + } +} + +func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) { + dq.reading = false + + // Advance the frame / offset based on what was just completed. + dq.segments.nextReadFrameID += frameID(response.frameCount) + dq.segments.nextReadOffset += segmentOffset(response.byteCount) + + var segment *queueSegment + if len(dq.segments.reading) > 0 { + // A segment is finished if we have read all the data, or + // the read response reports an error. + // Segments in the reading list have been completely written, + // so we can rely on their endOffset field to determine their size. + segment = dq.segments.reading[0] + if dq.segments.nextReadOffset >= segment.endOffset || response.err != nil { + dq.segments.reading = dq.segments.reading[1:] + dq.segments.acking = append(dq.segments.acking, segment) + dq.segments.nextReadOffset = 0 + } + } else { + // A segment in the writing list can't be finished writing, + // so we don't check the endOffset. + segment = dq.segments.writing[0] + } + segment.framesRead = uint64(dq.segments.nextReadFrameID - segment.firstFrameID) + + // If there was an error, report it. + if response.err != nil { + dq.logger.Errorf( + "Error reading segment file %s: %v", + dq.settings.segmentPath(segment.id), response.err) + } +} + +func (dq *diskQueue) handleDeleterLoopResponse(response deleterLoopResponse) { + dq.deleting = false + newAckedSegments := []*queueSegment{} + errors := []error{} + for i, err := range response.results { + if err != nil { + // This segment had an error, so it stays in the acked list. + newAckedSegments = append(newAckedSegments, dq.segments.acked[i]) + errors = append(errors, + fmt.Errorf("Couldn't delete segment %d: %w", + dq.segments.acked[i].id, err)) + } + } + if len(dq.segments.acked) > len(response.results) { + // Preserve any new acked segments that were added during the deletion + // request. + tail := dq.segments.acked[len(response.results):] + newAckedSegments = append(newAckedSegments, tail...) + } + dq.segments.acked = newAckedSegments + if len(errors) > 0 { + dq.logger.Errorw("Deleting segment files", "errors", errors) + } +} + +func (dq *diskQueue) handleSegmentACK(ackedSegmentID segmentID) { + acking := dq.segments.acking + if len(acking) == 0 { + return + } + ackedSegmentCount := 0 + for ; ackedSegmentCount < len(acking); ackedSegmentCount++ { + if acking[ackedSegmentCount].id > ackedSegmentID { + // This segment has not been acked yet, we're done. + break + } + } + if ackedSegmentCount > 0 { + // Move fully acked segments to the acked list and remove them + // from the acking list. + dq.segments.acked = + append(dq.segments.acked, acking[:ackedSegmentCount]...) + dq.segments.acking = acking[ackedSegmentCount:] + } +} + +func (dq *diskQueue) handleShutdown() { + // Shutdown: first, we wait for any outstanding requests to complete, to + // make sure the helper loops are idle and all state is finalized, then + // we do final cleanup and write our position to disk. + + // Close the reader loop's request channel to signal an abort in case it's + // still processing a request (we don't need any more frames). + // We still wait for acknowledgement afterwards: if there is a request in + // progress, it's possible that a consumer already read and acknowledged + // some of its data, so we want the final metadata before we write our + // closing state. + close(dq.readerLoop.requestChan) + if dq.reading { + response := <-dq.readerLoop.responseChan + dq.handleReaderLoopResponse(response) + } + + // We are assured by our callers within Beats that we will not be sent a + // shutdown signal until all our producers have been finalized / + // shut down -- thus, there should be no writer requests outstanding, and + // writerLoop.requestChan should be idle. But just in case (and in + // particular to handle the case where a request is stuck retrying a fatal + // error), we signal abort by closing the request channel, and read the + // final state if there is any. + close(dq.writerLoop.requestChan) + if dq.writing { + response := <-dq.writerLoop.responseChan + dq.handleWriterLoopResponse(response) + } + + // We let the deleter loop finish its current request, but we don't send + // the abort signal yet, since we might want to do one last deletion + // after checking the final consumer ACK state. + if dq.deleting { + response := <-dq.deleterLoop.responseChan + dq.handleDeleterLoopResponse(response) + } + + // If there are any blocked producers still hoping for space to open up + // in the queue, send them the bad news. + for _, request := range dq.blockedProducers { + request.responseChan <- false + } + dq.blockedProducers = nil + + // The reader and writer loops are now shut down, and the deleter loop is + // idle. The remaining cleanup is in finalizing the read position in the + // queue (the first event that hasn't been acknowledged by consumers), and + // in deleting any older segment files that may be left. + // + // Events read by consumers have been accumulating their ACK data in + // dq.acks. During regular operation the core loop is not allowed to use + // this data, since it requires holding a mutex, but during shutdown we're + // allowed to block to acquire it. However, we still must close its done + // channel first, otherwise the lock may be held by a consumer that is + // blocked trying to send us a message we're no longer listening to... + close(dq.acks.done) + dq.acks.lock.Lock() + finalPosition := dq.acks.nextPosition + // We won't be updating the position anymore, so we can close the file. + dq.acks.positionFile.Sync() + dq.acks.positionFile.Close() + dq.acks.lock.Unlock() + + // First check for the rare and fortunate case that every single event we + // wrote to the queue was ACKed. In this case it is safe to delete + // everything up to and including the current segment. Otherwise, we only + // delete things before the current segment. + if len(dq.segments.writing) > 0 && + finalPosition.segmentID == dq.segments.writing[0].id && + finalPosition.offset >= dq.segments.writing[0].endOffset { + dq.handleSegmentACK(finalPosition.segmentID) + } else if finalPosition.segmentID > 0 { + dq.handleSegmentACK(finalPosition.segmentID - 1) + } + + // Do one last round of deletions, then shut down the deleter loop. + dq.maybeDeleteACKed() + if dq.deleting { + response := <-dq.deleterLoop.responseChan + dq.handleDeleterLoopResponse(response) + } + close(dq.deleterLoop.requestChan) +} + +// If the pendingFrames list is nonempty, and there are no outstanding +// requests to the writer loop, send the next batch of frames. +func (dq *diskQueue) maybeWritePending() { + if dq.writing || len(dq.pendingFrames) == 0 { + // Nothing to do right now + return + } + // Remove everything from pendingFrames and forward it to the writer loop. + frames := dq.pendingFrames + dq.pendingFrames = nil + + dq.writerLoop.requestChan <- writerLoopRequest{ + frames: frames, + } + dq.writing = true +} + +// Returns the active read segment, or nil if there is none. +func (segments *diskQueueSegments) readingSegment() *queueSegment { + if len(segments.reading) > 0 { + return segments.reading[0] + } + if len(segments.writing) > 0 { + return segments.writing[0] + } + return nil +} + +// If the reading list is nonempty, and there are no outstanding read +// requests, send one. +func (dq *diskQueue) maybeReadPending() { + if dq.reading { + // A read request is already pending + return + } + segment := dq.segments.readingSegment() + if segment == nil || + dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) { + // Nothing to read + return + } + if dq.segments.nextReadOffset == 0 { + // If we're reading the beginning of this segment, assign its firstFrameID. + segment.firstFrameID = dq.segments.nextReadFrameID + } + request := readerLoopRequest{ + segment: segment, + startFrameID: dq.segments.nextReadFrameID, + startOffset: dq.segments.nextReadOffset, + endOffset: segment.endOffset, + } + dq.readerLoop.requestChan <- request + dq.reading = true +} + +// If the acked list is nonempty, and there are no outstanding deletion +// requests, send one. +func (dq *diskQueue) maybeDeleteACKed() { + if !dq.deleting && len(dq.segments.acked) > 0 { + dq.deleterLoop.requestChan <- deleterLoopRequest{ + segments: dq.segments.acked} + dq.deleting = true + } +} + +// maybeUnblockProducers checks whether the queue has enough free space +// to accept any of the requests in the blockedProducers list, and if so +// accepts them in order and updates the list. +func (dq *diskQueue) maybeUnblockProducers() { + unblockedCount := 0 + for _, request := range dq.blockedProducers { + if !dq.canAcceptFrameOfSize(request.frame.sizeOnDisk()) { + // Not enough space for this frame, we're done. + break + } + // Add the frame to pendingFrames and report success. + dq.enqueueWriteFrame(request.frame) + request.responseChan <- true + unblockedCount++ + } + if unblockedCount > 0 { + dq.blockedProducers = dq.blockedProducers[unblockedCount:] + } +} + +// enqueueWriteFrame determines which segment an incoming frame should be +// written to and adds the resulting segmentedFrame to pendingFrames. +func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) { + // Start with the most recent writing segment if there is one. + var segment *queueSegment + if len(dq.segments.writing) > 0 { + segment = dq.segments.writing[len(dq.segments.writing)-1] + } + frameLen := segmentOffset(frame.sizeOnDisk()) + // If segment is nil, or the new segment exceeds its bounds, + // we need to create a new writing segment. + if segment == nil || + dq.segments.nextWriteOffset+frameLen > dq.settings.maxSegmentOffset() { + segment = &queueSegment{id: dq.segments.nextID} + dq.segments.writing = append(dq.segments.writing, segment) + dq.segments.nextID++ + dq.segments.nextWriteOffset = 0 + } + + dq.segments.nextWriteOffset += frameLen + dq.pendingFrames = append(dq.pendingFrames, segmentedFrame{ + frame: frame, + segment: segment, + }) +} + +// canAcceptFrameOfSize checks whether there is enough free space in the +// queue (subject to settings.MaxBufferSize) to accept a new frame with +// the given size. Size includes both the serialized data and the frame +// header / footer; the easy way to do this for a writeFrame is to pass +// in frame.sizeOnDisk(). +// Capacity calculations do not include requests in the blockedProducers +// list (that data is owned by its callers and we can't touch it until +// we are ready to respond). That allows this helper to be used both while +// handling producer requests and while deciding whether to unblock +// producers after free capacity increases. +// If we decide to add limits on how many events / bytes can be stored +// in pendingFrames (to avoid unbounded memory use if the input is faster +// than the disk), this is the function to modify. +func (dq *diskQueue) canAcceptFrameOfSize(frameSize uint64) bool { + if dq.settings.MaxBufferSize == 0 { + // Currently we impose no limitations if the queue size is unbounded. + return true + } + + // Compute the current queue size. We accept if there is enough capacity + // left in the queue after accounting for the existing segments and the + // pending writes that were already accepted. + pendingBytes := uint64(0) + for _, request := range dq.pendingFrames { + pendingBytes += request.frame.sizeOnDisk() + } + currentSize := pendingBytes + dq.segments.sizeOnDisk() + + return currentSize+frameSize <= dq.settings.MaxBufferSize +} diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go new file mode 100644 index 00000000000..b5f0d301d15 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -0,0 +1,94 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import "testing" + +func TestProducerWriteRequest(t *testing.T) { + dq := &diskQueue{settings: DefaultSettings()} + frame := &writeFrame{ + serialized: make([]byte, 100), + } + request := producerWriteRequest{ + frame: frame, + shouldBlock: true, + responseChan: make(chan bool, 1), + } + dq.handleProducerWriteRequest(request) + + // The request inserts 100 bytes into an empty queue, so it should succeed. + // We expect: + // - the response channel should contain the value true + // - the frame should be added to pendingFrames and assigned to + // segment 0. + success, ok := <-request.responseChan + if !ok { + t.Error("Expected a response from the producer write request.") + } + if !success { + t.Error("Expected write request to succeed") + } + + if len(dq.pendingFrames) != 1 { + t.Error("Expected 1 pending frame after a write request.") + } + if dq.pendingFrames[0].frame != frame { + t.Error("Expected pendingFrames to contain the new frame.") + } + if dq.pendingFrames[0].segment.id != 0 { + t.Error("Expected new frame to be assigned to segment 0.") + } +} + +func TestHandleWriterLoopResponse(t *testing.T) { + // Initialize the queue with two writing segments only. + dq := &diskQueue{ + settings: DefaultSettings(), + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1}, + {id: 2}, + }, + }, + } + // This response says that the writer loop wrote 200 bytes to the first + // segment and 100 bytes to the second. + dq.handleWriterLoopResponse(writerLoopResponse{ + bytesWritten: []int64{200, 100}, + }) + + // After the response is handled, we expect: + // - Each segment's endOffset should be incremented by the bytes written + // - Segment 1 should be moved to the reading list (because all but the + // last segment in a writer loop response has been closed) + // - Segment 2 should remain in the writing list + if len(dq.segments.reading) != 1 || dq.segments.reading[0].id != 1 { + t.Error("Expected segment 1 to move to the reading list") + } + if len(dq.segments.writing) != 1 || dq.segments.writing[0].id != 2 { + t.Error("Expected segment 2 to remain in the writing list") + } + if dq.segments.reading[0].endOffset != 200 { + t.Errorf("Expected segment 1 endOffset 200, got %d", + dq.segments.reading[0].endOffset) + } + if dq.segments.writing[0].endOffset != 100 { + t.Errorf("Expected segment 2 endOffset 100, got %d", + dq.segments.writing[0].endOffset) + } +} diff --git a/libbeat/publisher/queue/diskqueue/deleter_loop.go b/libbeat/publisher/queue/diskqueue/deleter_loop.go new file mode 100644 index 00000000000..4e685285948 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/deleter_loop.go @@ -0,0 +1,99 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "errors" + "os" + "time" +) + +type deleterLoop struct { + // The settings for the queue that created this loop. + settings Settings + + // When one or more segments are ready to delete, they are sent to + // requestChan. At most one deleteRequest may be outstanding at any time. + requestChan chan deleterLoopRequest + + // When a request has been completely processed, a response is sent on + // responseChan. If at least one deletion was successful, the response + // is sent immediately. Otherwise, the deleter loop delays for + // queueSettings.RetryWriteInterval before returning, so timed retries + // don't have to be handled by the core loop. + responseChan chan deleterLoopResponse +} + +type deleterLoopRequest struct { + segments []*queueSegment +} + +type deleterLoopResponse struct { + results []error +} + +func newDeleterLoop(settings Settings) *deleterLoop { + return &deleterLoop{ + settings: settings, + + requestChan: make(chan deleterLoopRequest, 1), + responseChan: make(chan deleterLoopResponse), + } +} + +func (dl *deleterLoop) run() { + for { + request, ok := <-dl.requestChan + if !ok { + // The channel has been closed, time to shut down. + return + } + results := []error{} + deletedCount := 0 + for _, segment := range request.segments { + path := dl.settings.segmentPath(segment.id) + err := os.Remove(path) + // We ignore errors caused by the file not existing: this shouldn't + // happen, but it is still safe to report it as successfully removed. + if err == nil || errors.Is(err, os.ErrNotExist) { + deletedCount++ + results = append(results, nil) + } else { + results = append(results, err) + } + } + if len(request.segments) > 0 && deletedCount == 0 { + // If we were asked to delete segments but could not delete + // _any_ of them, we haven't made progress. Returning an error + // will log the issue and retry, but in this situation we + // want to delay before retrying. The core loop itself can't + // delay (it can never sleep or block), so we handle the + // delay here, by waiting before sending the result. + // The delay can be interrupted if the request channel is closed, + // indicating queue shutdown. + select { + // TODO: make the retry interval configurable. + case <-time.After(time.Second): + case <-dl.requestChan: + } + } + dl.responseChan <- deleterLoopResponse{ + results: results, + } + } +} diff --git a/libbeat/publisher/queue/diskqueue/frames.go b/libbeat/publisher/queue/diskqueue/frames.go new file mode 100644 index 00000000000..02571a65ce9 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/frames.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import "github.com/elastic/beats/v7/libbeat/publisher" + +// Every data frame read from the queue is assigned a unique sequential +// integer, which is used to keep track of which frames have been +// acknowledged. +// This id is not stable between restarts; the value 0 is always assigned +// to the oldest remaining frame on startup. +type frameID uint64 + +// A data frame created through the producer API and waiting to be +// written to disk. +type writeFrame struct { + // The event, serialized for writing to disk and wrapped in a frame + // header / footer. + serialized []byte + + // The producer that created this frame. This is included in the + // frame structure itself because we may need the producer and / or + // its config at any time up until it has been completely written: + // - While the core loop is tracking frames to send to the writer, + // it may receive a Cancel request, which requires us to know + // the producer / config each frame came from. + // - After the writer loop has finished writing the frame to disk, + // it needs to call the ACK function specified in ProducerConfig. + producer *diskQueueProducer +} + +// A frame that has been read from disk and is waiting to be read / +// acknowledged through the consumer API. +type readFrame struct { + // The segment containing this frame. + segment *queueSegment + + // The id of this frame. + id frameID + + // The event decoded from the data frame. + event publisher.Event + + // How much space this frame occupied on disk (before deserialization), + // including the frame header / footer. + bytesOnDisk uint64 +} + +// Each data frame has a 32-bit length in the header, and a 32-bit checksum +// and a duplicate 32-bit length in the footer. +const frameHeaderSize = 4 +const frameFooterSize = 8 +const frameMetadataSize = frameHeaderSize + frameFooterSize + +func (frame writeFrame) sizeOnDisk() uint64 { + return uint64(len(frame.serialized) + frameMetadataSize) +} diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go new file mode 100644 index 00000000000..f4ff4ef2706 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/producer.go @@ -0,0 +1,109 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +type diskQueueProducer struct { + // The disk queue that created this producer. + queue *diskQueue + + // The configuration this producer was created with. + config queue.ProducerConfig + + encoder *eventEncoder + + // When a producer is cancelled, cancelled is set to true and the done + // channel is closed. (We could get by with just a done channel, but we + // need to make sure that calling Cancel repeatedly doesn't close an + // already-closed channel, which would panic.) + cancelled bool + done chan struct{} +} + +// A request sent from a producer to the core loop to add a frame to the queue. +type producerWriteRequest struct { + frame *writeFrame + shouldBlock bool + responseChan chan bool +} + +// +// diskQueueProducer implementation of the queue.Producer interface +// + +func (producer *diskQueueProducer) Publish(event publisher.Event) bool { + return producer.publish(event, true) +} + +func (producer *diskQueueProducer) TryPublish(event publisher.Event) bool { + return producer.publish(event, false) +} + +func (producer *diskQueueProducer) publish( + event publisher.Event, shouldBlock bool, +) bool { + if producer.cancelled { + return false + } + serialized, err := producer.encoder.encode(&event) + if err != nil { + producer.queue.logger.Errorf( + "Couldn't serialize incoming event: %v", err) + return false + } + request := producerWriteRequest{ + frame: &writeFrame{ + serialized: serialized, + producer: producer, + }, + shouldBlock: shouldBlock, + // This response channel will be used by the core loop, so it must have + // buffer size 1 to guarantee that the core loop will not need to block. + responseChan: make(chan bool, 1), + } + + select { + case producer.queue.producerWriteRequestChan <- request: + // The request has been sent, and we are now guaranteed to get a result on + // the response channel, so we must read from it immediately to avoid + // blocking the core loop. + response := <-request.responseChan + return response + case <-producer.queue.done: + return false + case <-producer.done: + return false + } +} + +func (producer *diskQueueProducer) Cancel() int { + if producer.cancelled { + return 0 + } + producer.cancelled = true + close(producer.done) + + // TODO (possibly?): message the core loop to remove any pending events that + // were sent through this producer. If we do, return the number of cancelled + // events here instead of zero. + return 0 +} diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go new file mode 100644 index 00000000000..5f756996e5f --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -0,0 +1,249 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "errors" + "fmt" + "os" + "sync" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/queue" +) + +// diskQueue is the internal type representing a disk-based implementation +// of queue.Queue. +type diskQueue struct { + logger *logp.Logger + settings Settings + + // Metadata related to the segment files. + segments diskQueueSegments + + // Metadata related to consumer acks / positions of the oldest remaining + // frame. + acks *diskQueueACKs + + // The queue's helper loops, each of which is run in its own goroutine. + readerLoop *readerLoop + writerLoop *writerLoop + deleterLoop *deleterLoop + + // Wait group for shutdown of the goroutines associated with this queue: + // reader loop, writer loop, deleter loop, and core loop (diskQueue.run()). + waitGroup sync.WaitGroup + + // writing is true if the writer loop is processing a request, false + // otherwise. + writing bool + + // reading is true if the reader loop is processing a request, false + // otherwise. + reading bool + + // deleting is true if the deleter loop is processing a request, false + // otherwise. + deleting bool + + // The API channel used by diskQueueProducer to write events. + producerWriteRequestChan chan producerWriteRequest + + // pendingFrames is a list of all incoming data frames that have been + // accepted by the queue and are waiting to be sent to the writer loop. + // Segment ids in this list always appear in sorted order, even between + // requests (that is, a frame added to this list always has segment id + // at least as high as every previous frame that has ever been added). + pendingFrames []segmentedFrame + + // blockedProducers is a list of all producer write requests that are + // waiting for free space in the queue. + blockedProducers []producerWriteRequest + + // The channel to signal our goroutines to shut down. + done chan struct{} +} + +func init() { + queue.RegisterQueueType( + "disk", + queueFactory, + feature.MakeDetails( + "Disk queue", + "Buffer events on disk before sending to the output.", + feature.Beta)) +} + +// queueFactory matches the queue.Factory interface, and is used to add the +// disk queue to the registry. +func queueFactory( + ackListener queue.ACKListener, logger *logp.Logger, cfg *common.Config, +) (queue.Queue, error) { + settings, err := SettingsForUserConfig(cfg) + if err != nil { + return nil, fmt.Errorf("disk queue couldn't load user config: %w", err) + } + settings.WriteToDiskListener = ackListener + return NewQueue(logger, settings) +} + +// NewQueue returns a disk-based queue configured with the given logger +// and settings, creating it if it doesn't exist. +func NewQueue(logger *logp.Logger, settings Settings) (queue.Queue, error) { + logger = logger.Named("diskqueue") + logger.Debugf( + "Initializing disk queue at path %v", settings.directoryPath()) + + if settings.MaxBufferSize > 0 && + settings.MaxBufferSize < settings.MaxSegmentSize*2 { + return nil, fmt.Errorf( + "disk queue buffer size (%v) must be at least "+ + "twice the segment size (%v)", + settings.MaxBufferSize, settings.MaxSegmentSize) + } + + // Create the given directory path if it doesn't exist. + err := os.MkdirAll(settings.directoryPath(), os.ModePerm) + if err != nil { + return nil, fmt.Errorf("couldn't create disk queue directory: %w", err) + } + + // Load the previous queue position, if any. + nextReadPosition, err := queuePositionFromPath(settings.stateFilePath()) + if err != nil && !errors.Is(err, os.ErrNotExist) { + // Errors reading / writing the position are non-fatal -- we just log a + // warning and fall back on the oldest existing segment, if any. + logger.Warnf("Couldn't load most recent queue position: %v", err) + } + positionFile, err := os.OpenFile( + settings.stateFilePath(), os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + // This is not the _worst_ error: we could try operating even without a + // position file. But it indicates a problem with the queue permissions on + // disk, which keeps us from tracking our position within the segment files + // and could also prevent us from creating new ones, so we treat this as a + // fatal error on startup rather than quietly providing degraded + // performance. + return nil, fmt.Errorf("couldn't write to state file: %v", err) + } + + // Index any existing data segments to be placed in segments.reading. + initialSegments, err := scanExistingSegments(settings.directoryPath()) + if err != nil { + return nil, err + } + var nextSegmentID segmentID + if len(initialSegments) > 0 { + // Initialize nextSegmentID to the first ID after the existing segments. + lastID := initialSegments[len(initialSegments)-1].id + nextSegmentID = lastID + 1 + } + + // If any of the initial segments are older than the current queue + // position, move them directly to the acked list where they can be + // deleted. + ackedSegments := []*queueSegment{} + readSegmentID := nextReadPosition.segmentID + for len(initialSegments) > 0 && initialSegments[0].id < readSegmentID { + ackedSegments = append(ackedSegments, initialSegments[0]) + initialSegments = initialSegments[1:] + } + + // If the queue position is older than all existing segments, advance + // it to the beginning of the first one. + if len(initialSegments) > 0 && readSegmentID < initialSegments[0].id { + nextReadPosition = queuePosition{segmentID: initialSegments[0].id} + } + + queue := &diskQueue{ + logger: logger, + settings: settings, + + segments: diskQueueSegments{ + reading: initialSegments, + nextID: nextSegmentID, + nextReadOffset: nextReadPosition.offset, + }, + + acks: newDiskQueueACKs(logger, nextReadPosition, positionFile), + + readerLoop: newReaderLoop(settings), + writerLoop: newWriterLoop(logger, settings), + deleterLoop: newDeleterLoop(settings), + + producerWriteRequestChan: make(chan producerWriteRequest), + + done: make(chan struct{}), + } + + // We wait for four goroutines on shutdown: core loop, reader loop, + // writer loop, deleter loop. + queue.waitGroup.Add(4) + + // Start the goroutines and return the queue! + go func() { + queue.readerLoop.run() + queue.waitGroup.Done() + }() + go func() { + queue.writerLoop.run() + queue.waitGroup.Done() + }() + go func() { + queue.deleterLoop.run() + queue.waitGroup.Done() + }() + go func() { + queue.run() + queue.waitGroup.Done() + }() + + return queue, nil +} + +// +// diskQueue implementation of the queue.Queue interface +// + +func (dq *diskQueue) Close() error { + // Closing the done channel signals to the core loop that it should + // shut down the other helper goroutines and wrap everything up. + close(dq.done) + dq.waitGroup.Wait() + + return nil +} + +func (dq *diskQueue) BufferConfig() queue.BufferConfig { + return queue.BufferConfig{MaxEvents: 0} +} + +func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { + return &diskQueueProducer{ + queue: dq, + config: cfg, + encoder: newEventEncoder(), + done: make(chan struct{}), + } +} + +func (dq *diskQueue) Consumer() queue.Consumer { + return &diskQueueConsumer{queue: dq} +} diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go new file mode 100644 index 00000000000..dc2bb95777f --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -0,0 +1,247 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "encoding/binary" + "fmt" + "os" +) + +type readerLoopRequest struct { + segment *queueSegment + startOffset segmentOffset + startFrameID frameID + endOffset segmentOffset +} + +type readerLoopResponse struct { + // The number of frames successfully read from the requested segment file. + frameCount uint64 + + // The number of bytes successfully read from the requested segment file. + byteCount uint64 + + // If there was an error in the segment file (i.e. inconsistent data), the + // err field is set. + err error +} + +type readerLoop struct { + // The settings for the queue that created this loop. + settings Settings + + // When there is a block available for reading, it will be sent to + // requestChan. When the reader loop has finished processing it, it + // sends the result to finishedReading. If there is more than one block + // available for reading, the core loop will wait until it gets a + // finishedReadingMessage before it + requestChan chan readerLoopRequest + responseChan chan readerLoopResponse + + // Frames that have been read from disk are sent to this channel. + // Unlike most of the queue's API channels, this one is buffered to allow + // the reader to read ahead and cache pending frames before a consumer + // explicitly requests them. + output chan *readFrame + + // The helper object to deserialize binary blobs from the queue into + // publisher.Event objects that can be returned in a readFrame. + decoder *eventDecoder +} + +func newReaderLoop(settings Settings) *readerLoop { + return &readerLoop{ + settings: settings, + + requestChan: make(chan readerLoopRequest, 1), + responseChan: make(chan readerLoopResponse), + output: make(chan *readFrame, settings.ReadAheadLimit), + decoder: newEventDecoder(), + } +} + +func (rl *readerLoop) run() { + for { + request, ok := <-rl.requestChan + if !ok { + // The channel is closed, we are shutting down. + close(rl.output) + return + } + response := rl.processRequest(request) + rl.responseChan <- response + } +} + +func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopResponse { + frameCount := uint64(0) + byteCount := uint64(0) + nextFrameID := request.startFrameID + + // Open the file and seek to the starting position. + handle, err := request.segment.getReader(rl.settings) + if err != nil { + return readerLoopResponse{err: err} + } + defer handle.Close() + _, err = handle.Seek(segmentHeaderSize+int64(request.startOffset), 0) + if err != nil { + return readerLoopResponse{err: err} + } + + targetLength := uint64(request.endOffset - request.startOffset) + for { + remainingLength := targetLength - byteCount + + // Try to read the next frame, clipping to the given bound. + // If the next frame extends past this boundary, nextFrame will return + // an error. + frame, err := rl.nextFrame(handle, remainingLength) + if frame != nil { + // Add the segment / frame ID, which nextFrame leaves blank. + frame.segment = request.segment + frame.id = nextFrameID + nextFrameID++ + // We've read the frame, try sending it to the output channel. + select { + case rl.output <- frame: + // Successfully sent! Increment the total for this request. + frameCount++ + byteCount += frame.bytesOnDisk + case <-rl.requestChan: + // Since we haven't sent a finishedReading message yet, we can only + // reach this case when the nextReadBlock channel is closed, indicating + // queue shutdown. In this case we immediately return. + return readerLoopResponse{ + frameCount: frameCount, + byteCount: byteCount, + err: nil, + } + } + } + + // We are done with this request if: + // - there was an error reading the frame, + // - there are no more frames to read, or + // - we have reached the end of the requested region + if err != nil || frame == nil || byteCount >= targetLength { + return readerLoopResponse{ + frameCount: frameCount, + byteCount: byteCount, + err: err, + } + } + + // If the output channel's buffer is not full, the previous select + // might not recognize when the queue is being closed, so check that + // again separately before we move on to the next data frame. + select { + case <-rl.requestChan: + return readerLoopResponse{ + frameCount: frameCount, + byteCount: byteCount, + err: nil, + } + default: + } + } +} + +// nextFrame reads and decodes one frame from the given file handle, as long +// it does not exceed the given length bound. The returned frame leaves the +// segment and frame IDs unset. +func (rl *readerLoop) nextFrame( + handle *os.File, maxLength uint64, +) (*readFrame, error) { + // Ensure we are allowed to read the frame header. + if maxLength < frameHeaderSize { + return nil, fmt.Errorf( + "Can't read next frame: remaining length %d is too low", maxLength) + } + // Wrap the handle to retry non-fatal errors and always return the full + // requested data length if possible. + reader := autoRetryReader{handle} + var frameLength uint32 + err := binary.Read(reader, binary.LittleEndian, &frameLength) + if err != nil { + return nil, fmt.Errorf("Couldn't read data frame header: %w", err) + } + + // If the frame extends past the area we were told to read, return an error. + // This should never happen unless the segment file is corrupted. + if maxLength < uint64(frameLength) { + return nil, fmt.Errorf( + "Can't read next frame: frame size is %d but remaining data is only %d", + frameLength, maxLength) + } + if frameLength <= frameMetadataSize { + // Valid enqueued data must have positive length + return nil, fmt.Errorf( + "Data frame with no data (length %d)", frameLength) + } + + // Read the actual frame data + dataLength := frameLength - frameMetadataSize + bytes := rl.decoder.Buffer(int(dataLength)) + _, err = reader.Read(bytes) + if err != nil { + return nil, fmt.Errorf("Couldn't read data frame content: %w", err) + } + + // Read the footer (checksum + duplicate length) + var checksum uint32 + err = binary.Read(reader, binary.LittleEndian, &checksum) + if err != nil { + return nil, fmt.Errorf("Couldn't read data frame checksum: %w", err) + } + expected := computeChecksum(bytes) + if checksum != expected { + return nil, fmt.Errorf( + "Data frame checksum mismatch (%x != %x)", checksum, expected) + } + + var duplicateLength uint32 + err = binary.Read(reader, binary.LittleEndian, &duplicateLength) + if err != nil { + return nil, fmt.Errorf("Couldn't read data frame footer: %w", err) + } + if duplicateLength != frameLength { + return nil, fmt.Errorf( + "Inconsistent data frame length (%d vs %d)", + frameLength, duplicateLength) + } + + event, err := rl.decoder.Decode() + if err != nil { + // Unlike errors in the segment or frame metadata, this is entirely + // a problem in the event [de]serialization which may be isolated (i.e. + // may not indicate data corruption in the segment). + // TODO: Rather than pass this error back to the read request, which + // discards the rest of the segment, we should just log the error and + // advance to the next frame, which is likely still valid. + return nil, fmt.Errorf("Couldn't decode data frame: %w", err) + } + + frame := &readFrame{ + event: event, + bytesOnDisk: uint64(frameLength), + } + + return frame, nil +} diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go new file mode 100644 index 00000000000..5ce0dc49962 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -0,0 +1,254 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "encoding/binary" + "fmt" + "io/ioutil" + "os" + "sort" + "strconv" + "strings" +) + +// diskQueueSegments encapsulates segment-related queue metadata. +type diskQueueSegments struct { + + // A list of the segments that have not yet been completely written, sorted + // by increasing segment ID. When the first entry has been completely + // written, it is removed from this list and appended to reading. + // + // If the reading list is empty, the queue may read from a segment that is + // still being written, but it will always be writing[0], since later + // entries do not yet exist on disk. + writing []*queueSegment + + // A list of the segments that have been completely written but have + // not yet been completely read, sorted by increasing segment ID. When the + // first entry has been completely read, it is removed from this list and + // appended to acking. + reading []*queueSegment + + // A list of the segments that have been completely read but have not yet + // been completely acknowledged, sorted by increasing segment ID. When the + // first entry has been completely acknowledged, it is removed from this + // list and appended to acked. + acking []*queueSegment + + // A list of the segments that have been completely read and acknowledged + // and are ready to be deleted. When a segment is successfully deleted, it + // is removed from this list and discarded. + acked []*queueSegment + + // The next sequential unused segment ID. This is what will be assigned + // to the next queueSegment we create. + nextID segmentID + + // nextWriteOffset is the segment offset at which the next new frame + // should be written. This offset always applies to the last entry of + // writing[]. This is distinct from the endOffset field within a segment: + // endOffset tracks how much data _has_ been written to a segment, while + // nextWriteOffset also includes everything that is _scheduled_ to be + // written. + nextWriteOffset segmentOffset + + // nextReadFrameID is the first frame ID in the current or pending + // read request. + nextReadFrameID frameID + + // nextReadOffset is the segment offset corresponding to the frame + // nextReadFrameID. This offset always applies to the first reading + // segment: either reading[0], or writing[0] if reading is empty. + nextReadOffset segmentOffset +} + +// segmentID is a unique persistent integer id assigned to each created +// segment in ascending order. +type segmentID uint64 + +// segmentOffset is a byte index into the segment's data region. +// An offset of 0 means the first byte after the segment file header. +type segmentOffset uint64 + +// The metadata for a single segment file. +type queueSegment struct { + // A segment id is globally unique within its originating queue. + id segmentID + + // The byte offset of the end of the segment's data region. This is + // updated when the segment is written to, and should always correspond + // to the end of a complete data frame. The total size of a segment file + // on disk is segmentHeaderSize + segment.endOffset. + endOffset segmentOffset + + // The ID of the first frame that was / will be read from this segment. + // This field is only valid after a read request has been sent for + // this segment. (Currently it is only used to handle consumer ACKs, + // which can only happen after reading has begun on the segment.) + firstFrameID frameID + + // The number of frames read from this segment during this session. This + // does not necessarily equal the number of frames in the segment, even + // after reading is complete, since the segment may have been partially + // read during a previous session. + // + // Used to count how many frames still need to be acknowledged by consumers. + framesRead uint64 +} + +type segmentHeader struct { + version uint32 +} + +// Segment headers are currently just a 32-bit version. +const segmentHeaderSize = 4 + +// Sort order: we store loaded segments in ascending order by their id. +type bySegmentID []*queueSegment + +func (s bySegmentID) Len() int { return len(s) } +func (s bySegmentID) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s bySegmentID) Less(i, j int) bool { return s[i].id < s[j].id } + +// Scan the given path for segment files, and return them in a list +// ordered by segment id. +func scanExistingSegments(path string) ([]*queueSegment, error) { + files, err := ioutil.ReadDir(path) + if err != nil { + return nil, fmt.Errorf("Couldn't read queue directory '%s': %w", path, err) + } + + segments := []*queueSegment{} + for _, file := range files { + if file.Size() <= segmentHeaderSize { + // Ignore segments that don't have at least some data beyond the + // header (this will always be true of segments we write unless there + // is an error). + continue + } + components := strings.Split(file.Name(), ".") + if len(components) == 2 && strings.ToLower(components[1]) == "seg" { + // Parse the id as base-10 64-bit unsigned int. We ignore file names that + // don't match the "[uint64].seg" pattern. + if id, err := strconv.ParseUint(components[0], 10, 64); err == nil { + segments = append(segments, + &queueSegment{ + id: segmentID(id), + endOffset: segmentOffset(file.Size() - segmentHeaderSize), + }) + } + } + } + sort.Sort(bySegmentID(segments)) + return segments, nil +} + +func (segment *queueSegment) sizeOnDisk() uint64 { + return uint64(segment.endOffset) + segmentHeaderSize +} + +// Should only be called from the reader loop. +func (segment *queueSegment) getReader( + queueSettings Settings, +) (*os.File, error) { + path := queueSettings.segmentPath(segment.id) + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf( + "Couldn't open segment %d: %w", segment.id, err) + } + // Right now there is only one valid header (indicating schema version + // zero) so we don't need the value itself. + _, err = readSegmentHeader(file) + if err != nil { + file.Close() + return nil, fmt.Errorf("Couldn't read segment header: %w", err) + } + + return file, nil +} + +// Should only be called from the writer loop. +func (segment *queueSegment) getWriter( + queueSettings Settings, +) (*os.File, error) { + path := queueSettings.segmentPath(segment.id) + file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) + if err != nil { + return nil, err + } + header := &segmentHeader{version: 0} + err = writeSegmentHeader(file, header) + if err != nil { + return nil, fmt.Errorf("Couldn't write segment header: %w", err) + } + + return file, nil +} + +// getWriterWithRetry tries to create a file handle for writing via +// queueSegment.getWriter. On error, it retries as long as the given +// retry callback returns true. This is used for timed retries when +// creating a queue segment from the writer loop. +func (segment *queueSegment) getWriterWithRetry( + queueSettings Settings, retry func(error) bool, +) (*os.File, error) { + file, err := segment.getWriter(queueSettings) + for err != nil && retry(err) { + // Try again + file, err = segment.getWriter(queueSettings) + } + return file, err +} + +func readSegmentHeader(in *os.File) (*segmentHeader, error) { + header := &segmentHeader{} + err := binary.Read(in, binary.LittleEndian, &header.version) + if err != nil { + return nil, err + } + if header.version != 0 { + return nil, fmt.Errorf("Unrecognized schema version %d", header.version) + } + return header, nil +} + +func writeSegmentHeader(out *os.File, header *segmentHeader) error { + err := binary.Write(out, binary.LittleEndian, header.version) + return err +} + +// The number of bytes occupied by all the queue's segment files. This +// should only be called from the core loop. +func (segments *diskQueueSegments) sizeOnDisk() uint64 { + total := uint64(0) + for _, segment := range segments.writing { + total += segment.sizeOnDisk() + } + for _, segment := range segments.reading { + total += segment.sizeOnDisk() + } + for _, segment := range segments.acking { + total += segment.sizeOnDisk() + } + for _, segment := range segments.acked { + total += segment.sizeOnDisk() + } + return total +} diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go new file mode 100644 index 00000000000..9db8e7b1bd9 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -0,0 +1,154 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Encoding / decoding routines adapted from +// libbeat/publisher/queue/spool/codec.go. + +package diskqueue + +import ( + "bytes" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/outputs/codec" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/go-structform/gotype" + "github.com/elastic/go-structform/json" +) + +type eventEncoder struct { + buf bytes.Buffer + folder *gotype.Iterator +} + +type eventDecoder struct { + buf []byte + + parser *json.Parser + unfolder *gotype.Unfolder +} + +type entry struct { + Timestamp int64 + Flags uint8 + Meta common.MapStr + Fields common.MapStr +} + +const ( + // If + flagGuaranteed uint8 = 1 << 0 +) + +func newEventEncoder() *eventEncoder { + e := &eventEncoder{} + e.reset() + return e +} + +func (e *eventEncoder) reset() { + e.folder = nil + + visitor := json.NewVisitor(&e.buf) + // This can't return an error: NewIterator is deterministic based on its + // input, and doesn't return an error when called with valid options. In + // this case the options are hard-coded to fixed values, so they are + // guaranteed to be valid and we can safely proceed. + folder, _ := gotype.NewIterator(visitor, + gotype.Folders( + codec.MakeTimestampEncoder(), + codec.MakeBCTimestampEncoder(), + ), + ) + + e.folder = folder +} + +func (e *eventEncoder) encode(event *publisher.Event) ([]byte, error) { + e.buf.Reset() + + err := e.folder.Fold(entry{ + Timestamp: event.Content.Timestamp.UTC().UnixNano(), + Flags: uint8(event.Flags), + Meta: event.Content.Meta, + Fields: event.Content.Fields, + }) + if err != nil { + e.reset() + return nil, err + } + + // Copy the encoded bytes to a new array owned by the caller. + bytes := e.buf.Bytes() + result := make([]byte, len(bytes)) + copy(result, bytes) + + return result, nil +} + +func newEventDecoder() *eventDecoder { + d := &eventDecoder{} + d.reset() + return d +} + +func (d *eventDecoder) reset() { + // When called on nil, NewUnfolder deterministically returns a nil error, + // so it's safe to ignore the error result. + unfolder, _ := gotype.NewUnfolder(nil) + + d.unfolder = unfolder + d.parser = json.NewParser(unfolder) +} + +// Buffer prepares the read buffer to hold the next event of n bytes. +func (d *eventDecoder) Buffer(n int) []byte { + if cap(d.buf) > n { + d.buf = d.buf[:n] + } else { + d.buf = make([]byte, n) + } + return d.buf +} + +func (d *eventDecoder) Decode() (publisher.Event, error) { + var ( + to entry + err error + ) + + d.unfolder.SetTarget(&to) + defer d.unfolder.Reset() + + err = d.parser.Parse(d.buf) + + if err != nil { + d.reset() // reset parser just in case + return publisher.Event{}, err + } + + return publisher.Event{ + Flags: publisher.EventFlags(to.Flags), + Content: beat.Event{ + Timestamp: time.Unix(0, to.Timestamp), + Fields: to.Fields, + Meta: to.Meta, + }, + }, nil +} diff --git a/libbeat/publisher/queue/diskqueue/state_file.go b/libbeat/publisher/queue/diskqueue/state_file.go new file mode 100644 index 00000000000..2ff14e3e5e2 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/state_file.go @@ -0,0 +1,93 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "bufio" + "encoding/binary" + "fmt" + "os" +) + +// Given an open file handle to the queue state, decode the current position +// and return the result if successful, otherwise an error. +func queuePositionFromHandle( + file *os.File, +) (queuePosition, error) { + _, err := file.Seek(0, 0) + if err != nil { + return queuePosition{}, err + } + + reader := bufio.NewReader(file) + var version uint32 + err = binary.Read(reader, binary.LittleEndian, &version) + if err != nil { + return queuePosition{}, err + } + if version != 0 { + return queuePosition{}, + fmt.Errorf("Unsupported queue metadata version (%d)", version) + } + + position := queuePosition{} + err = binary.Read(reader, binary.LittleEndian, &position.segmentID) + if err != nil { + return queuePosition{}, err + } + + err = binary.Read( + reader, binary.LittleEndian, &position.offset) + if err != nil { + return queuePosition{}, err + } + + return position, nil +} + +func queuePositionFromPath(path string) (queuePosition, error) { + // Try to open an existing state file. + file, err := os.OpenFile(path, os.O_RDONLY, 0600) + if err != nil { + return queuePosition{}, err + } + defer file.Close() + return queuePositionFromHandle(file) +} + +// Given the queue position, encode and write it to the given file handle. +// Returns nil if successful, otherwise an error. +func writeQueuePositionToHandle( + file *os.File, + position queuePosition, +) error { + _, err := file.Seek(0, 0) + if err != nil { + return err + } + + // Want to write: version (0), segment id, segment offset. + elems := []interface{}{uint32(0), position.segmentID, position.offset} + for _, elem := range elems { + err = binary.Write(file, binary.LittleEndian, &elem) + if err != nil { + return err + } + } + return nil +} diff --git a/libbeat/publisher/queue/diskqueue/util.go b/libbeat/publisher/queue/diskqueue/util.go new file mode 100644 index 00000000000..60c529a9992 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/util.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "errors" + "io" + "syscall" +) + +// A wrapper for an io.Reader that tries to read the full number of bytes +// requested, retrying on EAGAIN and EINTR, and returns an error if +// and only if the number of bytes read is less than requested. +// This is similar to io.ReadFull but with retrying. +type autoRetryReader struct { + wrapped io.Reader +} + +func (r autoRetryReader) Read(p []byte) (int, error) { + bytesRead := 0 + reader := r.wrapped + n, err := reader.Read(p) + for n < len(p) { + if err != nil && !readErrorIsRetriable(err) { + return bytesRead + n, err + } + // If there is an error, it is retriable, so advance p and try again. + bytesRead += n + p = p[n:] + n, err = reader.Read(p) + } + return bytesRead + n, nil +} + +func readErrorIsRetriable(err error) bool { + return errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.EAGAIN) +} + +// writeErrorIsRetriable returns true if the given IO error can be +// immediately retried. +func writeErrorIsRetriable(err error) bool { + return errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.EAGAIN) +} + +// callbackRetryWriter is an io.Writer that wraps another writer and enables +// write-with-retry. When a Write encounters an error, it is passed to the +// retry callback. If the callback returns true, the the writer retries +// any unwritten portion of the input, otherwise it passes the error back +// to the caller. +// This helper is specifically for working with the writer loop, which needs +// to be able to retry forever at configurable intervals, but also cancel +// immediately if the queue is closed. +// This writer is unbuffered. In particular, it is safe to modify the +// "wrapped" field in-place as long as it isn't captured by the callback. +type callbackRetryWriter struct { + wrapped io.Writer + retry func(error) bool +} + +func (w callbackRetryWriter) Write(p []byte) (int, error) { + bytesWritten := 0 + writer := w.wrapped + n, err := writer.Write(p) + for n < len(p) { + if err != nil && !w.retry(err) { + return bytesWritten + n, err + } + // Advance p and try again. + bytesWritten += n + p = p[n:] + n, err = writer.Write(p) + } + return bytesWritten + n, nil +} diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go new file mode 100644 index 00000000000..b42e4573cab --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/writer_loop.go @@ -0,0 +1,239 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "encoding/binary" + "os" + "time" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// A segmentedFrame is a data frame waiting to be written to disk along with +// the segment it has been assigned to. +type segmentedFrame struct { + // The frame to be written to disk. + frame *writeFrame + + // The segment to which this frame should be written. + segment *queueSegment +} + +// A writer loop request contains a list of writeFrames with the +// segment each should be written to. +// +// Input invariant (segment ids are sorted): If a frame f is included in a +// writerLoopRequest, then every subsequent frame in this and future +// requests must have segment id at least f.segment.id. +// +// That is: we must write all frames for segment 0 before we start writing +// to frame 1, etc. This assumption allows all file operations to happen +// safely in the writer loop without any knowledge of the broader queue state. +type writerLoopRequest struct { + frames []segmentedFrame +} + +// A writerLoopResponse reports the number of bytes written to each +// segment in the request. There is guaranteed to be one entry for each +// segment that appeared in the request, in the same order. If there is +// more than one entry, then all but the last segment have been closed. +type writerLoopResponse struct { + bytesWritten []int64 +} + +type writerLoop struct { + // The settings for the queue that created this loop. + settings Settings + + // The logger for the writer loop, assigned when the queue creates it. + logger *logp.Logger + + // The writer loop listens on requestChan for frames to write, and + // writes them to disk immediately (all queue capacity checking etc. is + // done by the core loop before sending it to the writer). + // When this channel is closed, any in-progress writes are aborted and + // the run loop terminates. + requestChan chan writerLoopRequest + + // The writer loop sends to responseChan when it has finished handling a + // request, to signal the core loop that it is ready for the next one. + responseChan chan writerLoopResponse + + // The most recent segment that has been written to, if there is one. + // This segment + currentSegment *queueSegment + + // The file handle corresponding to currentSegment. When currentSegment + // changes, this handle is closed and a new one is created. + outputFile *os.File +} + +func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop { + return &writerLoop{ + logger: logger, + settings: settings, + + requestChan: make(chan writerLoopRequest, 1), + responseChan: make(chan writerLoopResponse), + } +} + +func (wl *writerLoop) run() { + for { + block, ok := <-wl.requestChan + if !ok { + // The request channel is closed, we are done + return + } + bytesWritten := wl.processRequest(block) + wl.responseChan <- writerLoopResponse{bytesWritten: bytesWritten} + } +} + +// processRequest writes the frames in the given request to disk and returns +// the number of bytes written to each segment, in the order they were +// encountered. +func (wl *writerLoop) processRequest(request writerLoopRequest) []int64 { + // retryWriter wraps the file handle with timed retries. + // retryWriter.Write is guaranteed to return only if the write + // completely succeeded or the queue is being closed. + retryWriter := callbackRetryWriter{retry: wl.retryCallback} + + // We keep track of how many frames are written during this request, + // and send the associated ACKs to the queue / producer listeners + // in a batch at the end (since each ACK call can involve a round-trip + // to the registry). + totalACKCount := 0 + producerACKCounts := make(map[*diskQueueProducer]int) + + var bytesWritten []int64 // Bytes written to all segments. + curBytesWritten := int64(0) // Bytes written to the current segment. +outerLoop: + for _, frameRequest := range request.frames { + // If the new segment doesn't match the last one, we need to open a new + // file handle and possibly clean up the old one. + if wl.currentSegment != frameRequest.segment { + wl.logger.Debugf( + "Creating new segment file with id %v\n", frameRequest.segment.id) + if wl.outputFile != nil { + // Try to sync to disk, then close the file. + wl.outputFile.Sync() + wl.outputFile.Close() + wl.outputFile = nil + // We are done with this segment, add the byte count to the list and + // reset the current counter. + bytesWritten = append(bytesWritten, curBytesWritten) + curBytesWritten = 0 + } + wl.currentSegment = frameRequest.segment + file, err := wl.currentSegment.getWriterWithRetry( + wl.settings, wl.retryCallback) + if err != nil { + // This can only happen if the queue is being closed; abort. + break + } + wl.outputFile = file + } + // Make sure our writer points to the current file handle. + retryWriter.wrapped = wl.outputFile + + // We have the data and a file to write it to. We are now committed + // to writing this block unless the queue is closed in the meantime. + frameSize := uint32(frameRequest.frame.sizeOnDisk()) + + // The Write calls below all pass through retryWriter, so they can + // only return an error if the write should be aborted. Thus, all we + // need to do when we see an error is break out of the request loop. + err := binary.Write(retryWriter, binary.LittleEndian, frameSize) + if err != nil { + break + } + _, err = retryWriter.Write(frameRequest.frame.serialized) + if err != nil { + break + } + // Compute / write the frame's checksum + checksum := computeChecksum(frameRequest.frame.serialized) + err = binary.Write(wl.outputFile, binary.LittleEndian, checksum) + if err != nil { + break + } + // Write the frame footer's (duplicate) length + err = binary.Write(wl.outputFile, binary.LittleEndian, frameSize) + if err != nil { + break + } + // Update the byte count as the last step: that way if we abort while + // a frame is partially written, we only report up to the last + // complete frame. (This almost never matters, but it allows for + // more controlled recovery after a bad shutdown.) + curBytesWritten += int64(frameSize) + + // Update the ACKs that will be sent at the end of the request. + totalACKCount++ + if frameRequest.frame.producer.config.ACK != nil { + producerACKCounts[frameRequest.frame.producer]++ + } + + // Explicitly check if we should abort before starting the next frame. + select { + case <-wl.requestChan: + break outerLoop + default: + } + } + // Try to sync the written data to disk. + wl.outputFile.Sync() + + // If the queue has an ACK listener, notify it the frames were written. + if wl.settings.WriteToDiskListener != nil { + wl.settings.WriteToDiskListener.OnACK(totalACKCount) + } + + // Notify any producers with ACK listeners that their frames were written. + for producer, ackCount := range producerACKCounts { + producer.config.ACK(ackCount) + } + + // Return the total byte counts, including the final segment. + return append(bytesWritten, curBytesWritten) +} + +// retryCallback is called (by way of retryCallbackWriter) when there is +// an error writing to a segment file. It pauses for a configurable +// interval and returns true if the operation should be retried (which +// it always should, unless the queue is being closed). +func (wl *writerLoop) retryCallback(err error) bool { + if writeErrorIsRetriable(err) { + return true + } + // If the error is not immediately retriable, log the error + // and wait for the retry interval before trying again, but + // abort if the queue is closed (indicated by the request channel + // becoming unblocked). + wl.logger.Errorf("Writing to segment %v: %v", + wl.currentSegment.id, err) + select { + case <-time.After(time.Second): + // TODO: use a configurable interval here + return true + case <-wl.requestChan: + return false + } +}