
[SPARK-49640][PS] Apply reservoir sampling in SampledPlotBase #48105

Closed

Conversation

@zhengruifeng (Contributor) commented Sep 13, 2024

What changes were proposed in this pull request?

Apply reservoir sampling in SampledPlotBase

Why are the changes needed?

The existing sampling approach has two drawbacks:

1. It needs two jobs to sample `max_rows` rows:

  • `df.count()` to compute `fraction = max_rows / count`
  • `df.sample(fraction).to_pandas()` to do the sampling

2. `df.sample` is based on Bernoulli sampling, which cannot guarantee that the sampled size equals the expected `max_rows`, e.g.

```
In [1]: df = spark.range(10000)

In [2]: [df.sample(0.01).count() for i in range(0, 10)]
Out[2]: [96, 97, 95, 97, 105, 105, 105, 87, 95, 110]
```

The size of the sampled data fluctuates around the target size 10000 * 0.01 = 100. This relative deviation cannot be ignored when the input dataset is large and the sampling fraction is small.
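
For reference, a minimal sketch of the single-job approach taken by this PR, reconstructed from the review snippets in the thread below; the helper name and temporary column names are illustrative, not the exact code in `SampledPlotBase`:

```python
from pyspark.sql import DataFrame, functions as F

def sample_for_plot(sdf: DataFrame, max_rows: int):
    # Illustrative temporary column names (assumptions):
    rand_col_name = "__rand__"
    id_col_name = "__id__"
    sampled = (
        sdf.select(
            "*",
            F.rand().alias(rand_col_name),                       # random sort key
            F.monotonically_increasing_id().alias(id_col_name),  # original order
        )
        .sort(rand_col_name)   # planned as TakeOrderedAndProject (single job)
        .limit(max_rows + 1)   # the extra row presumably signals truncation
        .coalesce(1)           # guarantee a single partition
        .sortWithinPartitions(id_col_name)  # restore original order locally
    )
    return sampled.drop(rand_col_name, id_col_name).toPandas()
```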

Does this PR introduce any user-facing change?

No

How was this patch tested?

CI and manual checks

Was this patch authored or co-authored using generative AI tooling?

No

zhengruifeng commented on the diff:

```python
F.monotonically_increasing_id().alias(id_col_name),
)
.sort(rand_col_name)
.limit(max_rows + 1)
```

`sort` + `limit` is likely to be optimized into `TakeOrderedAndProject`, which outputs a single partition; the `coalesce` here is just used to guarantee the partitioning.
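
One way to confirm this, assuming an active SparkSession `spark` (the column name and limit value are arbitrary):

```python
from pyspark.sql import functions as F

df = spark.range(10000).select("*", F.rand().alias("__rand__"))
# sort followed by limit is planned as a single TakeOrderedAndProject node,
# which produces one output partition:
df.sort("__rand__").limit(101).explain()
```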

zhengruifeng commented on the diff:

```python
.sort(rand_col_name)
.limit(max_rows + 1)
.coalesce(1)
.sortWithinPartitions(id_col_name)
```

Using a local sort (`sortWithinPartitions`) to avoid an unnecessary shuffle.
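
A quick way to see the difference, assuming a SparkSession `spark` (names are illustrative):

```python
from pyspark.sql import functions as F

df = spark.range(1000).select("*", F.monotonically_increasing_id().alias("__id__"))

# A global sort in general requires a range-partitioning shuffle (Exchange):
df.sort("__id__").explain()

# sortWithinPartitions sorts each partition locally and never shuffles:
df.sortWithinPartitions("__id__").explain()
```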

@zhengruifeng commented:

Will send a separate PR for the new DataFrame plotting.

@zhengruifeng zhengruifeng changed the title [SPARK-49640][PS] Apply Reservoir sampling in SampledPlotBase [SPARK-49640][PS] Apply reservoir sampling in SampledPlotBase Sep 13, 2024
@xinrong-meng (Member) commented:

I’m wondering if this is considered a “user-facing change” since the sampling directly affects the appearance of the plot

@xinrong-meng commented:

Thank you @zhengruifeng !

@zhengruifeng commented:

> I’m wondering if this is considered a “user-facing change” since the sampling directly affects the appearance of the plot

Hmm, I don't think this is a “user-facing change” because:
1. it only takes effect when the number of rows > `max_rows`;
2. it is just another (better) implementation of uniform sampling.
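
A quick check of point 2, assuming a SparkSession `spark`: with random-sort-plus-limit, the sampled size is exact whenever the input has at least `max_rows` rows, unlike the Bernoulli sizes shown in the PR description:

```python
from pyspark.sql import functions as F

df = spark.range(10000)
sizes = [
    df.select("*", F.rand().alias("__rand__")).sort("__rand__").limit(100).count()
    for _ in range(10)
]
print(sizes)  # every entry is exactly 100: limit(100) caps the result deterministically
```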

@zhengruifeng commented:

thanks, merged to master

@zhengruifeng zhengruifeng deleted the ps_sampling branch September 18, 2024 00:44
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
Closes apache#48105 from zhengruifeng/ps_sampling.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024