Skip to content

Commit

Permalink
Cleanup unused code for metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Souvik Bose <souvbose@amazon.com>
  • Loading branch information
sbose2k21 committed Aug 20, 2024
1 parent 4bcde00 commit 93ababc
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@

@Getter
public class KinesisLeaseConfig {
@JsonProperty("lease_coordination_table")
private String leaseCoordinationTable;
@JsonProperty("lease_coordination")
private KinesisLeaseCoordinationTableConfig leaseCoordinationTable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.opensearch.dataprepper.model.plugin.ExtensionPlugin;
import org.opensearch.dataprepper.model.plugin.ExtensionPoints;

@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis_lease_config", allowInPipelineConfigurations = true)
@DataPrepperExtensionPlugin(modelType = KinesisLeaseConfig.class, rootKeyJsonPath = "/kinesis", allowInPipelineConfigurations = true)
public class KinesisLeaseConfigExtension implements ExtensionPlugin {

private KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.opensearch.dataprepper.plugins.kinesis.extension;

import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Getter;
import lombok.NonNull;
import software.amazon.awssdk.regions.Region;

@Getter
public class KinesisLeaseCoordinationTableConfig {

@JsonProperty("table_name")
@NonNull
private String tableName;

@JsonProperty("region")
@NonNull
private String region;

public Region getAwsRegion() {
return Region.of(region);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
Expand All @@ -24,10 +26,10 @@ public ClientFactory(final AwsCredentialsSupplier awsCredentialsSupplier,
this.awsAuthenticationConfig = awsAuthenticationConfig;
}

public DynamoDbAsyncClient buildDynamoDBClient() {
public DynamoDbAsyncClient buildDynamoDBClient(Region region) {
return DynamoDbAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
.credentialsProvider(DefaultCredentialsProvider.create())
.region(region)
.build();
}

Expand All @@ -39,10 +41,10 @@ public KinesisAsyncClient buildKinesisAsyncClient() {
);
}

public CloudWatchAsyncClient buildCloudWatchAsyncClient() {
public CloudWatchAsyncClient buildCloudWatchAsyncClient(Region region) {
return CloudWatchAsyncClient.builder()
.credentialsProvider(awsAuthenticationConfig.authenticateAwsConfiguration())
.region(awsAuthenticationConfig.getAwsRegion())
.credentialsProvider(DefaultCredentialsProvider.create())
.region(region)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
Expand Down Expand Up @@ -39,6 +40,7 @@ public class KinesisService {

private final String applicationName;
private final String tableName;
private final String kclMetricsNamespaceName;
private final String pipelineName;
private final AcknowledgementSetManager acknowledgementSetManager;
private final KinesisSourceConfig sourceConfig;
Expand All @@ -63,15 +65,18 @@ public KinesisService(final KinesisSourceConfig sourceConfig,
this.pluginMetrics = pluginMetrics;
this.pluginFactory = pluginFactory;
this.acknowledgementSetManager = acknowledgementSetManager;
this.dynamoDbClient = clientFactory.buildDynamoDBClient();
this.kinesisClient = clientFactory.buildKinesisAsyncClient();
this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient();
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
if (kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().isEmpty()) {
throw new IllegalStateException("Lease Coordination table should be provided!");
}
this.tableName = kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable();
KinesisLeaseConfig kinesisLeaseConfig =
kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get();
this.tableName = kinesisLeaseConfig.getLeaseCoordinationTable().getTableName();
this.kclMetricsNamespaceName = this.tableName;
this.dynamoDbClient = clientFactory.buildDynamoDBClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.kinesisClient = clientFactory.buildKinesisAsyncClient();
this.cloudWatchClient = clientFactory.buildCloudWatchAsyncClient(kinesisLeaseConfig.getLeaseCoordinationTable().getAwsRegion());
this.pipelineName = pipelineDescription.getPipelineName();
this.applicationName = pipelineName;
this.executorService = Executors.newFixedThreadPool(1);
}

Expand Down Expand Up @@ -118,7 +123,9 @@ public Scheduler createScheduler(final Buffer<Record<Event>> buffer) {
new KinesisMultiStreamTracker(kinesisClient, sourceConfig, applicationName),
applicationName, kinesisClient, dynamoDbClient, cloudWatchClient,
new WorkerIdentifierGenerator().generate(), processorFactory
).tableName(tableName);
)
.tableName(tableName)
.namespace(kclMetricsNamespaceName);

ConsumerStrategy consumerStrategy = sourceConfig.getConsumerStrategy();
if (consumerStrategy == ConsumerStrategy.POLLING) {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@ public class DefaultKinesisLeaseConfigSupplierTest {
@Mock
KinesisLeaseConfig kinesisLeaseConfig;

@Mock
KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig;

private DefaultKinesisLeaseConfigSupplier createObjectUnderTest() {
return new DefaultKinesisLeaseConfigSupplier(kinesisLeaseConfig);
}

@Test
void testGetters() {
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(LEASE_COORDINATION_TABLE);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn(LEASE_COORDINATION_TABLE);
when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1");
KinesisLeaseConfigSupplier kinesisLeaseConfigSupplier = createObjectUnderTest();
assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable(), equalTo(LEASE_COORDINATION_TABLE));
assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable().getTableName(), equalTo(LEASE_COORDINATION_TABLE));
assertThat(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig().get().getLeaseCoordinationTable().getRegion(), equalTo("us-east-1"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private KinesisLeaseConfig makeConfig(String filePath) throws IOException {
assertThat(dataPrepperConfiguration, notNullValue());
assertThat(dataPrepperConfiguration.getPipelineExtensions(), notNullValue());
final Map<String, Object> kinesisLeaseConfigMap =
(Map<String, Object>) dataPrepperConfiguration.getPipelineExtensions().getExtensionMap().get("kinesis_lease_config");
(Map<String, Object>) dataPrepperConfiguration.getPipelineExtensions().getExtensionMap().get("kinesis");
String json = OBJECT_MAPPER.writeValueAsString(kinesisLeaseConfigMap);
Reader reader = new StringReader(json);
return OBJECT_MAPPER.readValue(reader, KinesisLeaseConfig.class);
Expand All @@ -46,7 +46,8 @@ void testConfigWithTestExtension() throws IOException {
"src/test/resources/simple_pipeline_with_extensions.yaml");

assertNotNull(kinesisLeaseConfig.getLeaseCoordinationTable());
assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable(), "kinesis-pipeline-kcl");
assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getTableName(), "kinesis-pipeline-kcl");
assertEquals(kinesisLeaseConfig.getLeaseCoordinationTable().getRegion(), "us-east-1");

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ void testCreateClient() throws NoSuchFieldException, IllegalAccessException {

ClientFactory clientFactory = new ClientFactory(awsCredentialsSupplier, awsAuthenticationOptionsConfig);

final DynamoDbAsyncClient dynamoDbAsyncClient = clientFactory.buildDynamoDBClient();
final DynamoDbAsyncClient dynamoDbAsyncClient = clientFactory.buildDynamoDBClient(Region.US_EAST_1);
assertNotNull(dynamoDbAsyncClient);

final KinesisAsyncClient kinesisAsyncClient = clientFactory.buildKinesisAsyncClient();
assertNotNull(kinesisAsyncClient);

final CloudWatchAsyncClient cloudWatchAsyncClient = clientFactory.buildCloudWatchAsyncClient();
final CloudWatchAsyncClient cloudWatchAsyncClient = clientFactory.buildCloudWatchAsyncClient(Region.US_EAST_1);
assertNotNull(cloudWatchAsyncClient);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseCoordinationTableConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.ConsumerStrategy;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
Expand Down Expand Up @@ -107,6 +108,9 @@ public class KinesisServiceTest {
@Mock
KinesisLeaseConfig kinesisLeaseConfig;

@Mock
KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig;

@BeforeEach
void setup() {
awsAuthenticationConfig = mock(AwsAuthenticationConfig.class);
Expand All @@ -122,7 +126,11 @@ void setup() {
buffer = mock(Buffer.class);
kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class);
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn("kinesis-lease-table");
kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("kinesis-lease-table");
when(kinesisLeaseCoordinationTableConfig.getRegion()).thenReturn("us-east-1");
when(kinesisLeaseCoordinationTableConfig.getAwsRegion()).thenReturn(Region.US_EAST_1);
when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig));

when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2"));
Expand Down Expand Up @@ -160,9 +168,9 @@ void setup() {
when(kinesisSourceConfig.getStreams()).thenReturn(streamConfigs);
when(kinesisSourceConfig.getNumberOfRecordsToAccumulate()).thenReturn(NUMBER_OF_RECORDS_TO_ACCUMULATE);

when(clientFactory.buildDynamoDBClient()).thenReturn(dynamoDbClient);
when(clientFactory.buildDynamoDBClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(dynamoDbClient);
when(clientFactory.buildKinesisAsyncClient()).thenReturn(kinesisClient);
when(clientFactory.buildCloudWatchAsyncClient()).thenReturn(cloudWatchClient);
when(clientFactory.buildCloudWatchAsyncClient(kinesisLeaseCoordinationTableConfig.getAwsRegion())).thenReturn(cloudWatchClient);
when(kinesisClient.serviceClientConfiguration()).thenReturn(KinesisServiceClientConfiguration.builder().region(Region.US_EAST_1).build());
when(scheduler.startGracefulShutdown()).thenReturn(CompletableFuture.completedFuture(true));
when(pipelineDescription.getPipelineName()).thenReturn(PIPELINE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfig;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseConfigSupplier;
import org.opensearch.dataprepper.plugins.kinesis.extension.KinesisLeaseCoordinationTableConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.AwsAuthenticationConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisSourceConfig;
import org.opensearch.dataprepper.plugins.kinesis.source.configuration.KinesisStreamConfig;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class KinesisSourceTest {
@Mock
KinesisLeaseConfig kinesisLeaseConfig;

@Mock
KinesisLeaseCoordinationTableConfig kinesisLeaseCoordinationTableConfig;

@BeforeEach
void setup() {
pluginMetrics = mock(PluginMetrics.class);
Expand All @@ -79,7 +83,9 @@ void setup() {
kinesisService = mock(KinesisService.class);
kinesisLeaseConfigSupplier = mock(KinesisLeaseConfigSupplier.class);
kinesisLeaseConfig = mock(KinesisLeaseConfig.class);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn("kinesis-lease-table");
kinesisLeaseCoordinationTableConfig = mock(KinesisLeaseCoordinationTableConfig.class);
when(kinesisLeaseConfig.getLeaseCoordinationTable()).thenReturn(kinesisLeaseCoordinationTableConfig);
when(kinesisLeaseCoordinationTableConfig.getTableName()).thenReturn("us-east-1");
when(kinesisLeaseConfigSupplier.getKinesisExtensionLeaseConfig()).thenReturn(Optional.ofNullable(kinesisLeaseConfig));
when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2"));
when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
extensions:
kinesis_lease_config:
lease_coordination_table: "kinesis-pipeline-kcl"
kinesis:
lease_coordination:
table_name: "kinesis-pipeline-kcl"
region: "us-east-1"

0 comments on commit 93ababc

Please sign in to comment.