
docs: ADR-038 plugin proposal #10482

Merged: 4 commits into cosmos:master, Nov 27, 2021
Conversation

@i-norden (Contributor) commented Nov 2, 2021

For #10096

This PR introduces updates to the ADR-038 spec for the transition to plugin-based streaming services. These updates reflect the implementation approach taken in i-norden#1 (which will be rebased, retargeted, and reopened).

Author Checklist

All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.

I have...

- [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [x] added `!` to the type prefix if API or client breaking change
- [x] targeted the correct branch (see [PR Targeting](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] provided a link to the relevant issue or specification
- [x] followed the guidelines for [building modules](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules)
- [x] included the necessary unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] added a changelog entry to `CHANGELOG.md`
- [x] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [x] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

Reviewers Checklist

All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed state machine logic
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)

@i-norden i-norden changed the title Adr038 plugin proposal ADR-038 plugin proposal Nov 2, 2021
@github-actions github-actions bot added the T: ADR An issue or PR relating to an architectural decision record label Nov 2, 2021
@i-norden i-norden changed the title ADR-038 plugin proposal feat: ADR-038 plugin proposal Nov 2, 2021
@i-norden i-norden changed the title feat: ADR-038 plugin proposal doc: ADR-038 plugin proposal Nov 2, 2021
@i-norden i-norden changed the title doc: ADR-038 plugin proposal docs: ADR-038 plugin proposal Nov 8, 2021
@fedekunze (Collaborator) left a comment

LGTM. I think this ADR is well written, although it would still be nice to add more example use cases for plugins (indexing state vs events, upload to Postgres, etc)

@tac0turtle tac0turtle added the A:automerge Automatically merge PR once all prerequisites pass. label Nov 10, 2021
@aaronc (Member) left a comment

LGTM.

In the future it would be nice to have a plugin-based modular server along the lines of Caddy. In that scenario, basically everything in app.toml would be a plugin.

@robert-zaremba (Collaborator) left a comment

Looks good. Left a few comments to consider.

docs/architecture/adr-038-state-listening.md (three review threads marked outdated/resolved, plus the following thread on this excerpt):

```go
	Start(wg *sync.WaitGroup)

	// Plugin is the base Plugin interface
	Plugin
}
```
A collaborator commented:

Shouldn't the StateStreaming interface have writer methods?

@i-norden (Contributor, Author) replied:

The StateStreaming plugin uses the Register method to register StreamingService(s) with the BaseApp; the StreamingService interface exposes WriteListener(s), which have the OnWrite method. So, in other words, the writing interface is buried two levels down. Let me see if I can restructure this so that it is clearer where writing is occurring.
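To make the layering described above concrete, here is a minimal sketch of the three levels. It is not the ADR's exact API: the type names follow the discussion, but the signatures and helper types (StoreKey, BlockData, the register callback) are simplified stand-ins.

```go
package streaming

import "sync"

// Illustrative stand-ins for the SDK/ABCI types used by the real interfaces
// (store keys, codecs, ABCI request/response pairs, etc.).
type (
	StoreKey  string
	BlockData []byte // a serialized ABCI message plus its state-change set
)

// WriteListener is the lowest layer: it receives every write made to a
// listened-to KVStore.
type WriteListener interface {
	// OnWrite is called for each set/delete; delete == true means the key was removed.
	OnWrite(storeKey StoreKey, key, value []byte, delete bool) error
}

// StreamingService owns a set of WriteListeners and relays ABCI messages,
// together with the captured state changes, out to an external system.
type StreamingService interface {
	// Listeners returns the WriteListeners to mount on each KVStore.
	Listeners() map[StoreKey][]WriteListener
	// ListenBlockData stands in for the per-message hooks (BeginBlock,
	// DeliverTx, EndBlock) of the real interface.
	ListenBlockData(data BlockData) error
}

// StateStreaming is the plugin-level interface commented on above: Register
// hands the plugin's StreamingService(s) to the BaseApp, so the actual
// writing happens two levels down, inside the WriteListeners.
type StateStreaming interface {
	Register(register func(StreamingService)) error
	Start(wg *sync.WaitGroup)
}
```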

> Instead of prescribing a final data format in this ADR, it is left to a specific plugin implementation to define and document this format.
> We take this approach because flexibility in the final format is necessary to support a wide range of streaming service plugins. For example,
> the data format for a streaming service that writes the data out to a set of files will differ from the data format that is written to a Kafka topic.

A collaborator commented:

I think we should unify the data format for all external services, especially message queues. Essentially, a client app should be able to easily connect to RabbitMQ or Redis and expect the same data format.

A contributor replied:

The API already provides unified data by passing along the marshaled []byte data. Adding a constraint like this might present some difficulties. The API needs to be flexible to allow authors to choose the format in which they want to store data on external systems. I don't see how it can be enforced, given that plugin authors have control over how they process, transform, and store the data externally. They may only be interested in specific data stored in a KVStore, which can be JSON, Protobuf, or String values.

@clevinson clevinson mentioned this pull request Nov 12, 2021
@tac0turtle (Member):

believe this addresses #10337

@egaxhaj (Contributor) commented Nov 20, 2021

Hi all. I have an example in place for Kafka (README.md) following the approach taken in i-norden#1. However, the current ADR-038 design does not guarantee consistent state between the application's internal state and external listeners (e.g. Kafka, PostgreSQL, etc.). Instead, it uses a fire-and-forget approach, which is not practical for real-world production systems. An acknowledgement mechanism (success/failure from listeners) needs to be implemented before calling store.Commit() when BeginBlock, DeliverTx, and EndBlock are triggered, in order to guarantee a synchronized state between the internal state store and listeners. This can be achieved by designing a strategy similar to a two-phase commit in ADR-038.

Take Kafka, for example. Kafka can guarantee that messages will not be duplicated during network failures, restarts, or producer retries by configuring the producer to be idempotent and transactional. So, all that is left to do is provide a channel signaling successful delivery of messages to the application before calling store.Commit(). If a node dies after successful delivery to Kafka but just before calling store.Commit(), a restart of the node will replay the uncommitted transactions and sync up the two states.
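A minimal sketch of that acknowledgement idea, assuming each listener exposes a delivery channel; AckListener and waitForAcks are hypothetical names, not part of the current ADR-038 design:

```go
package streamingsketch

import (
	"errors"
	"time"
)

// AckListener is a hypothetical listener that, instead of fire-and-forget,
// reports delivery of the current block's messages on a channel.
type AckListener interface {
	// Acks yields nil on successful delivery or an error on failure.
	Acks() <-chan error
}

// waitForAcks blocks until every listener acknowledges delivery of the block's
// messages or the overall timeout expires; the application would call it right
// before store.Commit() to keep internal and external state in lock-step.
func waitForAcks(listeners []AckListener, timeout time.Duration) error {
	deadline := time.After(timeout) // one shared deadline across all listeners
	for _, l := range listeners {
		select {
		case err := <-l.Acks():
			if err != nil {
				return err
			}
		case <-deadline:
			return errors.New("timed out waiting for listener acknowledgement")
		}
	}
	return nil
}
```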

@i-norden (Contributor, Author) commented Nov 22, 2021

> So if a node dies after successful delivery to Kafka but just before calling store.Commit(), a restart of the node will replay the uncommitted transactions and sync up the two states.

This already occurs: restarting the node will restart it at the last committed state; it will re-process the block it failed to Commit() and, in doing so, stream out that block's state changes and ABCI messages to the external system. The only issue here is that the external system is going to receive those messages twice. The external system should be able to detect and reject duplicate messages (simply by tracking the latest block number at which messages were received or, for example, by running into unique constraints on the data in the case of Postgres). So I don't think desync due to a node falling over is an issue; the issue is if the external service falls over.
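As an illustration only (nothing in the ADR prescribes this), duplicate rejection on the external side can be as simple as tracking the latest processed block height:

```go
package consumersketch

// heightDedup is an illustrative guard an external consumer could keep: it
// records the last block height whose messages were fully processed and
// rejects anything at or below it, so replays after a node restart are no-ops.
type heightDedup struct {
	lastProcessed int64
}

// shouldProcess reports whether messages for the given block height are new.
func (d *heightDedup) shouldProcess(height int64) bool {
	if height <= d.lastProcessed {
		return false // duplicate delivery from a re-processed block
	}
	d.lastProcessed = height
	return true
}
```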

But I'm not convinced the onus of guaranteeing that an arbitrary external system is properly synced should fall on the state machine; I'm curious what other people with more SDK experience and knowledge think.

My thoughts:

I could be thinking about this all wrong or overthinking it, and some of these concerns may not be particularly relevant to Kafka, but the design is intended to support arbitrary external data destinations. Apologies for the scatteredness of these thoughts.

  1. If the external system falls over for whatever reason do we want the app to halt and wait indefinitely for it to signal success? Can a node tolerate downtime/delay in the core application because of an external system failure?
  2. Or should the app instead generate a cache of messages that it fails to send, while continuing to otherwise chug along as normal? Once it receives a signal that the external service is ready to receive messages again, it then flushes the cache out to it. I think this is the right choice, but then there is still the question of what tolerance we put on the size of this cache before we either need to halt and wait for the external service or begin tossing away messages, because we can't grow the cache indefinitely. Or is this cache journaled on disk, in which case do we then need failure handling for this external cache as well?
  3. We want to avoid back-pressure on the app from these external services as much as possible.
  4. At what point does the external system signal success? What does "success" mean here? Does it signal success as soon as it receives the messages, or only after it has successfully performed whatever downstream processing (e.g. transforming, indexing) it is responsible for? If it signals success after simply receiving the message but then falls over before it can finish processing it (and doesn't have the message cached, or said cache also falls over), then that is not actually success from the perspective of the external system. But delaying the "success" signal until after complete external processing could become rate-limiting on the system, depending on the nature of said processing and the behavior of the app when it doesn't receive positive acknowledgement of receipt.

> So, all that is left to do is provide a channel of successful delivery of messages to the application before calling store.Commit()

We need to decide how the app behaves when the channel does not signal successful delivery of messages. That behavior could depend on the nature of the external failure (e.g. did a pipe break, so we need to re-establish our connection to the external service, or is the network connection fine and no reconnection is required?). We also need to decide whether the app waits for a success signal after delivery of each individual ABCI message + state change set (e.g. a separate <-successChan check after BeginBlock, after each DeliverTx, and after EndBlock) or whether there is a single <-successChan check right before Commit().
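For concreteness, a sketch of the single-check option; successChan, the timeout, and commitFn (standing in for store.Commit()) are all hypothetical:

```go
package streamingsketch

import (
	"fmt"
	"time"
)

// commitWithAck sketches one <-successChan check right before Commit(), rather
// than a check after each BeginBlock/DeliverTx/EndBlock; the per-message
// variant would simply run the same select once per ABCI message instead of
// once per block.
func commitWithAck(successChan <-chan error, timeout time.Duration, commitFn func() error) error {
	select {
	case err := <-successChan:
		if err != nil {
			return fmt.Errorf("streaming delivery failed, refusing to commit: %w", err)
		}
	case <-time.After(timeout):
		return fmt.Errorf("no delivery acknowledgement within %s, refusing to commit", timeout)
	}
	return commitFn()
}
```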

Working through an example (a rough sketch of the cache follows the list), let's say:

  1. We modify the BaseApp so that it caches any messages it fails to send to the external service.
  2. It checks for success after every message sent, so it knows what to cache with intra-block granularity and catches any failure asap.
  3. Kafka falls over completely, the network pipe is broken.
  4. The app waits for a success signal off a channel, it doesn't receive it, it blocks.
  5. Depending on at which stage a "success" signal is sent back to the app, the app can't tell if it hasn't received the signal yet because the external system is lagging behind or never will receive it because the external system fell over entirely.
  6. So we wait for some threshold amount of time, after which we decide the external system must have failed. This threshold needs to be short enough to prevent unnecessary wait time, but long enough to avoid false positives where we were actually experiencing a lag and are now unnecessarily reinitializing and replaying everything.
  7. After deciding the external system has fallen over, it writes the message it failed on to the cache.
  8. The app unblocks and continues forward with regular processing, except it writes messages to the cache instead of the external system.
  9. We do whatever needs to be done externally to fix Kafka and restart its server.
  10. The app reestablishes a network connection between itself and Kafka. It either needs to attempt periodically until the connection is reestablished or the app needs to receive a signal telling it that the server is back up and ready for the connection to be reestablished.
  11. Once the connection is reestablished the app replays the cached messages and then reverts back to sending Kafka messages in real-time (we don't want to send out-of-order, so we can't begin sending new messages until the old messages in the cache have been cleared).
  12. When replaying the cached messages we need to handle failure to send them as well.
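To make the steps above a bit more concrete, here is a minimal, purely hypothetical cache-and-replay sketch; none of these names exist in the SDK:

```go
package streamingsketch

import "sync"

// messageCache is an illustrative bounded buffer for the walkthrough above:
// messages that could not be delivered are parked here and flushed, in order,
// once the connection to the external service is re-established.
type messageCache struct {
	mu       sync.Mutex
	pending  [][]byte
	capacity int
}

// add parks an undelivered message; it returns false when the cache is full,
// which is the point at which the app must halt or start dropping messages.
func (c *messageCache) add(msg []byte) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.pending) >= c.capacity {
		return false
	}
	c.pending = append(c.pending, msg)
	return true
}

// flush replays the cached messages through send (e.g. a re-established Kafka
// producer); on the first failure it keeps the remainder for the next attempt.
func (c *messageCache) flush(send func([]byte) error) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	for len(c.pending) > 0 {
		if err := send(c.pending[0]); err != nil {
			return err // replay failures must be handled too (step 12)
		}
		c.pending = c.pending[1:]
	}
	return nil
}
```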

@egaxhaj (Contributor) commented Nov 23, 2021

> 1. If the external system falls over for whatever reason do we want the app to halt and wait indefinitely for it to signal success? Can a node tolerate downtime/delay in the core application because of an external system failure?

For this we can provide external systems with a choice. Currently ADR-038 supports multiple listeners per KVStore. Therefore, my thoughts are that there should be a configuration policy in place such that:

  1. fire-and-forget - current design
  2. wait for {n} listeners to acknowledge delivery <-successChan (May not need to be supported)
  3. all listeners must respond with successful acknowledgement <-successChan

1 - commit right away
2 - stop node if threshold isn't met
3 - stop node.

2 and 3 implicitly say that you care about consistency.
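A sketch of how such a policy could be expressed in code; all names are hypothetical, and only the three behaviours listed above are taken from the proposal (the actual app.toml keys would be decided in the implementation PR):

```go
package streamingsketch

// DeliveryPolicy is a hypothetical configuration knob mirroring the three
// options above.
type DeliveryPolicy int

const (
	FireAndForget DeliveryPolicy = iota // 1: commit right away, ignore acks
	WaitForN                            // 2: halt unless at least N listeners ack
	WaitForAll                          // 3: halt unless every listener acks
)

// okToCommit applies the policy to the number of positive acknowledgements
// received for a block; false means the node stops instead of committing.
func okToCommit(p DeliveryPolicy, acks, totalListeners, n int) bool {
	switch p {
	case FireAndForget:
		return true
	case WaitForN:
		return acks >= n
	case WaitForAll:
		return acks == totalListeners
	default:
		return false
	}
}
```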

> 2. Or should the app instead generate a cache of messages that it didn't receive positive acknowledgement of receipt for, while continuing to otherwise chug along as normal? Once it receives positive acknowledgment it then flushes the cache. I think this is the right choice, but then there is still the question of what tolerance we put on the size of this cache before we either need to halt and wait for the external service or begin tossing away messages because we can't grow the cache indefinitely. Or is this cache journaled on disk? Do we then need success/failure communications for this cache as well?

External system downtime is unknown, and it should not be up to the SDK to support something like this. This would put a burden on the node to keep track of messages that have failed and then replay them when the external system comes back up.

> 3. We want to avoid back-pressure on the app from these external services as much as possible.

In business cases where consistency is more important, back-pressure is unavoidable. This comes down to choosing the right external system for saving state, and to performance tuning.

In the fire-and-forget approach, the IntermediateWriter cache will keep filling up faster than it can be written out to the external system.

> 4. At what point does the external system signal success? What does "success" mean here? Does it signal success as soon as it receives the messages, or does it only signal success after it has successfully performed whatever downstream processing (e.g. transforming, indexing) it is responsible for performing. If it signals success after simply receiving the message but then falls over before it can finish processing it (and doesn't have the message cached or said cache also falls over), then that is not actually success from the perspective of the external system. But delaying the "success" signal until after complete external processing could become rate limiting on the system depending on the nature of said processing and how we design the behavior in response to not receiving a success signal.

Sending a success signal before processing would satisfy the fire-and-forget scenario. The IntermediateWriter, for example, sends a success signal back, satisfying fire-and-forget. The latter would satisfy consistency.

> > So, all that is left to do is provide a channel of successful delivery of messages to the application before calling store.Commit()

> We need to decide how the app behaves when the channel does not signal successful delivery of messages. That behavior could be dependent on the nature of the external failure (e.g. did a pipe break and we need to re-establish our connection to the external service or is the network connection fine and we don't require reestablishing a connection?). We need to decide if it waits for a success signal after delivery of each individual ABCI message+state change set (e.g. a different <-successChan check after BeginBlock, after each DeliverTx, and after EndBlock) or is there a single <-successChan right before committing?

> Working through an example, let's say:

> 1. We modify the BaseApp so that it caches any messages it fails to send to the external service.

We should avoid any caching of undelivered messages. It complicates the design and puts an unnecessary burden on the SDK (node).

> 1. It checks for success after every message set, so it knows what to cache with intra-block granularity.
> 2. Kafka falls over completely, the network pipe is broken.
> 3. The app waits for a success signal off a channel, it doesn't receive it, it blocks.
> 4. Depending on at which stage a "success" signal is sent back to the app, the app can't tell if it hasn't received the signal yet because the external system is lagging behind or never will receive it because the external system fell over entirely.
> 5. So we wait for some threshold amount of time, after which we decide the external system must have failed. This threshold needs to be short enough to prevent unnecessary wait time, but long enough to avoid false positives where we were actually experiencing a lag and are now unnecessarily reinitializing and replaying everything.
> 6. We now need to 1) do whatever needs to be done externally to fix Kafka and restart its server, 2) reestablish a network connection between the app and Kafka, 3) once this connection is reestablished, the app needs to replay the messages that Kafka didn't positively acknowledge receipt of and then revert back to sending Kafka messages in real-time (we don't want to send out-of-order, so we can't begin sending new messages until the old messages in the cache have been cleared).
> 7. When replaying the cached messages we need to handle failure to send them as well.

My thoughts are this:

  1. Implement a <-successChan

  2. Implement a policy in app.toml for:
    a. fire-and-forget
    b. Wait for {n} listeners to acknowledge success. (May not need to be supported)
    c. Wait for all listeners to acknowledge success.

    2.b and 2.c put emphasis on consistency. Stopping the node guarantees that the local state doesn't fall out of sync with external systems when the following occur:

    • network errors
      • Unable to communicate with external system
      • Successful commit on external system but fails to send ack to <-successChan
    • Successful communication but failed to commit to external system due to some error.
    • Maintenance down time on external systems.

It's up to external system implementations to make sure that they are set up to be idempotent, to prevent duplicates when uncommitted messages are replayed after a node restart. (Kafka, PostgreSQL, and MySQL all support ways to achieve idempotency.)

@i-norden (Contributor, Author) commented Nov 23, 2021

Thanks @egaxhaj-figure, got it! If we just have the node shut down when it doesn't receive the expected success signal(s) within some threshold amount of time, that is simple enough.

> External system downtime is unknown, and it should not be up to the SDK to support something like this. This would put a burden on the node to keep track of messages that have failed and then replay them when the external system comes back up.

By burden do you mean code complexity or performance impact? I agree the former would be a significant burden, but the latter would be negligible compared to potential back-pressure from the state streaming and the regular processes of the node. On the other hand, causing the node to fall over and requiring external intervention every time a single message fails to be sent or acknowledged is a burden on the node operator. All these features will be optional; we could support all three options ("do nothing", "shut everything down", "try to recover programmatically, and then shut everything down if we can't within some amount of time or cache capacity").

The cache-replay approach would not be ideal for instances of extended, indefinite external service downtime (although the cache size would be configurable, and as long as you have the hardware to support it, it could support indefinite downtime recovery). But, for example, if we lose our network connection for a short amount of time due to intermittent network failure, and not due to any issue with the system we are networking to, the internal caching approach could perform a complete recovery without any external intervention or node downtime. Having an intermediate option between "do nothing" and "shut everything down" doesn't seem unreasonable to me (in an ideal world where code complexity and dev hours aren't a concern).

In any case, for the sake of progressing things let's move forward with your proposal and we can debate the merits of this alternative and add it as a feature at a later time if there is interest.

@egaxhaj (Contributor) commented Nov 23, 2021

Sorry, I wasn't clear. Yes, I was speaking to code complexity. I agree, it would be a great feature to have. However, given the code complexity, should this feature be included in the first release or a subsequent release?

@i-norden (Contributor, Author) replied:

> Sorry, I wasn't clear. Yes, I was speaking to code complexity. I agree, it would be a great feature to have. However, given the code complexity, should this feature be included in the first release or a subsequent release?

Ah sorry, I need to stop editing my comments. Yeah, you're spot on; for the sake of progressing things let's move forward with your proposal, and we can debate the merits of this alternative and add it as a feature at a later time if there is actual interest and not just my hypotheticals 😅

@i-norden (Contributor, Author):

Your point about code complexity is especially pertinent given my difficulty in keeping up with to-do's as is 😅 Going to make the updates to the docs to reflect Robert's comments and your proposal, and then I will rework the plugin system implementation branch to match the new state of things and open a PR for that. I'll have that wrapped up before taking off for holiday this Thursday. Thanks again @egaxhaj-figure !

@egaxhaj (Contributor) commented Nov 23, 2021

> Your point about code complexity is especially pertinent given my difficulty in keeping up with to-do's as is 😅 Going to make the updates to the docs to reflect Robert's comments and your proposal, and then I will rework the plugin system implementation branch to match the new state of things and open a PR for that. I'll have that wrapped up before taking off for holiday this Thursday. Thanks again @egaxhaj-figure !

😄 Happy Thanksgiving!

@mergify mergify bot merged commit 414fbd3 into cosmos:master Nov 27, 2021
@i-norden i-norden mentioned this pull request Nov 29, 2021
blewater pushed a commit to e-money/cosmos-sdk that referenced this pull request Dec 8, 2021