Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Data Frame HLRC start & stop APIs #40154

Merged
merged 7 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import org.elasticsearch.client.core.AcknowledgedResponse;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformResponse;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformResponse;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -115,4 +119,85 @@ public void deleteDataFrameTransformAsync(DeleteDataFrameTransformRequest reques
listener,
Collections.emptySet());
}


/**
* Start a data frame transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
*
* @param request The start data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return A response object indicating request success
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StartDataFrameTransformResponse startDataFrameTransform(StartDataFrameTransformRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::startDataFrameTransform,
options,
StartDataFrameTransformResponse::fromXContent,
Collections.emptySet());
}

/**
* Start a data frame transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Start Data Frame transform documentation</a>
*
* @param request The start data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void startDataFrameTransformAsync(StartDataFrameTransformRequest request, RequestOptions options,
ActionListener<StartDataFrameTransformResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::startDataFrameTransform,
options,
StartDataFrameTransformResponse::fromXContent,
listener,
Collections.emptySet());
}

/**
* Stop a data frame transform
* <p>
* For additional info
* see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
*
* @param request The stop data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return A response object indicating request success
* @throws IOException when there is a serialization issue sending the request or receiving the response
*/
public StopDataFrameTransformResponse stopDataFrameTransform(StopDataFrameTransformRequest request, RequestOptions options)
throws IOException {
return restHighLevelClient.performRequestAndParseEntity(request,
DataFrameRequestConverters::stopDataFrameTransform,
options,
StopDataFrameTransformResponse::fromXContent,
Collections.emptySet());
}

/**
* Stop a data frame transform asynchronously and notifies listener on completion
* <p>
* For additional info
* see <a href="https://www.TODO.com">Stop Data Frame transform documentation</a>
*
* @param request The stop data frame transform request
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener Listener to be notified upon request completion
*/
public void stopDataFrameTransformAsync(StopDataFrameTransformRequest request, RequestOptions options,
ActionListener<StopDataFrameTransformResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(request,
DataFrameRequestConverters::stopDataFrameTransform,
options,
StopDataFrameTransformResponse::fromXContent,
listener,
Collections.emptySet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
package org.elasticsearch.client;

import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.PutDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StartDataFrameTransformRequest;
import org.elasticsearch.client.dataframe.StopDataFrameTransformRequest;

import java.io.IOException;

Expand Down Expand Up @@ -50,4 +53,35 @@ static Request deleteDataFrameTransform(DeleteDataFrameTransformRequest request)
.build();
return new Request(HttpDelete.METHOD_NAME, endpoint);
}

static Request startDataFrameTransform(StartDataFrameTransformRequest startRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(startRequest.getId())
.addPathPartAsIs("_start")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (startRequest.getTimeout() != null) {
params.withTimeout(startRequest.getTimeout());
}
return request;
}

static Request stopDataFrameTransform(StopDataFrameTransformRequest stopRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPartAsIs("_data_frame", "transforms")
.addPathPart(stopRequest.getId())
.addPathPartAsIs("_stop")
.build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
RequestConverters.Params params = new RequestConverters.Params(request);
if (stopRequest.getWaitForCompletion() != null) {
params.withWaitForCompletion(stopRequest.getWaitForCompletion());
}
if (stopRequest.getTimeout() != null) {
params.withTimeout(stopRequest.getTimeout());
}
return request;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.core;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.TriFunction;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

public class AcknowledgedTasksResponse {

protected static final ParseField TASK_FAILURES = new ParseField("task_failures");
protected static final ParseField NODE_FAILURES = new ParseField("node_failures");

@SuppressWarnings("unchecked")
protected static <T extends AcknowledgedTasksResponse> ConstructingObjectParser<T, Void> generateParser(
String name,
TriFunction<Boolean, List<TaskOperationFailure>, List<? extends ElasticsearchException>, T> ctor,
String ackFieldName) {

ConstructingObjectParser<T, Void> parser = new ConstructingObjectParser<>(name, true,
args -> ctor.apply((boolean) args[0], (List<TaskOperationFailure>) args[1], (List<ElasticsearchException>) args[2]));
parser.declareBoolean(constructorArg(), new ParseField(ackFieldName));
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> TaskOperationFailure.fromXContent(p), TASK_FAILURES);
parser.declareObjectArray(optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p), NODE_FAILURES);
return parser;
}

private boolean acknowledged;
private List<TaskOperationFailure> taskFailures;
private List<ElasticsearchException> nodeFailures;

public AcknowledgedTasksResponse(boolean acknowledged, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
this.acknowledged = acknowledged;
this.taskFailures = taskFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(taskFailures));
this.nodeFailures = nodeFailures == null ? Collections.emptyList() : Collections.unmodifiableList(new ArrayList<>(nodeFailures));
}

public boolean isAcknowledged() {
return acknowledged;
}

public List<TaskOperationFailure> getTaskFailures() {
return taskFailures;
}

public List<ElasticsearchException> getNodeFailures() {
return nodeFailures;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}

AcknowledgedTasksResponse other = (AcknowledgedTasksResponse) obj;
return acknowledged == other.acknowledged
&& taskFailures.equals(other.taskFailures)
&& nodeFailures.equals(other.nodeFailures);
}

@Override
public int hashCode() {
return Objects.hash(acknowledged, taskFailures, nodeFailures);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe;

import org.elasticsearch.client.Validatable;
import org.elasticsearch.client.ValidationException;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Objects;
import java.util.Optional;

public class StartDataFrameTransformRequest implements Validatable {

private final String id;
private TimeValue timeout;

public StartDataFrameTransformRequest(String id) {
this.id = id;
}

public StartDataFrameTransformRequest(String id, TimeValue timeout) {
this.id = id;
this.timeout = timeout;
}

public String getId() {
return id;
}

public TimeValue getTimeout() {
return timeout;
}

public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Override
public Optional<ValidationException> validate() {
if (id == null) {
ValidationException validationException = new ValidationException();
validationException.addValidationError("data frame transform id must not be null");
return Optional.of(validationException);
} else {
return Optional.empty();
}
}

@Override
public int hashCode() {
return Objects.hash(id, timeout);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null || getClass() != obj.getClass()) {
return false;
}
StartDataFrameTransformRequest other = (StartDataFrameTransformRequest) obj;
return Objects.equals(this.id, other.id)
&& Objects.equals(this.timeout, other.timeout);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.dataframe;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.client.core.AcknowledgedTasksResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.List;

public class StartDataFrameTransformResponse extends AcknowledgedTasksResponse {

private static final String STARTED = "started";

private static final ConstructingObjectParser<StartDataFrameTransformResponse, Void> PARSER =
AcknowledgedTasksResponse.generateParser("start_data_frame_transform_response", StartDataFrameTransformResponse::new, STARTED);

public static StartDataFrameTransformResponse fromXContent(final XContentParser parser) throws IOException {
return PARSER.parse(parser, null);
}

public StartDataFrameTransformResponse(boolean started, @Nullable List<TaskOperationFailure> taskFailures,
@Nullable List<? extends ElasticsearchException> nodeFailures) {
super(started, taskFailures, nodeFailures);
}

public boolean isStarted() {
return isAcknowledged();
}
}
Loading