
[Feature Request]: Support draining Spanner Change Stream connectors #30167

Open
ianb-pomelo opened this issue Jan 31, 2024 · 11 comments

@ianb-pomelo

What would you like to happen?

Right now, one of the known limitations of the Spanner change stream source is that it can't be drained. Is there a way to allow draining this connector?

Our current use case is a job that consumes change stream values, but the structure of this job changes frequently. To handle this, we try to do in-place updates and, if those fail, drain and start a new job. This works with Pub/Sub sources, but to get around the fact that change streams can't be drained, we have an intermediate job that converts the Spanner changes into Pub/Sub messages, which the frequently changing job then consumes. However, this has caused a huge increase in latency: the commit time -> change stream read latency is pretty consistently ~200 ms, but adding the Pub/Sub layer increases it to ~5 s.
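
For reference, a minimal sketch of the kind of change stream consumer described above, using the Beam Java SDK (project, instance, database, metadata database, and stream names are all placeholders):

```java
// Minimal sketch of a Spanner change stream pipeline; all IDs are placeholders.
import com.google.cloud.Timestamp;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ChangeStreamPipeline {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    SpannerConfig spannerConfig =
        SpannerConfig.create()
            .withProjectId("my-project")
            .withInstanceId("my-instance")
            .withDatabaseId("my-database");

    pipeline.apply(
        "ReadChangeStream",
        SpannerIO.readChangeStream()
            .withSpannerConfig(spannerConfig)
            .withChangeStreamName("my_change_stream")
            .withMetadataDatabase("my-metadata-database")
            .withInclusiveStartAt(Timestamp.now()));
    // ... downstream transforms and sink (the part of the job that changes often) ...

    pipeline.run();
  }
}
```

Draining a pipeline built around this read is what currently isn't supported; cancel or in-place update are the only options.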

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@liferoad
Collaborator

liferoad commented Feb 1, 2024

cc @nielm

@Abacn
Contributor

Abacn commented Feb 1, 2024

Draining streaming pipelines should work in general. If this source isn't draining, something is missing in the SDF; a fix similar to #25716 may work.
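
For context (illustrative only, not SpannerIO code and not the actual change in #25716): Dataflow's drain relies on splittable DoFns being able to truncate their remaining restrictions, roughly like this:

```java
// Illustrative sketch of the drain hook on a splittable DoFn.
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

class ExampleSdf extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String element) {
    return new OffsetRange(0, Long.MAX_VALUE); // effectively unbounded work
  }

  // Invoked when the pipeline is being drained: returning an empty range lets
  // the SDF finish instead of running forever, which is what drain needs.
  @TruncateRestriction
  public RestrictionTracker.TruncateResult<OffsetRange> truncateRestriction(
      @Restriction OffsetRange range) {
    return RestrictionTracker.TruncateResult.of(
        new OffsetRange(range.getFrom(), range.getFrom()));
  }

  @ProcessElement
  public ProcessContinuation processElement(
      @Element String element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // ... repeatedly tracker.tryClaim(offset) and out.output(...) ...
    return ProcessContinuation.resume();
  }
}
```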

@nielm
Contributor

nielm commented Feb 1, 2024

cc @thiagotnunes (change streams author) for comment, but I believe ChangeStreams does not use SDFs, which is probably why the drain is not working.

The partitions are generated by Spanner itself and are read by a normal DoFn. (SpannerIO:1751)

@thiagotnunes
Contributor

cc @nancyxu123 , current owner here

@efalkenberg

Hey @ianb-pomelo

Thanks for the feedback!
Draining is something that we have in our backlog, but not prioritized yet.
I really appreciate the context you provided; I'll add it to our internal ticket, and we'll update here when this gets prioritized.

Thanks!

Eike

@ianb-pomelo
Author

Thanks for the update, looking forward to seeing it prioritized!

@bangau1

bangau1 commented Sep 27, 2024

Hi all,

I also recently experimented a bit with SpannerIO's change stream to Cloud Storage (using the template provided by Google: https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-spanner-change-streams-to-cloud-storage). I dug into whatever documentation I could find and realized that the drain operation isn't supported, but I can confirm that updating in place works.
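
For anyone following the same path, the in-place update that does work is a regular Dataflow streaming update, i.e. relaunching the pipeline with the same job name and the update flag set. A rough sketch (job name is a placeholder):

```java
// Rough sketch of an in-place Dataflow update; the job name is a placeholder.
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UpdateInPlace {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setJobName("spanner-changestream-to-gcs"); // must match the running job
    options.setUpdate(true); // replace the running job in place instead of starting a new one

    Pipeline pipeline = Pipeline.create(options);
    // ... same change stream pipeline definition as the running job ...
    pipeline.run();
  }
}
```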

The other thing I found is that while cancelling the job works, submitting another job with the same job name and the same metadata table name doesn't work. I expected it to be able to continue ingesting the change stream from the previous checkpoint (that's what the metadata table is for, correct me if I'm wrong?).

I asked about the details on Stack Overflow here: https://stackoverflow.com/questions/79027920/restarting-the-spannerios-changestream-to-gcs-text-json-pipeline-got-error

@Abacn
Contributor

Abacn commented Oct 1, 2024

submitting another job with the same job name and the same metadata table name doesn't work.

This is working as intended. Dataflow cannot have two jobs with the same job name unless one is in Done status (not running, cancelling, draining, etc)

@bangau1

bangau1 commented Oct 3, 2024

submitting another job with the same job name and the same metadata table name doesn't work.

This is working as intended. Dataflow cannot have two jobs with the same job name unless one is in Done status (not running, cancelling, draining, etc)

@Abacn I meant that I cancelled it (stopped it), then proceeded to submit a new pipeline with the same jobName and metadata table. But it returned an error.

@Abacn
Contributor

Abacn commented Nov 4, 2024

@Abacn I meant that I cancelled it (stopped it), then proceeded to submit a new pipeline with the same jobName and metadata table. But it returned an error.

It takes time for a job to move to "Done" status, usually minutes or longer.

@bangau1

bangau1 commented Nov 5, 2024

@Abacn I meant that I cancelled it (stopped it), then proceeded to submit a new pipeline with the same jobName and metadata table. But it returned an error.

It takes time for a job to move to "Done" status, usually minutes or longer.

@Abacn just to clarify in case my comment wasn't clear: I submitted the second job with the same jobName once the previous job was completely stopped (cancelled). The second job got an error.

Should I submit a separate issue for this? I already asked on Stack Overflow, including the error that shows up, etc.
