Skip to content

Commit

Permalink
Add BigTableIO Stress test (apache#30630)
Browse files Browse the repository at this point in the history
* Add BigTableIO Stress test

* refactor

* update dependency tree

* refactor

* Add stress test files to grpc/protobuff exception ignore list

* move exportMetrics method to IOLoadTestBase class

* refactor

* refactor
  • Loading branch information
akashorabek authored Mar 21, 2024
1 parent 4a3b6c5 commit 8955124
Show file tree
Hide file tree
Showing 8 changed files with 578 additions and 239 deletions.
5 changes: 4 additions & 1 deletion it/google-cloud-platform/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies {
implementation project(path: ":runners:google-cloud-dataflow-java")
implementation project(path: ":it:conditions", configuration: "shadow")
implementation project(path: ":it:truthmatchers", configuration: "shadow")
implementation project(path: ":sdks:java:testing:test-utils")
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.jackson_core
Expand All @@ -48,6 +49,7 @@ dependencies {
implementation library.java.protobuf_java
implementation library.java.threetenbp
implementation 'org.awaitility:awaitility:4.2.0'
implementation 'joda-time:joda-time:2.10.10'
// Google Cloud Dependencies
implementation library.java.google_api_services_bigquery
implementation library.java.google_cloud_core
Expand All @@ -71,7 +73,6 @@ dependencies {
implementation 'com.google.cloud:google-cloud-secretmanager'
provided 'com.google.api.grpc:proto-google-cloud-secretmanager-v1'

testImplementation project(path: ":sdks:java:testing:test-utils")
testImplementation project(path: ":sdks:java:io:google-cloud-platform")
testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:io:synthetic")
Expand All @@ -83,6 +84,8 @@ dependencies {

tasks.register("GCSPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'FileBasedIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigTablePerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOLT', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigTableStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigTableStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigTableIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryPerformanceTest", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOLT', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryStressTestMedium", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'medium','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
tasks.register("BigQueryStressTestLarge", IoPerformanceTestUtilities.IoPerformanceTest, project, 'google-cloud-platform', 'BigQueryIOST', ['configuration':'large','project':'apache-beam-testing', 'artifactBucket':'io-performance-temp'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@
*/
package org.apache.beam.it.gcp;

import com.google.cloud.Timestamp;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.gcp.dataflow.DefaultPipelineLauncher;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -101,4 +110,29 @@ public enum PipelineMetricsType {
public static String getBeamMetricsName(PipelineMetricsType metricstype, String metricsName) {
return BEAM_METRICS_NAMESPACE + ":" + metricstype + ":" + metricsName;
}

/** Exports test metrics to InfluxDB or BigQuery depending on the configuration. */
protected void exportMetrics(
PipelineLauncher.LaunchInfo launchInfo,
MetricsConfiguration metricsConfig,
boolean exportToInfluxDB,
InfluxDBSettings influxDBSettings)
throws IOException, ParseException, InterruptedException {

Map<String, Double> metrics = getMetrics(launchInfo, metricsConfig);
String testId = UUID.randomUUID().toString();
String testTimestamp = Timestamp.now().toString();

if (exportToInfluxDB) {
Collection<NamedTestResult> namedTestResults = new ArrayList<>();
for (Map.Entry<String, Double> entry : metrics.entrySet()) {
NamedTestResult metricResult =
NamedTestResult.create(testId, testTimestamp, entry.getKey(), entry.getValue());
namedTestResults.add(metricResult);
}
IOITMetrics.publishToInflux(testId, testTimestamp, namedTestResults, influxDBSettings);
} else {
exportMetricsToBigQuery(launchInfo, metrics);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.it.gcp;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

/** Base class for IO Stress tests. */
public class IOStressTestBase extends IOLoadTestBase {
/**
* The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
* eventually return to 1x.
*/
protected static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};

protected static final int DEFAULT_ROWS_PER_SECOND = 1000;

/**
* Generates and returns a list of LoadPeriod instances representing periods of load increase
* based on the specified load increase array and total duration in minutes.
*
* @param minutesTotal The total duration in minutes for which the load periods are generated.
* @return A list of LoadPeriod instances defining periods of load increase.
*/
protected List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {

List<LoadPeriod> loadPeriods = new ArrayList<>();
long periodDurationMillis =
Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
long startTimeMillis = 0;

for (int loadIncreaseMultiplier : loadIncreaseArray) {
long endTimeMillis = startTimeMillis + periodDurationMillis;
loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));

startTimeMillis = endTimeMillis;
}
return loadPeriods;
}

/**
* Represents a period of time with associated load increase properties for stress testing
* scenarios.
*/
protected static class LoadPeriod implements Serializable {
private final int loadIncreaseMultiplier;
private final long periodStartMillis;
private final long periodEndMillis;

public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
this.loadIncreaseMultiplier = loadIncreaseMultiplier;
this.periodStartMillis = periodStartMillis;
this.periodEndMillis = periodEndMin;
}

public int getLoadIncreaseMultiplier() {
return loadIncreaseMultiplier;
}

public long getPeriodStartMillis() {
return periodStartMillis;
}

public long getPeriodEndMillis() {
return periodEndMillis;
}
}

/**
* Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
* load increase over time, multiplying the input elements based on the elapsed time since the
* start of processing. This class aims to simulate various load levels during stress testing.
*/
protected static class MultiplierDoFn<T> extends DoFn<T, T> {
private final int startMultiplier;
private final long startTimesMillis;
private final List<LoadPeriod> loadPeriods;

public MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
this.startMultiplier = startMultiplier;
this.startTimesMillis = Instant.now().getMillis();
this.loadPeriods = loadPeriods;
}

@DoFn.ProcessElement
public void processElement(
@Element T element, OutputReceiver<T> outputReceiver, @DoFn.Timestamp Instant timestamp) {

int multiplier = this.startMultiplier;
long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;

for (LoadPeriod loadPeriod : loadPeriods) {
if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
&& elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
multiplier *= loadPeriod.getLoadIncreaseMultiplier();
break;
}
}
for (int i = 0; i < multiplier; i++) {
outputReceiver.output(element);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.Timestamp;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.time.Duration;
Expand All @@ -46,7 +45,7 @@
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.IOLoadTestBase;
import org.apache.beam.it.gcp.IOStressTestBase;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
Expand All @@ -59,7 +58,6 @@
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.PeriodicImpulse;
Expand All @@ -70,7 +68,6 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -87,18 +84,11 @@
* - To run large-scale stress tests: {@code gradle
* :it:google-cloud-platform:BigQueryStressTestLarge}
*/
public final class BigQueryIOST extends IOLoadTestBase {
public final class BigQueryIOST extends IOStressTestBase {

private static final String READ_ELEMENT_METRIC_NAME = "read_count";
private static final String TEST_ID = UUID.randomUUID().toString();
private static final String TEST_TIMESTAMP = Timestamp.now().toString();
private static final int DEFAULT_ROWS_PER_SECOND = 1000;

/**
* The load will initiate at 1x, progressively increase to 2x and 4x, then decrease to 2x and
* eventually return to 1x.
*/
private static final int[] DEFAULT_LOAD_INCREASE_ARRAY = {1, 2, 2, 4, 2, 1};

private static BigQueryResourceManager resourceManager;
private static String tableQualifier;
Expand Down Expand Up @@ -301,7 +291,7 @@ private void generateDataAndWrite(BigQueryIO.Write<byte[]> writeIO) throws IOExc
source
.apply(
"One input to multiple outputs",
ParDo.of(new MultiplierDoFn(startMultiplier, loadPeriods)))
ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
.apply("Reshuffle fanout", Reshuffle.viaRandomKey())
.apply("Counting element", ParDo.of(new CountingFn<>(READ_ELEMENT_METRIC_NAME)));
}
Expand Down Expand Up @@ -371,44 +361,6 @@ private void generateDataAndWrite(BigQueryIO.Write<byte[]> writeIO) throws IOExc
}
}

/**
* Custom Apache Beam DoFn designed for use in stress testing scenarios. It introduces a dynamic
* load increase over time, multiplying the input elements based on the elapsed time since the
* start of processing. This class aims to simulate various load levels during stress testing.
*/
private static class MultiplierDoFn extends DoFn<byte[], byte[]> {
private final int startMultiplier;
private final long startTimesMillis;
private final List<LoadPeriod> loadPeriods;

MultiplierDoFn(int startMultiplier, List<LoadPeriod> loadPeriods) {
this.startMultiplier = startMultiplier;
this.startTimesMillis = Instant.now().getMillis();
this.loadPeriods = loadPeriods;
}

@ProcessElement
public void processElement(
@Element byte[] element,
OutputReceiver<byte[]> outputReceiver,
@DoFn.Timestamp Instant timestamp) {

int multiplier = this.startMultiplier;
long elapsedTimeMillis = timestamp.getMillis() - startTimesMillis;

for (LoadPeriod loadPeriod : loadPeriods) {
if (elapsedTimeMillis >= loadPeriod.getPeriodStartMillis()
&& elapsedTimeMillis < loadPeriod.getPeriodEndMillis()) {
multiplier *= loadPeriod.getLoadIncreaseMultiplier();
break;
}
}
for (int i = 0; i < multiplier; i++) {
outputReceiver.output(element);
}
}
}

abstract static class FormatFn<InputT, OutputT> implements SerializableFunction<InputT, OutputT> {
protected final int numColumns;

Expand Down Expand Up @@ -493,29 +445,6 @@ public TableRow apply(byte[] input) {
}
}

/**
* Generates and returns a list of LoadPeriod instances representing periods of load increase
* based on the specified load increase array and total duration in minutes.
*
* @param minutesTotal The total duration in minutes for which the load periods are generated.
* @return A list of LoadPeriod instances defining periods of load increase.
*/
private List<LoadPeriod> getLoadPeriods(int minutesTotal, int[] loadIncreaseArray) {

List<LoadPeriod> loadPeriods = new ArrayList<>();
long periodDurationMillis =
Duration.ofMinutes(minutesTotal / loadIncreaseArray.length).toMillis();
long startTimeMillis = 0;

for (int loadIncreaseMultiplier : loadIncreaseArray) {
long endTimeMillis = startTimeMillis + periodDurationMillis;
loadPeriods.add(new LoadPeriod(loadIncreaseMultiplier, startTimeMillis, endTimeMillis));

startTimeMillis = endTimeMillis;
}
return loadPeriods;
}

private enum WriteFormat {
AVRO,
JSON
Expand Down Expand Up @@ -573,32 +502,4 @@ static class Configuration extends SyntheticSourceOptions {
/** InfluxDB database to publish metrics. * */
@JsonProperty public String influxDatabase;
}

/**
* Represents a period of time with associated load increase properties for stress testing
* scenarios.
*/
private static class LoadPeriod implements Serializable {
private final int loadIncreaseMultiplier;
private final long periodStartMillis;
private final long periodEndMillis;

public LoadPeriod(int loadIncreaseMultiplier, long periodStartMillis, long periodEndMin) {
this.loadIncreaseMultiplier = loadIncreaseMultiplier;
this.periodStartMillis = periodStartMillis;
this.periodEndMillis = periodEndMin;
}

public int getLoadIncreaseMultiplier() {
return loadIncreaseMultiplier;
}

public long getPeriodStartMillis() {
return periodStartMillis;
}

public long getPeriodEndMillis() {
return periodEndMillis;
}
}
}
Loading

0 comments on commit 8955124

Please sign in to comment.