feat: Implement RandomQueue scheduler strategy #1914

Merged
marianogappa merged 8 commits into main from mariano/priority-queue-changes on Oct 3, 2024

@marianogappa marianogappa commented Oct 2, 2024

This PR implements a new Scheduler Strategy based on a Concurrent Random Queue. It is based on @erezrokah's Priority Queue Scheduler Strategy.

How does it work

This is hopefully a much simpler scheduling strategy. It doesn't have any semaphores; it just uses the existing concurrency setting.

Table resolvers (and their relations) are pushed into a work queue, and the concurrency workers pull from this queue. Instead of taking the oldest element, each pull takes a random element.
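The push/pull behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the `RandomQueue` type, its constructor, and the task names in `main` are hypothetical, and the fixed seed is only there to make the demo repeatable.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// RandomQueue is a hypothetical, minimal concurrent queue whose Pull
// returns a uniformly random element instead of the oldest one.
type RandomQueue[T any] struct {
	mu    sync.Mutex
	items []T
	rng   *rand.Rand
}

// NewRandomQueue builds an empty queue with its own seeded random source.
func NewRandomQueue[T any](seed int64) *RandomQueue[T] {
	return &RandomQueue[T]{rng: rand.New(rand.NewSource(seed))}
}

// Push appends a work unit to the queue.
func (q *RandomQueue[T]) Push(item T) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
}

// Pull removes and returns a random element; ok is false when the queue is empty.
func (q *RandomQueue[T]) Pull() (item T, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return item, false
	}
	i := q.rng.Intn(len(q.items))
	item = q.items[i]
	last := len(q.items) - 1
	q.items[i] = q.items[last] // swap-remove: O(1), since order does not matter
	q.items = q.items[:last]
	return item, true
}

func main() {
	q := NewRandomQueue[string](42)
	// Hypothetical work units standing in for table resolvers.
	for _, t := range []string{"parent_a", "child_a1", "child_a2", "parent_b"} {
		q.Push(t)
	}
	for {
		t, ok := q.Pull()
		if !ok {
			break
		}
		fmt.Println("resolving", t)
	}
}
```

The swap-remove keeps `Pull` constant-time, which is only acceptable because a random queue has no ordering to preserve.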

Why it should work better

The key benefit of this strategy is this:

  • Assumption 1: most slow syncs are actually slow because of rate limits, not because of I/O limits or too much data.
  • Assumption 2: the meaty part of the sync is syncing relations, because each child table has a resolver per parent.
  • Benefit: because every pull picks a uniformly random element from the queue, the child resolvers of any given table are spread roughly evenly throughout the sync instead of running back to back, which should minimise rate-limit hits.

Does it work better?

Still working on results. Notably AWS & Azure yield mixed results; still have to look into why.

GCP

Before

$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 25799, Errors: 0, Warnings: 0, Time: 2m23s

UPDATE: GCP is moving to the Round Robin strategy, and it's already much faster with Round Robin:

$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 26355, Errors: 0, Warnings: 0, Time: 40s

After

$ cli sync .
Loading spec(s) from .
Starting sync for: gcp (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.0.7)]
Sync completed successfully. Resources: 26186, Errors: 0, Warnings: 0, Time: 34s

Result: 76.22% reduction in time (2m23s to 34s), or 3.21 times faster (a 4.21x speedup).
Result against Round Robin: 15% reduction in time (40s to 34s), or 0.18 times faster (a 1.18x speedup; probably within margin of error).

BigQuery

Before

$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 2m7s

After

$ cli sync bigquery_to_postgresql.yaml
Loading spec(s) from bigquery_to_postgresql.yaml
Starting sync for: bigquery (cloudquery/bigquery@v1.7.0) -> [postgresql (cloudquery/postgresql@v8.6.0)]
Sync completed successfully. Resources: 26139, Errors: 0, Warnings: 0, Time: 1m26s

Result: 32.28% reduction in time (2m7s to 1m26s), or 0.48 times faster (a 1.48x speedup).

SentinelOne

Before (it was already quite fast due to a previously merged improvement)

$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 15s

After

$ cli sync .
Loading spec(s) from .
Starting sync for: sentinelone (grpc@localhost:7777) -> [postgresql (cloudquery/postgresql@v8.5.5)]
Sync completed successfully. Resources: 1295, Errors: 0, Warnings: 0, Time: 8s

Result: 46.67% reduction in time (15s to 8s), or 0.875 times faster (a 1.875x speedup).
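For reference, the Result lines in the benchmarks above follow from two simple formulas: reduction = (before - after) / before, and "times faster" = before / after - 1. A short sketch that recomputes them from the reported sync times (the helper names are illustrative only):

```go
package main

import (
	"fmt"
	"time"
)

// reductionPct is the percentage of wall-clock time saved:
// (before - after) / before * 100.
func reductionPct(before, after time.Duration) float64 {
	return float64(before-after) / float64(before) * 100
}

// timesFaster is the relative speed gain: before/after - 1.
// A value of 1.0 would mean the sync runs twice as fast.
func timesFaster(before, after time.Duration) float64 {
	return float64(before)/float64(after) - 1
}

// mustParse converts a duration string such as "2m23s" and panics on bad input.
func mustParse(s string) time.Duration {
	d, err := time.ParseDuration(s)
	if err != nil {
		panic(err)
	}
	return d
}

func main() {
	// Sync times as reported in the CLI output above.
	runs := []struct{ name, before, after string }{
		{"GCP", "2m23s", "34s"},
		{"GCP vs Round Robin", "40s", "34s"},
		{"BigQuery", "2m7s", "1m26s"},
		{"SentinelOne", "15s", "8s"},
	}
	for _, r := range runs {
		b, a := mustParse(r.before), mustParse(r.after)
		fmt.Printf("%-20s %.2f%% reduction, %.2f times faster\n",
			r.name, reductionPct(b, a), timesFaster(b, a))
	}
}
```

Note that these are single runs, so small differences (such as the Round Robin comparison) may not be significant.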

How to test

  • Add a go.mod replace for sdk: replace github.com/cloudquery/plugin-sdk/v4 => github.com/cloudquery/plugin-sdk/v4 v4.63.1-0.20241002131015-243705c940c6 (check last commit on this PR)
  • Run source plugin via grpc locally; make sure to configure the scheduler strategy to scheduler.StrategyRandomQueue.

How scary is it to merge

  • This scheduler strategy is not used by any plugins by default, so in principle this should be safe to merge.

@github-actions github-actions bot added the feat label Oct 2, 2024
@github-actions github-actions bot added feat and removed feat labels Oct 3, 2024
@erezrokah erezrokah mentioned this pull request Oct 3, 2024
5 tasks
@marianogappa marianogappa marked this pull request as ready for review October 3, 2024 10:05
@marianogappa marianogappa requested a review from a team as a code owner October 3, 2024 10:05
@github-actions github-actions bot added feat and removed feat labels Oct 3, 2024

@erezrokah erezrokah left a comment

Code looks great, I'm going to test it a bit with a few plugins, also with OTEL enabled

//
// - If the queue is empty, check `IsIdle()` to check if no workers are active.
// - If workers are still active, call `Wait()` to block until state changes.
type activeWorkSignal struct {

Nice 🚀 Much better than what I did before 🙈

StrategyDFS: "dfs",
StrategyRoundRobin: "round-robin",
StrategyShuffle: "shuffle",
StrategyRandomQueue: "random-queue",

@erezrokah erezrokah Oct 3, 2024

Maybe call it shuffle-queue, or just queue, since we already have shuffle, which means random.

Contributor Author

Renamed to shuffle-queue.

tableMetrics.Duration.Store(&duration)
tableMetrics.OtelEndTime(ctx, endTime)
if parent == nil {
logger.Info().Uint64("resources", tableMetrics.Resources).Uint64("errors", tableMetrics.Errors).Dur("duration_ms", duration).Msg("table sync finished")

OK now I remember what I removed here. We used to log a message here with the metrics of all relations after a parent finished.

Now we don't do that, since reaching this code doesn't mean all relations have finished syncing; it only means the parent has finished pushing all the new work units into the queue.

See

if parent == nil { // Log only for root tables and relations only after resolving is done, otherwise we spam per object instead of per table.

I don't think it's a blocker, and we can add the logs later on. Still, it's something to consider.

@marianogappa marianogappa merged commit af8ac87 into main Oct 3, 2024
8 checks passed
@marianogappa marianogappa deleted the mariano/priority-queue-changes branch October 3, 2024 15:43

disq commented Oct 3, 2024

Seems there's a flaky test which can be seen in a neighboring PR: https://github.com/cloudquery/plugin-sdk/actions/runs/11166226858/job/31039731830?pr=1920

kodiakhq bot pushed a commit that referenced this pull request Oct 4, 2024
🤖 I have created a release *beep* *boop*
---


## [4.65.0](v4.64.1...v4.65.0) (2024-10-04)


### Features

* Implement RandomQueue scheduler strategy ([#1914](#1914)) ([af8ac87](af8ac87))


### Bug Fixes

* Revert "fix: Error handling in StreamingBatchWriter" ([#1918](#1918)) ([38b4bfd](38b4bfd))
* **tests:** WriterTestSuite.handleNulls should not overwrite columns ([#1920](#1920)) ([08e18e2](08e18e2))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).