Skip to content

Commit

Permalink
Handle batch item responses for event posting.
Browse files Browse the repository at this point in the history
This supports the case when the server returns 207 or 422 for an
event post. It also provides a convenience method on EventResource
that returns a BatchItemResponseCollection instead of having to
marshal the Response content from the other send methods.

For #116.
  • Loading branch information
dehora committed Feb 13, 2017
1 parent 7715ad4 commit 935a042
Show file tree
Hide file tree
Showing 10 changed files with 240 additions and 15 deletions.
12 changes: 6 additions & 6 deletions nakadi-java-client/src/main/java/nakadi/BatchItemResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
public class BatchItemResponse {

private String eid;
private PublishingStatus status;
private PublishingStatus publishing_status;
private Step step;
private String detail;

public String eid() {
return eid;
}

public PublishingStatus status() {
return status;
public PublishingStatus publishingStatus() {
return publishing_status;
}

public Step step() {
Expand All @@ -26,22 +26,22 @@ public String detail() {
}

@Override public int hashCode() {
return Objects.hash(eid, status, step, detail);
return Objects.hash(eid, publishing_status, step, detail);
}

@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
BatchItemResponse that = (BatchItemResponse) o;
return Objects.equals(eid, that.eid) &&
status == that.status &&
publishing_status == that.publishing_status &&
step == that.step &&
Objects.equals(detail, that.detail);
}

@Override public String toString() {
return "BatchItemResponse{" + "eid='" + eid + '\'' +
", status=" + status +
", publishing_status=" + publishing_status +
", step=" + step +
", detail='" + detail + '\'' +
'}';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package nakadi;

import java.util.List;

/**
* The result of a partial failure to post events.
*/
public class BatchItemResponseCollection extends ResourceCollection<BatchItemResponse> {

BatchItemResponseCollection(List<BatchItemResponse> items, List<ResourceLink> links) {
super(items, links);
}

public ResourceCollection<BatchItemResponse> fetchPage(String url) {
throw new UnsupportedOperationException("Paging batch item responses is not supported");
}
}
22 changes: 22 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/EventResource.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nakadi;

import java.util.Collection;
import java.util.List;

public interface EventResource {

Expand Down Expand Up @@ -72,4 +73,25 @@ public interface EventResource {
*/
<T> Response send(String eventTypeName, T event);


/**
* Send a batch of events to the server.
*
* <p>
* If the response is 422 or 207 the BatchItemResponseCollection will contain items.
* </p>
* <p>
* <b>Warning: </b> the ordering and general delivery behaviour for event delivery is
* undefined under retries. That is, a delivery retry may result in out or order batches being
* sent to the server. Also retrying a partially delivered (207) batch may result in one
* or more events being delivered multiple times.
* </p>
*
* @param eventTypeName the event type name
* @param events the events
* @return a BatchItemResponseCollection which will be empty if successful or have items
* if the post was partially successful (via a 422 or 207 response)
*/
<T> BatchItemResponseCollection sendBatch(String eventTypeName, List<T> events);

}
35 changes: 31 additions & 4 deletions nakadi-java-client/src/main/java/nakadi/EventResourceReal.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package nakadi;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventResourceReal implements EventResource {

private static final Logger logger = LoggerFactory.getLogger(NakadiClient.class.getSimpleName());

private static final String PATH_EVENT_TYPES = "event-types";
private static final String PATH_COLLECTION = "events";
private static final String APPLICATION_JSON = "application/json";

private static final List<ResourceLink> LINKS_SENTINEL = Lists.newArrayList();
private static Type TYPE_BIR = new TypeToken<List<BatchItemResponse>>() {
}.getType();

private final NakadiClient client;
private String scope;
private volatile RetryPolicy retryPolicy;
Expand Down Expand Up @@ -92,15 +104,30 @@ public final <T> Response send(String eventTypeName, Collection<T> events) {
}
}

@Override public <T> BatchItemResponseCollection sendBatch(String eventTypeName, List<T> events) {
Response send = send(eventTypeName, events);
List<BatchItemResponse> items = Lists.newArrayList();

if (send.statusCode() == 207 || send.statusCode() == 422) {
try(ResponseBody responseBody = send.responseBody()) {
items.addAll(client.jsonSupport().fromJson(responseBody.asReader(), TYPE_BIR));
} catch(IOException e) {
logger.error("Error handling BatchItemResponse " + e.getMessage(), e);
}
}

return new BatchItemResponseCollection(items, LINKS_SENTINEL);
}

private Response sendUsingSupplier(String eventTypeName, EventContentSupplier supplier) {
return timed(() -> {
ResourceOptions options =
options().scope(applyScope(TokenProvider.NAKADI_EVENT_STREAM_WRITE));
return client.resourceProvider()
.newResource()
.retryPolicy(retryPolicy)
.requestThrowing(
Resource.POST, collectionUri(eventTypeName).buildString(), options, supplier);
.postEventsThrowing(
collectionUri(eventTypeName).buildString(), options, supplier);
},
client,
1);
Expand All @@ -119,8 +146,8 @@ private <T> Response send(List<EventRecord<T>> events) {
return client.resourceProvider()
.newResource()
.retryPolicy(retryPolicy)
.requestThrowing(
Resource.POST, collectionUri(topic).buildString(), options, eventList);
.postEventsThrowing(
collectionUri(topic).buildString(), options, eventList);
},
client,
eventList.size());
Expand Down
4 changes: 4 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/GsonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public <T> T fromJson(String raw, Type tType) {
return gson.fromJson(r, c);
}

@Override public <T> T fromJson(Reader r, Type tType) {
return gson.fromJson(r, tType);
}

/**
* Punch a hole in the abstraction to let us deal with business and undefined event types that
* can't be marshalled sanely otherwise.
Expand Down
10 changes: 10 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/JsonSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,14 @@ public interface JsonSupport {
* @return an instance of T
*/
<T> T fromJson(Reader r, Class<T> c);

/**
* Marshal the JSON data to an instance of T.
*
* @param r JSON as a Reader
* @param tType the type of the target
* @param <T> the parameterized target type
* @return an instance of T
*/
<T> T fromJson(Reader r, Type tType);
}
19 changes: 19 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/OkHttpResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,17 @@ public <Req> Response requestThrowing(String method, String url, ResourceOptions
return maybeComposeRetryPolicy(observable).toBlocking().first();
}

@Override
public <Req> Response postEventsThrowing(String url, ResourceOptions options, Req body)
throws AuthorizationException, ClientException, ServerException,
RateLimitException, NakadiException {
Observable<Response> observable =
Observable.defer(() -> Observable.just(
throwPostEventsIfError(requestInner(POST, url, options, body))));

return maybeComposeRetryPolicy(observable).toBlocking().first();
}

@Override
public <Res> Res requestThrowing(String method, String url, ResourceOptions options,
Class<Res> res) throws NakadiException {
Expand Down Expand Up @@ -300,6 +311,14 @@ with a retry fails (we have to wrap non-throwing requests as throwing to
}
}

private Response throwPostEventsIfError(Response response) {
int code = response.statusCode();
if (code == 207 || code == 422) {
return response;
}
return throwIfError(response);
}

private <T> T handleError(Response response) {
String raw = response.responseBody().asString();
Problem problem = Optional.ofNullable(jsonSupport.fromJson(raw, Problem.class))
Expand Down
24 changes: 24 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/Resource.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,30 @@ <Req> Response requestThrowing(String method, String url, ResourceOptions option
throws AuthorizationException, ClientException, ServerException, InvalidException,
RateLimitException, NakadiException;

/**
* Make a request against the server with a request entity. This method is tailored for
* posting events.
*
* <p>
* If the server returns 422 or 207 indicating partial failures, errors are not thrown.
* Instead callers can inspect the response body for an array of batch item responses.
* </p>
*
* @param url the resource url
* @param options requestThrowing options such as headers, and tokens.
* @param body the object to create a JSON requestThrowing body from
* @param <Req> the body type
* @return a http response
* @throws AuthorizationException for 401 and 403
* @throws ClientException for general 4xx errors (422 does not cause an exception)
* @throws ServerException for general 5xx errors
* @throws RateLimitException for 429
* @throws NakadiException a non HTTP based exception
*/
<Req> Response postEventsThrowing(String url, ResourceOptions options, Req body)
throws AuthorizationException, ClientException, ServerException, RateLimitException,
NakadiException;

/**
* Make a request against the server with an expected response entity. Useful for get
* requests. Exceptions are thrown for HTTP level errors (4xx and 5xx).
Expand Down
98 changes: 93 additions & 5 deletions nakadi-java-client/src/test/java/nakadi/EventResourceRealTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -72,6 +73,95 @@ public void after() {
}
}

@Test
public void returnedWithABatchItemResponseFor422And207() throws Exception {
NakadiClient client = spy(NakadiClient.newBuilder()
.baseURI("http://localhost:8312")
.build());

String errJson = TestSupport.load("err_batch_item_response_array.json");

try {
before();

List<UndefinedEventMapped<UndefinedPayload>> list = ImmutableList.of(
new UndefinedEventMapped<UndefinedPayload>().data(
new UndefinedPayload("01", "A", "B")),
new UndefinedEventMapped<UndefinedPayload>().data(
new UndefinedPayload("02", "C", "D"))
);

EventResource resource = client.resources().events();

// 422 and 207 batch requests shouldn't throw exceptions

server.enqueue(new MockResponse().setResponseCode(422).setBody(errJson));
Response r1 = resource.send("ue-1-1479125860", list);
assertTrue(r1.statusCode() == 422);

server.enqueue(new MockResponse().setResponseCode(207).setBody(errJson));
Response r2 = resource.send("ue-1-1479125860", list);
assertTrue(r2.statusCode() == 207);

// 422 and 207 discrete requests shouldn't throw exceptions

server.enqueue(new MockResponse().setResponseCode(422).setBody(errJson));
Response r3 = resource.send("ue-1-1479125860",
new UndefinedEventMapped<UndefinedPayload>()
.data(new UndefinedPayload("01", "A", "B")));
assertTrue(r3.statusCode() == 422);

server.enqueue(new MockResponse().setResponseCode(207).setBody(errJson));
Response r4 = resource.send("ue-1-1479125860",
new UndefinedEventMapped<UndefinedPayload>()
.data(new UndefinedPayload("01", "A", "B")));
assertTrue(r4.statusCode() == 207);

// 422 and 207 raw requests shouldn't throw exceptions

String raw = TestSupport.load("event-type-1.json");

server.enqueue(new MockResponse().setResponseCode(422).setBody(errJson));
Response r5 = resource.send("ue-1-1479125860", raw);
assertTrue(r5.statusCode() == 422);

server.enqueue(new MockResponse().setResponseCode(207).setBody(errJson));
Response r6 = resource.send("ue-1-1479125860", raw);
assertTrue(r6.statusCode() == 207);

// check we can marshal the content

String s = r6.responseBody().asString();

Type TYPE_P = new TypeToken<List<BatchItemResponse>>() {
}.getType();

List<BatchItemResponse> collection = json.fromJson(s, TYPE_P);

BatchItemResponseCollection bir = new BatchItemResponseCollection(collection, Lists.newArrayList());

assertTrue(bir.items().size() == 2);
BatchItemResponse batchItemResponse = bir.items().get(0);
assertEquals ("7d7574c3-42ac-4e23-8c92-cd854ab1845a", batchItemResponse.eid());
assertEquals ("failed", batchItemResponse.publishingStatus().name());
assertEquals ("enriching", batchItemResponse.step().name());
assertEquals ("no good", batchItemResponse.detail());

// check a collection response
server.enqueue(new MockResponse().setResponseCode(207).setBody(errJson));
BatchItemResponseCollection batch =
resource.sendBatch("ue-1-1479125860", Lists.newArrayList(raw));
assertTrue(batch.items().size() == 2);

HashSet<String> eids =
Sets.newHashSet(batch.items().get(0).eid(), batch.items().get(1).eid());
assertTrue(eids.contains("7d7574c3-42ac-4e23-8c92-cd854ab1845a"));
assertTrue(eids.contains("980c8aa9-7921-4675-a0c0-0b33b1459944"));
} finally {
after();
}
}

@Test
public void undefinedIsSentToServerMapped() throws Exception {

Expand Down Expand Up @@ -238,8 +328,7 @@ public Happened data() {

ArgumentCaptor<ResourceOptions> options = ArgumentCaptor.forClass(ResourceOptions.class);

verify(r, times(1)).requestThrowing(
Matchers.eq(Resource.POST),
verify(r, times(1)).postEventsThrowing(
Matchers.eq("http://localhost:9080/event-types/foo/events"),
options.capture(),
Matchers.anyList());
Expand Down Expand Up @@ -275,8 +364,7 @@ public Happened data() {

options = ArgumentCaptor.forClass(ResourceOptions.class);

verify(r, times(1)).requestThrowing(
Matchers.eq(Resource.POST),
verify(r, times(1)).postEventsThrowing(
Matchers.eq("http://localhost:9080/event-types/foo/events"),
options.capture(),
Matchers.anyList());
Expand Down
Loading

0 comments on commit 935a042

Please sign in to comment.