Skip to content

Commit

Permalink
Implementing Delete PIT service layer changes (#3949)
Browse files Browse the repository at this point in the history
* Delete pit service layer changes

Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie authored Jul 26, 2022
1 parent a34a4c0 commit f4b0eef
Show file tree
Hide file tree
Showing 19 changed files with 2,033 additions and 8 deletions.
3 changes: 3 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -661,6 +663,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
Expand All @@ -51,6 +54,7 @@ public class CreatePitController {
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final PitService pitService;
private static final Logger logger = LogManager.getLogger(CreatePitController.class);
public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
"point_in_time.init.keep_alive",
Expand All @@ -63,12 +67,14 @@ public CreatePitController(
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry
NamedWriteableRegistry namedWriteableRegistry,
PitService pitService
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.pitService = pitService;
}

/**
Expand Down Expand Up @@ -248,8 +254,47 @@ public void onResponse(final Collection<UpdatePitContextResponse> responses) {

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts, createPITResponse.getId());
updatePitIdListener.onFailure(e);
}
}, size);
}

/**
* Cleanup all created PIT contexts in case of failure
*/
private void cleanupContexts(Collection<SearchContextIdForNode> contexts, String pitId) {
ActionListener<DeletePitResponse> deleteListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
// this is invoke and forget call
final StringBuilder failedPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> !r.isSuccessful())
.forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString()));
if (logger.isDebugEnabled()) {
final StringBuilder successfulPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> r.isSuccessful())
.forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString()));
}
}

@Override
public void onFailure(Exception e) {
logger.error("Cleaning up PIT contexts failed ", e);
}
};
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap = new HashMap<>();
for (SearchContextIdForNode context : contexts) {
List<PitSearchContextIdForNode> contextIdsForNode = nodeToContextsMap.getOrDefault(context.getNode(), new ArrayList<>());
contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context));
nodeToContextsMap.put(context.getNode(), contextIdsForNode);
}
pitService.deletePitContexts(nodeToContextsMap, deleteListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for deleting point in time searches
*/
public class DeletePitAction extends ActionType<DeletePitResponse> {

public static final DeletePitAction INSTANCE = new DeletePitAction();
public static final String NAME = "indices:data/read/point_in_time/delete";

private DeletePitAction() {
super(NAME, DeletePitResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;

import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
* This class captures if deletion of pit is successful along with pit id
*/
public class DeletePitInfo extends TransportResponse implements Writeable, ToXContent {
/**
* This will be true if PIT reader contexts are deleted ond also if contexts are not found.
*/
private final boolean successful;

private final String pitId;

public DeletePitInfo(boolean successful, String pitId) {
this.successful = successful;
this.pitId = pitId;
}

public DeletePitInfo(StreamInput in) throws IOException {
successful = in.readBoolean();
pitId = in.readString();

}

public boolean isSuccessful() {
return successful;
}

public String getPitId() {
return pitId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(successful);
out.writeString(pitId);
}

static final ConstructingObjectParser<DeletePitInfo, Void> PARSER = new ConstructingObjectParser<>(
"delete_pit_info",
true,
args -> new DeletePitInfo((boolean) args[0], (String) args[1])
);

static {
PARSER.declareBoolean(constructorArg(), new ParseField("successful"));
PARSER.declareString(constructorArg(), new ParseField("pitId"));
}

private static final ParseField SUCCESSFUL = new ParseField("successful");
private static final ParseField PIT_ID = new ParseField("pitId");

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SUCCESSFUL.getPreferredName(), successful);
builder.field(PIT_ID.getPreferredName(), pitId);
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to delete one or more PIT search contexts based on IDs.
*/
public class DeletePitRequest extends ActionRequest implements ToXContentObject {

/**
* List of PIT IDs to be deleted , and use "_all" to delete all PIT reader contexts
*/
private final List<String> pitIds = new ArrayList<>();

public DeletePitRequest(StreamInput in) throws IOException {
super(in);
pitIds.addAll(Arrays.asList(in.readStringArray()));
}

public DeletePitRequest(String... pitIds) {
this.pitIds.addAll(Arrays.asList(pitIds));
}

public DeletePitRequest(List<String> pitIds) {
this.pitIds.addAll(pitIds);
}

public DeletePitRequest() {}

public List<String> getPitIds() {
return pitIds;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (pitIds == null || pitIds.isEmpty()) {
validationException = addValidationError("no pit ids specified", validationException);
}
return validationException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startArray("pit_id");
for (String pitId : pitIds) {
builder.value(pitId);
}
builder.endArray();
builder.endObject();
return builder;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds.clear();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
}
pitIds.add(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain pit_id");
}
pitIds.add(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}

}
Loading

0 comments on commit f4b0eef

Please sign in to comment.