Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Delete PIT service layer changes #3949

Merged
merged 22 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a8be024
Create pit service layer changes
bharath-techie Jul 15, 2022
8a8f708
Create pit service layer changes
bharath-techie Jul 15, 2022
f20dbdc
Delete pit service layer changes
bharath-techie Jul 15, 2022
fa9946b
Create pit service layer changes
bharath-techie Jul 15, 2022
8f673c5
Merge branch 'createpitservice' of github.com:bharath-techie/OpenSear…
bharath-techie Jul 15, 2022
97c4f77
changing to consistent action names
bharath-techie Jul 15, 2022
bfcb50a
changing to consistent action names
bharath-techie Jul 15, 2022
f96c9d7
Addressing review comment
bharath-techie Jul 18, 2022
7f9d1b8
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie Jul 19, 2022
5d8ec6b
Addressing review comment
bharath-techie Jul 19, 2022
987cb57
Addressing review comments
bharath-techie Jul 19, 2022
9497923
Merge branch 'createpitservice' of github.com:bharath-techie/OpenSear…
bharath-techie Jul 19, 2022
8fcc25c
Addressing review comments
bharath-techie Jul 19, 2022
ab3f0d4
Merge branch 'createpitservice' of github.com:bharath-techie/OpenSear…
bharath-techie Jul 19, 2022
41d342d
Resolving conflicts
bharath-techie Jul 19, 2022
25bb645
Addressing comments
bharath-techie Jul 20, 2022
717cd66
Addressing comments
bharath-techie Jul 21, 2022
f4db20b
Addressing review comment
bharath-techie Jul 22, 2022
3f08013
Addressing review comment - adding tests to test concurrency
bharath-techie Jul 25, 2022
f76fa24
Addressing review comment - adding tests to test concurrency
bharath-techie Jul 25, 2022
d870dd5
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie Jul 26, 2022
ae618ff
Merge branch 'deletepitservice' of github.com:bharath-techie/OpenSear…
bharath-techie Jul 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -661,6 +663,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
Expand All @@ -51,6 +54,7 @@ public class CreatePitController {
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final PitService pitService;
private static final Logger logger = LogManager.getLogger(CreatePitController.class);
public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
"point_in_time.init.keep_alive",
Expand All @@ -63,12 +67,14 @@ public CreatePitController(
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry
NamedWriteableRegistry namedWriteableRegistry,
PitService pitService
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.pitService = pitService;
}

/**
Expand Down Expand Up @@ -248,8 +254,46 @@ public void onResponse(final Collection<UpdatePitContextResponse> responses) {

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts, createPITResponse.getId());
updatePitIdListener.onFailure(e);
}
}, size);
}

/**
* Cleanup all created PIT contexts in case of failure
*/
private void cleanupContexts(Collection<SearchContextIdForNode> contexts, String pitId) {
ActionListener<DeletePitResponse> deleteListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
// this is invoke and forget call
final StringBuilder failedPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> !r.isSuccessful())
.forEach(r -> failedPitsStringBuilder.append(r.getPitId()).append(","));
logger.warn(() -> new ParameterizedMessage("Failed to delete PIT IDs {}", failedPitsStringBuilder.toString()));
if (!logger.isDebugEnabled()) return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put this inside the if block instead of negating it and returning early

final StringBuilder successfulPitsStringBuilder = new StringBuilder();
response.getDeletePitResults()
.stream()
.filter(r -> r.isSuccessful())
.forEach(r -> successfulPitsStringBuilder.append(r.getPitId()).append(","));
logger.debug(() -> new ParameterizedMessage("Deleted PIT with IDs {}", successfulPitsStringBuilder.toString()));
}

@Override
public void onFailure(Exception e) {
logger.error("Cleaning up PIT contexts failed ", e);
}
};
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap = new HashMap<>();
for (SearchContextIdForNode context : contexts) {
List<PitSearchContextIdForNode> contextIdsForNode = nodeToContextsMap.getOrDefault(context.getNode(), new ArrayList<>());
contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context));
nodeToContextsMap.put(context.getNode(), contextIdsForNode);
}
pitService.deletePitContexts(nodeToContextsMap, deleteListener);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
* Action type for deleting point in time searches
*/
public class DeletePitAction extends ActionType<DeletePitResponse> {

public static final DeletePitAction INSTANCE = new DeletePitAction();
public static final String NAME = "indices:data/read/point_in_time/delete";

private DeletePitAction() {
super(NAME, DeletePitResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;

import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg;

/**
* This class captures if deletion of pit is successful along with pit id
*/
public class DeletePitInfo extends TransportResponse implements Writeable, ToXContent {
/**
* This will be true if PIT reader contexts are deleted ond also if contexts are not found.
*/
private final boolean successful;

private final String pitId;

public DeletePitInfo(boolean successful, String pitId) {
this.successful = successful;
this.pitId = pitId;
}

public DeletePitInfo(StreamInput in) throws IOException {
successful = in.readBoolean();
pitId = in.readString();

}

public boolean isSuccessful() {
return successful;
}

public String getPitId() {
return pitId;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(successful);
out.writeString(pitId);
}

static final ConstructingObjectParser<DeletePitInfo, Void> PARSER = new ConstructingObjectParser<>(
"delete_pit_info",
true,
args -> new DeletePitInfo((boolean) args[0], (String) args[1])
);

static {
PARSER.declareBoolean(constructorArg(), new ParseField("successful"));
PARSER.declareString(constructorArg(), new ParseField("pitId"));
}

private static final ParseField SUCCESSFUL = new ParseField("successful");
private static final ParseField PIT_ID = new ParseField("pitId");

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SUCCESSFUL.getPreferredName(), successful);
builder.field(PIT_ID.getPreferredName(), pitId);
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.ToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
* Request to delete one or more PIT search contexts based on IDs.
*/
public class DeletePitRequest extends ActionRequest implements ToXContentObject {

/**
* List of PIT IDs to be deleted , and use "_all" to delete all PIT reader contexts
*/
private final List<String> pitIds = new ArrayList<>();

public DeletePitRequest(StreamInput in) throws IOException {
super(in);
pitIds.addAll(Arrays.asList(in.readStringArray()));
}

public DeletePitRequest(String... pitIds) {
this.pitIds.addAll(Arrays.asList(pitIds));
}

public DeletePitRequest(List<String> pitIds) {
this.pitIds.addAll(pitIds);
}

public DeletePitRequest() {}

public List<String> getPitIds() {
return pitIds;
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (pitIds == null || pitIds.isEmpty()) {
validationException = addValidationError("no pit ids specified", validationException);
}
return validationException;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (pitIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(pitIds.toArray(new String[pitIds.size()]));
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startArray("pit_id");
for (String pitId : pitIds) {
builder.value(pitId);
}
builder.endArray();
builder.endObject();
return builder;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds.clear();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain pit_id");
}
pitIds.add(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain pit_id");
}
pitIds.add(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}

}
Loading