Skip to content

Commit

Permalink
FlintStreamingJobCleanerTask Implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Mar 18, 2024
1 parent 0838843 commit 3e4c0e9
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public enum MetricName {
EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT("emr_cancel_job_request_failure_count"),
EMR_STREAMING_QUERY_JOBS_CREATION_COUNT("emr_streaming_jobs_creation_count"),
EMR_INTERACTIVE_QUERY_JOBS_CREATION_COUNT("emr_interactive_jobs_creation_count"),
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count");
EMR_BATCH_QUERY_JOBS_CREATION_COUNT("emr_batch_jobs_creation_count"),
STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT("streaming_job_cleaner_task_failure_count");

private String name;

Expand Down Expand Up @@ -91,6 +92,7 @@ public static List<String> getNames() {
.add(ASYNC_QUERY_CREATE_API_REQUEST_COUNT)
.add(ASYNC_QUERY_GET_API_REQUEST_COUNT)
.add(ASYNC_QUERY_CANCEL_API_REQUEST_COUNT)
.add(STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.build();

public boolean isNumerical() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.sql.prometheus.storage.PrometheusStorageFactory;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.cluster.ClusterManagerEventListener;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.rest.RestAsyncQueryManagementAction;
import org.opensearch.sql.spark.storage.SparkStorageFactory;
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
Expand Down Expand Up @@ -221,7 +224,11 @@ public Collection<Object> createComponents(
OpenSearchSettings.SESSION_INDEX_TTL_SETTING,
OpenSearchSettings.RESULT_INDEX_TTL_SETTING,
OpenSearchSettings.AUTO_INDEX_MANAGEMENT_ENABLED_SETTING,
environment.settings());
environment.settings(),
dataSourceService,
injector.getInstance(FlintIndexMetadataService.class),
injector.getInstance(StateStore.class),
injector.getInstance(EMRServerlessClientFactory.class));
return ImmutableList.of(
dataSourceService,
injector.getInstance(AsyncQueryExecutorService.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;

public class ClusterManagerEventListener implements LocalNodeClusterManagerListener {

private Cancellable flintIndexRetentionCron;
private Cancellable flintStreamingJobCleanerCron;
private ClusterService clusterService;
private ThreadPool threadPool;
private Client client;
private Clock clock;
private DataSourceService dataSourceService;
private FlintIndexMetadataService flintIndexMetadataService;
private StateStore stateStore;
private EMRServerlessClientFactory emrServerlessClientFactory;
private Duration sessionTtlDuration;
private Duration resultTtlDuration;
private boolean isAutoIndexManagementEnabled;
Expand All @@ -42,13 +51,20 @@ public ClusterManagerEventListener(
Setting<TimeValue> sessionTtl,
Setting<TimeValue> resultTtl,
Setting<Boolean> isAutoIndexManagementEnabledSetting,
Settings settings) {
Settings settings,
DataSourceService dataSourceService,
FlintIndexMetadataService flintIndexMetadataService,
StateStore stateStore,
EMRServerlessClientFactory emrServerlessClientFactory) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.client = client;
this.clusterService.addLocalNodeClusterManagerListener(this);
this.clock = clock;

this.dataSourceService = dataSourceService;
this.flintIndexMetadataService = flintIndexMetadataService;
this.stateStore = stateStore;
this.emrServerlessClientFactory = emrServerlessClientFactory;
this.sessionTtlDuration = toDuration(sessionTtl.get(settings));
this.resultTtlDuration = toDuration(resultTtl.get(settings));

Expand Down Expand Up @@ -104,6 +120,19 @@ public void beforeStop() {
}
});
}
initializeStreamingJobCleanerCron();
}

private void initializeStreamingJobCleanerCron() {
flintStreamingJobCleanerCron =
threadPool.scheduleWithFixedDelay(
new FlintStreamingJobCleanerTask(
dataSourceService,
flintIndexMetadataService,
stateStore,
emrServerlessClientFactory),
TimeValue.timeValueMinutes(15),
executorName());
}

private void reInitializeFlintIndexRetention() {
Expand All @@ -125,6 +154,8 @@ private void reInitializeFlintIndexRetention() {
public void offClusterManager() {
cancel(flintIndexRetentionCron);
flintIndexRetentionCron = null;
cancel(flintStreamingJobCleanerCron);
flintStreamingJobCleanerCron = null;
}

private void cancel(Cancellable cron) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.cluster;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasource.model.DataSourceStatus;
import org.opensearch.sql.datasource.model.DataSourceType;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.flint.FlintIndexMetadata;
import org.opensearch.sql.spark.flint.FlintIndexMetadataService;
import org.opensearch.sql.spark.flint.operation.FlintIndexOpAlter;

/** Cleaner task which alters the active streaming jobs of a disabled datasource. */
@RequiredArgsConstructor
public class FlintStreamingJobCleanerTask implements Runnable {

private final DataSourceService dataSourceService;
private final FlintIndexMetadataService flintIndexMetadataService;
private final StateStore stateStore;
private final EMRServerlessClientFactory emrServerlessClientFactory;

private static final Logger LOGGER = LogManager.getLogger(FlintStreamingJobCleanerTask.class);
private static final AtomicBoolean isRunning = new AtomicBoolean(false);

@Override
public void run() {
if (!isRunning.compareAndSet(false, true)) {
LOGGER.info("Previous task is still running. Skipping this execution.");
return;
}
try {
LOGGER.info("Starting the cleaner task for disabled data sources.");
List<DataSourceMetadata> s3GlueDisabledDataSources = getS3GlueDisabledDataSources();
LOGGER.info("Found {} disabled data sources to process.", s3GlueDisabledDataSources.size());
for (DataSourceMetadata dataSourceMetadata : s3GlueDisabledDataSources) {
LOGGER.info("Processing disabled data source: {}", dataSourceMetadata.getName());
Map<String, FlintIndexMetadata> autoRefreshFlintIndicesMap =
getAutoRefreshIndicesOfDataSource(dataSourceMetadata);
LOGGER.info(
"Found {} auto-refresh indices to alter for data source: {}",
autoRefreshFlintIndicesMap.size(),
dataSourceMetadata.getName());
autoRefreshFlintIndicesMap.forEach(
(autoRefreshIndex, flintIndexMetadata) -> {
try {
LOGGER.debug("Attempting to alter index: {}", autoRefreshIndex);
FlintIndexOptions flintIndexOptions = new FlintIndexOptions();
flintIndexOptions.setOption(FlintIndexOptions.AUTO_REFRESH, "false");
FlintIndexOpAlter flintIndexOpAlter =
new FlintIndexOpAlter(
flintIndexOptions,
stateStore,
dataSourceMetadata.getName(),
emrServerlessClientFactory.getClient(),
flintIndexMetadataService);
flintIndexOpAlter.apply(flintIndexMetadata);
LOGGER.info("Successfully altered index: {}", autoRefreshIndex);
} catch (Exception exception) {
LOGGER.error(
"Failed to alter index {}: {}",
autoRefreshIndex,
exception.getMessage(),
exception);
Metrics.getInstance()
.getNumericalMetric(MetricName.STREAMING_JOB_CLEANER_TASK_FAILURE_COUNT)
.increment();
}
});
}
} catch (Throwable error) {
LOGGER.info("Error while running the streaming job cleaner task: {}", error.getMessage());
} finally {
isRunning.set(false);
}
}

private Map<String, FlintIndexMetadata> getAutoRefreshIndicesOfDataSource(
DataSourceMetadata dataSourceMetadata) {
Map<String, FlintIndexMetadata> flintIndexMetadataHashMap =
flintIndexMetadataService.getFlintIndexMetadata(
"flint_" + dataSourceMetadata.getName() + "_*");
return flintIndexMetadataHashMap.entrySet().stream()
.filter(entry -> entry.getValue().getFlintIndexOptions().autoRefresh())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private List<DataSourceMetadata> getS3GlueDisabledDataSources() {
return this.dataSourceService.getDataSourceMetadata(false).stream()
.filter(dataSourceMetadata -> dataSourceMetadata.getConnector() == DataSourceType.S3GLUE)
.filter(dataSourceMetadata -> dataSourceMetadata.getStatus() == DataSourceStatus.DISABLED)
.collect(Collectors.toList());
}
}

0 comments on commit 3e4c0e9

Please sign in to comment.