Skip to content

Commit

Permalink
Increase JavaDoc coverage and update PR based comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jan 23, 2024
1 parent 33fbda7 commit a628089
Show file tree
Hide file tree
Showing 17 changed files with 343 additions and 98 deletions.
1 change: 0 additions & 1 deletion gradle/missing-javadoc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ configure([
project(":plugins:mapper-annotated-text"),
project(":plugins:mapper-murmur3"),
project(":plugins:mapper-size"),
project(":plugins:query-insights"),
project(":plugins:repository-azure"),
project(":plugins:repository-gcs"),
project(":plugins:repository-hdfs"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
* @opensearch.internal
*/
public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
private QueryInsightsExporterType type;
private String identifier;
private final String identifier;

QueryInsightsExporter(QueryInsightsExporterType type, String identifier) {
this.type = type;
QueryInsightsExporter(String identifier) {
this.identifier = identifier;
}

Expand All @@ -35,18 +33,10 @@ public abstract class QueryInsightsExporter<T extends SearchQueryRecord<?>> {
*/
public abstract void export(List<T> records) throws Exception;

public void setType(QueryInsightsExporterType type) {
this.type = type;
}

public QueryInsightsExporterType getType() {
return type;
}

public void setIdentifier(String identifier) {
this.identifier = identifier;
}

/**
* Get the identifier of this exporter
* @return identifier of this exporter
*/
public String getIdentifier() {
return identifier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@
* @opensearch.internal
*/
public enum QueryInsightsExporterType {
/* local index exporter */
LOCAL_INDEX("local_index");
/** local index exporter */
LOCAL_INDEX;

private final String type;

QueryInsightsExporterType(String type) {
this.type = type;
@Override
public String toString() {
return super.toString().toLowerCase(Locale.ROOT);
}

/**
* Parse QueryInsightsExporterType from String
* @param type the String representation of the QueryInsightsExporterType
* @return QueryInsightsExporterType
*/
public static QueryInsightsExporterType parse(String type) {
return valueOf(type.toUpperCase(Locale.ROOT));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,20 @@ public class QueryInsightsLocalIndexExporter<T extends SearchQueryRecord<?>> ext
/** The mapping for the local index that holds the data */
private final InputStream localIndexMapping;

/**
* Create a QueryInsightsLocalIndexExporter Object
* @param clusterService The clusterService of the node
* @param client The OpenSearch Client to support index operations
* @param localIndexName The local index name to export the data to
* @param localIndexMapping The mapping for the local index
*/
public QueryInsightsLocalIndexExporter(
ClusterService clusterService,
Client client,
String localIndexName,
InputStream localIndexMapping
) {
super(QueryInsightsExporterType.LOCAL_INDEX, localIndexName);
super(localIndexName);
this.clusterService = clusterService;
this.client = client;
this.localIndexMapping = localIndexMapping;
Expand All @@ -70,42 +77,41 @@ public QueryInsightsLocalIndexExporter(
* @throws IOException if an error occurs
*/
@Override
public synchronized void export(List<T> records) throws IOException {
public void export(List<T> records) throws IOException {
if (records.size() == 0) {
return;
}
if (checkIfIndexExists()) {
bulkRecord(records);
} else {
// local index not exist
initLocalIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
} else {
log.error(
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
getIdentifier()
)
);
boolean indexExists = checkAndInitLocalIndex(new ActionListener<>() {
@Override
public void onResponse(CreateIndexResponse response) {
if (response.isAcknowledged()) {
log.debug(
String.format(Locale.ROOT, "successfully initialized local index %s for query insight.", getIdentifier())
);
try {
bulkRecord(records);
} catch (IOException e) {
log.error(String.format(Locale.ROOT, "fail to ingest query insight data to local index, error: %s", e));
}
} else {
log.error(
String.format(
Locale.ROOT,
"request to created local index %s for query insight not acknowledged.",
getIdentifier()
)
);
}
}

@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});
@Override
public void onFailure(Exception e) {
log.error(String.format(Locale.ROOT, "error creating local index for query insight: %s", e));
}
});

if (indexExists) {
bulkRecord(records);
}
}

Expand All @@ -120,15 +126,20 @@ private boolean checkIfIndexExists() {
}

/**
* Initialize the local OpenSearch Index for the exporter
* Check and initialize the local OpenSearch Index for the exporter
*
* @param listener the listener to be notified upon completion
* @throws IOException if an error occurs
*/
private synchronized void initLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
private synchronized boolean checkAndInitLocalIndex(ActionListener<CreateIndexResponse> listener) throws IOException {
if (!checkIfIndexExists()) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(this.getIdentifier()).mapping(getIndexMappings())
.settings(Settings.builder().put("index.hidden", false).build());
client.admin().indices().create(createIndexRequest, listener);
return false;
} else {
return true;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import org.opensearch.action.search.SearchRequestOperationsListener;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.plugin.insights.core.service.TopQueriesByLatencyService;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
Expand All @@ -43,6 +45,12 @@ public final class SearchQueryLatencyListener extends SearchRequestOperationsLis

private final TopQueriesByLatencyService topQueriesByLatencyService;

/**
* Create a new SearchQueryLatencyListener object
*
* @param clusterService The Node's cluster service.
* @param topQueriesByLatencyService The topQueriesByLatencyService associated with this listener
*/
@Inject
public SearchQueryLatencyListener(ClusterService clusterService, TopQueriesByLatencyService topQueriesByLatencyService) {
this.topQueriesByLatencyService = topQueriesByLatencyService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public abstract class QueryInsightsService<R extends SearchQueryRecord<?>, S ext
/** enable insight data export */
private boolean enableExport;

/** The internal store that holds the query insight data */
/** The internal thread-safe store that holds the query insight data */
@Nullable
protected S store;

Expand All @@ -59,8 +59,19 @@ public abstract class QueryInsightsService<R extends SearchQueryRecord<?>, S ext

/** The internal OpenSearch thread pool that execute async processing and exporting tasks*/
protected final ThreadPool threadPool;

/**
* Holds a reference to delayed operation {@link Scheduler.Cancellable} so it can be cancelled when
* the service closed concurrently.
*/
protected volatile Scheduler.Cancellable scheduledFuture;

/**
* Create the Query Insights Service object
* @param threadPool The OpenSearch thread pool to run async tasks
* @param store The in memory store to keep the Query Insights data
* @param exporter The optional {@link QueryInsightsExporter} to export the Query Insights data
*/
@Inject
public QueryInsightsService(ThreadPool threadPool, @Nullable S store, @Nullable E exporter) {
this.threadPool = threadPool;
Expand Down Expand Up @@ -102,7 +113,13 @@ public List<R> getQueryData() throws IllegalArgumentException {
public abstract void clearOutdatedData();

/**
* Restart the exporter with new config
* Reset the exporter with new config
*
* This function can be used to enable/disable an exporter, change the type of the exporter,
* or change the identifier of the exporter.
* @param enabled the enable flag to set on the exporter
* @param type The QueryInsightsExporterType to set on the exporter
* @param identifier the Identifier to set on the exporter
*/
public abstract void resetExporter(boolean enabled, QueryInsightsExporterType type, String identifier);

Expand All @@ -113,18 +130,34 @@ public void clearAllData() {
store.clear();
}

/**
* Set flag to enable or disable Query Insights data collection
* @param enableCollect Flag to enable or disable Query Insights data collection
*/
public void setEnableCollect(boolean enableCollect) {
this.enableCollect = enableCollect;
}

/**
* Get if the Query Insights data collection is enabled
* @return if the Query Insights data collection is enabled
*/
public boolean getEnableCollect() {
return this.enableCollect;
}

/**
* Set flag to enable or disable Query Insights data export
* @param enableExport
*/
public void setEnableExport(boolean enableExport) {
this.enableExport = enableExport;
}

/**
* Get if the Query Insights data export is enabled
* @return if the Query Insights data export is enabled
*/
public boolean getEnableExport() {
return this.enableExport;
}
Expand Down Expand Up @@ -156,7 +189,6 @@ private void doExport() {
List<R> storedData = getQueryData();
try {
exporter.export(storedData);
log.debug(String.format(Locale.ROOT, "finish exporting query insight data to sink %s", storedData));
} catch (Exception e) {
throw new RuntimeException(String.format(Locale.ROOT, "failed to export query insight data to sink, error: %s", e));
}
Expand All @@ -165,6 +197,10 @@ private void doExport() {
@Override
protected void doClose() {}

/**
* Get the export interval set for the {@link QueryInsightsExporter}
* @return export interval
*/
public TimeValue getExportInterval() {
return exportInterval;
}
Expand Down
Loading

0 comments on commit a628089

Please sign in to comment.