Add StreamPlan and MicroBatchExecution #968

Closed
Tracked by #948
penghuo opened this issue Oct 26, 2022 · 1 comment
Labels
enhancement New feature or request

Comments

Collaborator

penghuo commented Oct 26, 2022

  1. StreamPlan is a type of AbstractPlan that executes continuously.
  2. MicroBatchExecution is one mode of stream execution.

Limitations

  1. Recovery from coordination node failure during stream execution is not yet considered.
  2. State recovery for stream execution is not covered either; state is kept in memory only.
penghuo changed the title from "Add StreamPlan and MicroBatchTask" to "Add StreamPlan and MicroBatchExecutionTask" on Oct 26, 2022
penghuo changed the title from "Add StreamPlan and MicroBatchExecutionTask" to "Add StreamPlan and MicroBatchExecution" on Oct 26, 2022
Collaborator Author

penghuo commented Oct 26, 2022

Micro-batch Execution State Management

Data Structure

  • offsetLog: a write-ahead log that records the offsets present in each batch. To ensure that a given batch always consists of the same data, we write to this log before any processing is done. Thus, the Nth record in this log indicates the data currently being processed, and the (N-1)th record indicates which offsets have been durably committed to the sink.
    • getLatest(): returns the latest batchId and its OffsetSeq.
    • get(batchId): returns the OffsetSeq for that batch.
  • commitLog: keeps track of the latest committed batchId.
    • getLatest(): returns the latest committed batchId.
  • committedOffsets: tracks how much data from each input source has been processed and committed to the sink or state store.
  • availableOffsets: tracks the offsets that are available to be processed but have not yet been committed to the sink.
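
As a rough illustration of the two logs described above, here is a minimal in-memory sketch in Java. The class names OffsetLog and CommitLog, the use of List<Long> to stand in for OffsetSeq (one offset per source), and the Optional return types are all assumptions made for this example; they are not taken from the actual implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical in-memory offset log: batchId -> offsets recorded before processing. */
final class OffsetLog {
  // TreeMap keeps batch ids sorted, so the last entry is always the latest batch.
  private final TreeMap<Long, List<Long>> entries = new TreeMap<>();

  /** Records the offsets of a batch; called before the batch is processed. */
  void add(long batchId, List<Long> offsetSeq) {
    entries.put(batchId, List.copyOf(offsetSeq));
  }

  /** Latest batchId with its offsets, or empty if nothing has been logged yet. */
  Optional<Map.Entry<Long, List<Long>>> getLatest() {
    return Optional.ofNullable(entries.lastEntry());
  }

  /** Offsets recorded for the given batch, or empty if the batch is unknown. */
  Optional<List<Long>> get(long batchId) {
    return Optional.ofNullable(entries.get(batchId));
  }
}

/** Hypothetical in-memory commit log: remembers only the latest committed batchId. */
final class CommitLog {
  private long latest = -1L; // assumption: -1 means "no batch committed yet"

  /** Marks a batch as committed; called after the sink has received the data. */
  void add(long batchId) {
    latest = batchId;
  }

  /** Latest committed batchId, or empty if no batch has been committed. */
  Optional<Long> getLatest() {
    return latest < 0 ? Optional.empty() : Optional.of(latest);
  }
}
```

With batches 0 and 1 logged and only batch 0 committed, offsetLog.getLatest() points at batch 1 (in flight) while commitLog.getLatest() returns 0, which is the gap the execution loop inspects on each iteration.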

Algorithms
MicroBatchExecution guarantees that:

  • offsetLog.getLatest() returns the batch being processed.
  • offsetLog.get(offsetLog.getLatest() - 1) returns the committed batch.
  • commitLog.getLatest() returns the latest committed batch.

Initially, offsetLog and commitLog are empty.

  • At the beginning of each execution, compute the difference between offsetLog and commitLog.
  • After collecting offsets from the source and before running the query, MicroBatchExecution adds the offsets to be processed to offsetLog.
  • After sending data to the sink, MicroBatchExecution adds the committed offsets to commitLog.
offsetLog = empty
commitLog = empty

while (isActive) {
  // Recover the position of the previous iteration from the two logs.
  latestBatchId = offsetLog.getLatest()
  availableOffsets = offsetLog.get(latestBatchId)
  committedOffsets = offsetLog.get(latestBatchId - 1)

  latestCommittedBatchId = commitLog.getLatest()

  if (latestBatchId == latestCommittedBatchId) {
    // The last batch was committed successfully: start a new batch.
    currentBatchId = latestCommittedBatchId + 1
    committedOffsets ++= availableOffsets
  } else if (latestCommittedBatchId == latestBatchId - 1) {
    // The last batch was not committed: rerun it.
    currentBatchId = latestBatchId
  } else {
    log.error("loop invariant violated")
    break
  }

  // Before processing: record the offsets of this batch in the write-ahead log.
  availableOffsets = source.offset()
  offsetLog.add(currentBatchId, availableOffsets)

  // Processing: run the query over the batch.

  // After the sink has received the data: mark the batch as committed.
  commitLog.add(currentBatchId, availableOffsets)
}
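
The loop above can be exercised with a small runnable Java sketch. Everything here is an assumption made for illustration: the class name MicroBatchSketch, the single source with one long offset, the use of -1 as the "empty log" batch id, and the failBeforeCommit flag that simulates a crash between the offsetLog write and the commit. It only demonstrates how the batch id is chosen from the two logs, not how the real execution is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical single-source simulation of the micro-batch loop. */
final class MicroBatchSketch {
  // batchId -> offset logged before processing (write-ahead).
  private final TreeMap<Long, Long> offsetLog = new TreeMap<>();
  // batchId -> offset committed after the sink received the data.
  private final TreeMap<Long, Long> commitLog = new TreeMap<>();
  // Stand-in for the sink: one entry per successfully delivered batch.
  final List<String> sink = new ArrayList<>();

  // Assumption: an empty log reports -1 as its latest batch id.
  private static long latest(TreeMap<Long, Long> log) {
    return log.isEmpty() ? -1L : log.lastKey();
  }

  /**
   * Runs one loop iteration and returns the batch id it used.
   * If failBeforeCommit is true, the iteration stops after the offsetLog
   * write, so the same batch id is reused on the next call.
   */
  long runOnce(long sourceOffset, boolean failBeforeCommit) {
    long latestBatchId = latest(offsetLog);
    long latestCommittedBatchId = latest(commitLog);

    long currentBatchId;
    if (latestBatchId == latestCommittedBatchId) {
      // Last batch was committed: start a new batch.
      currentBatchId = latestCommittedBatchId + 1;
    } else if (latestCommittedBatchId == latestBatchId - 1) {
      // Last batch was logged but never committed: rerun it.
      currentBatchId = latestBatchId;
    } else {
      throw new IllegalStateException("loop invariant violated");
    }

    // Before processing: write the offset to the write-ahead log.
    offsetLog.put(currentBatchId, sourceOffset);
    if (failBeforeCommit) {
      return currentBatchId; // simulated crash before sink and commit
    }

    // Processing and sink, then commit.
    sink.add("batch " + currentBatchId + " up to offset " + sourceOffset);
    commitLog.put(currentBatchId, sourceOffset);
    return currentBatchId;
  }
}
```

Calling runOnce(10, false), runOnce(20, true), runOnce(30, false) yields batch ids 0, 1, 1: the second call fails before commit, so the third call finds commitLog one batch behind offsetLog and reruns batch 1 instead of starting batch 2.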
