Skip to content

Commit

Permalink
Block datasource async queries
Browse files Browse the repository at this point in the history
Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
  • Loading branch information
vmmusings committed Feb 26, 2024
1 parent fcc4be3 commit 76883f7
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class GlueDataSourceFactory implements DataSourceFactory {
"glue.indexstore.opensearch.auth.password";
public static final String GLUE_INDEX_STORE_OPENSEARCH_REGION =
"glue.indexstore.opensearch.region";
public static final String GLUE_ASYNC_QUERY_ENABLED =
"glue.async_query.enabled";

@Override
public DataSourceType getDataSourceType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.function.Predicate;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.datasources.glue.GlueDataSourceFactory;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
Expand All @@ -32,14 +35,16 @@ public class DefaultLeaseManager implements LeaseManager {
private final List<Rule<LeaseRequest>> concurrentLimitRules;
private final Settings settings;
private final StateStore stateStore;
private final DataSourceService dataSourceService;

public DefaultLeaseManager(Settings settings, StateStore stateStore) {
public DefaultLeaseManager(Settings settings, StateStore stateStore, DataSourceService dataSourceService) {
this.settings = settings;
this.stateStore = stateStore;
this.concurrentLimitRules =
Arrays.asList(
new ConcurrentSessionRule(settings, stateStore),
new ConcurrentSessionRule(settings, stateStore, dataSourceService),
new ConcurrentRefreshJobRule(settings, stateStore));
this.dataSourceService = dataSourceService;
}

@Override
Expand All @@ -59,6 +64,7 @@ interface Rule<T> extends Predicate<T> {
public static class ConcurrentSessionRule implements Rule<LeaseRequest> {
private final Settings settings;
private final StateStore stateStore;
private final DataSourceService dataSourceService;

@Override
public String description() {
Expand All @@ -68,12 +74,24 @@ public String description() {

@Override
public boolean test(LeaseRequest leaseRequest) {

if (checkDataSourceCircuitBreakerIsEnabled(leaseRequest.getDatasourceName())) {
return false;
}

if (leaseRequest.getJobType() != JobType.INTERACTIVE) {
return true;
}
return activeSessionsCount(stateStore, ALL_DATASOURCE).get() < sessionMaxLimit();
}

private boolean checkDataSourceCircuitBreakerIsEnabled(String datasourceName) {
DataSourceMetadata dataSourceMetadata
= this.dataSourceService.getDataSourceMetadata(datasourceName);
return Boolean.parseBoolean(dataSourceMetadata.getProperties().getOrDefault(
GlueDataSourceFactory.GLUE_ASYNC_QUERY_ENABLED, "false"));
}

public int sessionMaxLimit() {
return settings.getSettingValue(SPARK_EXECUTION_SESSION_LIMIT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.datasources.auth.DataSourceUserAuthorizationHelperImpl;
import org.opensearch.sql.datasources.service.DataSourceServiceImpl;
import org.opensearch.sql.legacy.metrics.GaugeMetric;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
Expand Down Expand Up @@ -96,8 +97,10 @@ public SessionManager sessionManager(
}

@Provides
public DefaultLeaseManager defaultLeaseManager(Settings settings, StateStore stateStore) {
return new DefaultLeaseManager(settings, stateStore);
public DefaultLeaseManager defaultLeaseManager(Settings settings,
StateStore stateStore,
DataSourceServiceImpl dataSourceService) {
return new DefaultLeaseManager(settings, stateStore, dataSourceService);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected AsyncQueryExecutorService createAsyncQueryExecutorService(
new FlintIndexMetadataReaderImpl(client),
client,
new SessionManager(stateStore, emrServerlessClientFactory, pluginSettings),
new DefaultLeaseManager(pluginSettings, stateStore),
new DefaultLeaseManager(pluginSettings, stateStore, this.dataSourceService),
stateStore);
return new AsyncQueryExecutorServiceImpl(
asyncQueryJobMetadataStorageService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.spark.dispatcher.model.JobType;
import org.opensearch.sql.spark.execution.statestore.StateStore;
import org.opensearch.sql.spark.leasemanager.model.LeaseRequest;
Expand All @@ -22,13 +23,15 @@ class DefaultLeaseManagerTest {

@Mock private StateStore stateStore;

@Mock private DataSourceService dataSourceService;

@Test
public void concurrentSessionRuleOnlyApplyToInteractiveQuery() {
assertTrue(
new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore)
new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore, dataSourceService)
.test(new LeaseRequest(JobType.BATCH, "mys3")));
assertTrue(
new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore)
new DefaultLeaseManager.ConcurrentSessionRule(settings, stateStore, dataSourceService)
.test(new LeaseRequest(JobType.STREAMING, "mys3")));
}

Expand Down

0 comments on commit 76883f7

Please sign in to comment.