Initializing the CosmosAsyncContainer prior to running the test (#20321)
* Initializing the CosmosAsyncContainer prior to running the test

* Changing the batch data load wait time to 5 minutes

Co-authored-by: Amar Athavale <aathaval@linkedin.biz>
amarathavale and Amar Athavale authored Apr 1, 2021
1 parent cf9399d commit 5703dc2
Showing 5 changed files with 37 additions and 4 deletions.
@@ -31,6 +31,7 @@ public class DataLoader {

private static final int MAX_BATCH_SIZE = 10000;
private static final int BULK_OPERATION_CONCURRENCY = 10;
private static final Duration BATCH_DATA_LOAD_WAIT_DURATION = Duration.ofMinutes(5);
private static final Duration VALIDATE_DATA_WAIT_DURATION = Duration.ofSeconds(120);
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";
@@ -91,13 +92,11 @@ private void bulkCreateItems(final CosmosAsyncDatabase database,
container.getId());

// We want to wait longer depending on the number of documents in each iteration
- final Duration blockingWaitTime = Duration.ofSeconds(120 *
- (((_configuration.getBulkloadBatchSize() - 1) / 200000) + 1));
final BulkProcessingOptions<Object> bulkProcessingOptions = new BulkProcessingOptions<>(Object.class);
bulkProcessingOptions.setMaxMicroBatchSize(MAX_BATCH_SIZE)
.setMaxMicroBatchConcurrency(BULK_OPERATION_CONCURRENCY);
container.processBulkOperations(Flux.fromIterable(cosmosItemOperations), bulkProcessingOptions)
- .blockLast(blockingWaitTime);
+ .blockLast(BATCH_DATA_LOAD_WAIT_DURATION);

LOGGER.info("Completed loading {} documents into [{}:{}]", cosmosItemOperations.size(),
database.getId(),
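To make the effect of this swap concrete, here is a small standalone sketch (not part of the change) contrasting the removed size-scaled wait with the new flat BATCH_DATA_LOAD_WAIT_DURATION of 5 minutes; the batch sizes are arbitrary example values:

import java.time.Duration;

public class BatchWaitComparison {
    public static void main(String[] args) {
        final Duration fixedWait = Duration.ofMinutes(5); // the new BATCH_DATA_LOAD_WAIT_DURATION
        for (int batchSize : new int[] {100_000, 200_000, 400_000, 1_000_000}) {
            // The removed logic granted 120 seconds per started block of 200,000 documents.
            final Duration scaledWait = Duration.ofSeconds(120L * (((batchSize - 1) / 200_000) + 1));
            System.out.printf("batchSize=%,d  old wait=%s  new wait=%s%n", batchSize, scaledWait, fixedWait);
        }
        // The flat 5-minute wait is longer than the old wait for batches of up to
        // 400,000 documents and shorter for anything larger.
    }
}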
@@ -74,6 +74,8 @@ public void setup() throws CosmosException {

LOGGER.info("Data loading completed");
_bulkLoadClient.close();

_testRunner.init();
}

public void run() {
@@ -77,8 +77,13 @@ public abstract class TestRunner {
_semaphore = new Semaphore(configuration.getConcurrency());
}

public void init() {
LOGGER.info("Initializing the TestRunner");
_accessor.initialize();
}

public void run() {
LOGGER.info("Executing Tests for the Scenario");
LOGGER.info("Executing Tests for the configured Scenario");
KeyGenerator keyGenerator = getNewKeyGenerator();
final long runStartTime = System.currentTimeMillis();
long i = 0;
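For context on the surrounding class: the constructor shown above sizes a Semaphore from configuration.getConcurrency(), which the runner presumably uses to cap the number of operations in flight during run(). A minimal, self-contained sketch of that bounded-concurrency pattern (the class, names, and simulated workload are illustrative, not the benchmark's actual code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

public class BoundedConcurrencySketch {
    public static void main(String[] args) throws InterruptedException {
        final int concurrency = 10;                 // analogous to configuration.getConcurrency()
        final Semaphore semaphore = new Semaphore(concurrency);

        for (int i = 0; i < 100; i++) {
            semaphore.acquire();                    // blocks once the concurrency limit is reached
            final int operationId = i;
            CompletableFuture
                .runAsync(() -> simulateOperation(operationId))
                .whenComplete((unused, error) -> semaphore.release());
        }

        semaphore.acquire(concurrency);             // drain: wait for the last in-flight operations
    }

    private static void simulateOperation(int id) {
        try {
            Thread.sleep(10);                       // stand-in for an asynchronous Cosmos DB request
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}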
@@ -18,6 +18,11 @@
*/
public interface Accessor<K, V> {

/**
* Accessor initialization operations
*/
void initialize();

/**
* Retrieves the entity from the data source, using the key and request options provided. The entire
* key must be defined, and partial keys will NOT be accepted for GET. Use BatchGet for retrieving entities
@@ -3,16 +3,21 @@

package com.azure.cosmos.benchmark.linkedin.impl;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.benchmark.linkedin.impl.exceptions.CosmosDBDataAccessorException;
import com.azure.cosmos.benchmark.linkedin.impl.keyextractor.KeyExtractor;
import com.azure.cosmos.benchmark.linkedin.impl.metrics.MetricsFactory;
import com.azure.cosmos.benchmark.linkedin.impl.models.BatchGetResult;
import com.azure.cosmos.benchmark.linkedin.impl.models.CollectionKey;
import com.azure.cosmos.benchmark.linkedin.impl.models.GetRequestOptions;
import com.azure.cosmos.benchmark.linkedin.impl.models.QueryOptions;
import com.azure.cosmos.benchmark.linkedin.impl.models.Result;
import com.google.common.base.Preconditions;
import java.time.Clock;
import java.time.Duration;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -24,6 +29,9 @@
@ThreadSafe
public class CosmosDBDataAccessor<K, V> implements Accessor<K, V> {

private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBDataAccessor.class);
private static final Duration INITIALIZATION_WAIT_TIME = Duration.ofSeconds(120);

private final DataLocator _dataLocator;
private final KeyExtractor<K> _keyExtractor;
private final ResponseHandler<K, V> _responseHandler;
@@ -42,6 +50,20 @@ public CosmosDBDataAccessor(final DataLocator dataLocator, final KeyExtractor<K>
_queryExecutor = new QueryExecutor<>(_dataLocator, _responseHandler, metricsFactory, _clock, logger);
}

@Override
public void initialize() {
final CollectionKey collection = _dataLocator.getCollection();
final CosmosAsyncContainer asyncContainer = _dataLocator.getAsyncContainer(collection);
try {
LOGGER.info("Performing connection initialization for collection {}", collection);
asyncContainer.openConnectionsAndInitCaches()
.block(INITIALIZATION_WAIT_TIME);
} catch (Exception ex) {
LOGGER.error(String.format("Exception during connection initialization on the Collection %s", collection),
ex);
}
}

@Override
public Result<K, V> get(final K key, final GetRequestOptions requestOptions) throws CosmosDBDataAccessorException {
Preconditions.checkNotNull(key, "The key to fetch the Entity is null!");
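For readers unfamiliar with the warm-up call used in initialize() above: openConnectionsAndInitCaches() proactively opens connections and populates the client's caches so that the first measured request does not pay that setup cost. A minimal standalone sketch of the same warm-up outside the benchmark harness; the endpoint, key, and database/container names are placeholders:

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import java.time.Duration;

public class ContainerWarmUp {
    public static void main(String[] args) {
        final CosmosAsyncClient client = new CosmosClientBuilder()
            .endpoint("https://<your-account>.documents.azure.com:443/")  // placeholder
            .key("<your-key>")                                            // placeholder
            .directMode()
            .buildAsyncClient();

        final CosmosAsyncContainer container = client
            .getDatabase("benchmark-db")                                  // placeholder names
            .getContainer("benchmark-container");

        // Open connections and initialize caches up front, bounded by a timeout,
        // mirroring INITIALIZATION_WAIT_TIME in the accessor above.
        container.openConnectionsAndInitCaches()
            .block(Duration.ofSeconds(120));

        // ... run the actual workload here ...

        client.close();
    }
}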
