Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initializing the CosmosAsyncContainer prior to running the test #20321

Merged
merged 2 commits into from
Apr 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class DataLoader {

private static final int MAX_BATCH_SIZE = 10000;
private static final int BULK_OPERATION_CONCURRENCY = 10;
private static final Duration BATCH_DATA_LOAD_WAIT_DURATION = Duration.ofMinutes(5);
private static final Duration VALIDATE_DATA_WAIT_DURATION = Duration.ofSeconds(120);
private static final String COUNT_ALL_QUERY = "SELECT COUNT(1) FROM c";
private static final String COUNT_ALL_QUERY_RESULT_FIELD = "$1";
Expand Down Expand Up @@ -91,13 +92,11 @@ private void bulkCreateItems(final CosmosAsyncDatabase database,
container.getId());

// We want to wait longer depending on the number of documents in each iteration
final Duration blockingWaitTime = Duration.ofSeconds(120 *
(((_configuration.getBulkloadBatchSize() - 1) / 200000) + 1));
final BulkProcessingOptions<Object> bulkProcessingOptions = new BulkProcessingOptions<>(Object.class);
bulkProcessingOptions.setMaxMicroBatchSize(MAX_BATCH_SIZE)
.setMaxMicroBatchConcurrency(BULK_OPERATION_CONCURRENCY);
container.processBulkOperations(Flux.fromIterable(cosmosItemOperations), bulkProcessingOptions)
.blockLast(blockingWaitTime);
.blockLast(BATCH_DATA_LOAD_WAIT_DURATION);

LOGGER.info("Completed loading {} documents into [{}:{}]", cosmosItemOperations.size(),
database.getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void setup() throws CosmosException {

LOGGER.info("Data loading completed");
_bulkLoadClient.close();

_testRunner.init();
}

public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,13 @@ public abstract class TestRunner {
_semaphore = new Semaphore(configuration.getConcurrency());
}

public void init() {
LOGGER.info("Initializing the TestRunner");
_accessor.initialize();
}

public void run() {
LOGGER.info("Executing Tests for the Scenario");
LOGGER.info("Executing Tests for the configured Scenario");
KeyGenerator keyGenerator = getNewKeyGenerator();
final long runStartTime = System.currentTimeMillis();
long i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
*/
public interface Accessor<K, V> {

/**
* Accessor initialization operations
*/
void initialize();

/**
* Retrieves the entity from the data source, using the key and request options provided. The entire
* key must be defined, and partial keys will NOT be accepted for GET. Use BatchGet for retrieving entities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,21 @@

package com.azure.cosmos.benchmark.linkedin.impl;

import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.benchmark.linkedin.impl.exceptions.CosmosDBDataAccessorException;
import com.azure.cosmos.benchmark.linkedin.impl.keyextractor.KeyExtractor;
import com.azure.cosmos.benchmark.linkedin.impl.metrics.MetricsFactory;
import com.azure.cosmos.benchmark.linkedin.impl.models.BatchGetResult;
import com.azure.cosmos.benchmark.linkedin.impl.models.CollectionKey;
import com.azure.cosmos.benchmark.linkedin.impl.models.GetRequestOptions;
import com.azure.cosmos.benchmark.linkedin.impl.models.QueryOptions;
import com.azure.cosmos.benchmark.linkedin.impl.models.Result;
import com.google.common.base.Preconditions;
import java.time.Clock;
import java.time.Duration;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -24,6 +29,9 @@
@ThreadSafe
public class CosmosDBDataAccessor<K, V> implements Accessor<K, V> {

private static final Logger LOGGER = LoggerFactory.getLogger(CosmosDBDataAccessor.class);
private static final Duration INITIALIZATION_WAIT_TIME = Duration.ofSeconds(120);

private final DataLocator _dataLocator;
private final KeyExtractor<K> _keyExtractor;
private final ResponseHandler<K, V> _responseHandler;
Expand All @@ -42,6 +50,20 @@ public CosmosDBDataAccessor(final DataLocator dataLocator, final KeyExtractor<K>
_queryExecutor = new QueryExecutor<>(_dataLocator, _responseHandler, metricsFactory, _clock, logger);
}

@Override
public void initialize() {
final CollectionKey collection = _dataLocator.getCollection();
final CosmosAsyncContainer asyncContainer = _dataLocator.getAsyncContainer(collection);
try {
LOGGER.info("Performing connection initialization for collection {}", collection);
asyncContainer.openConnectionsAndInitCaches()
.block(INITIALIZATION_WAIT_TIME);
} catch (Exception ex) {
LOGGER.error(String.format("Exception during connection initialization on the Collection %s", collection),
ex);
}
}

@Override
public Result<K, V> get(final K key, final GetRequestOptions requestOptions) throws CosmosDBDataAccessorException {
Preconditions.checkNotNull(key, "The key to fetch the Entity is null!");
Expand Down