Skip to content
This repository has been archived by the owner on Apr 22, 2022. It is now read-only.

Commit

Permalink
Merge pull request #233 from divolte/pubsub-attributes
Browse files Browse the repository at this point in the history
Additional Pub/Sub event attributes
  • Loading branch information
asnare authored Sep 21, 2018
2 parents a97536c + bea95f7 commit dd93c6e
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 13 deletions.
10 changes: 9 additions & 1 deletion src/main/java/io/divolte/server/AvroRecordBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,18 @@ public final class AvroRecordBuffer {

private final DivolteIdentifier partyId;
private final DivolteIdentifier sessionId;
private final String eventId;
private final Instant timestamp;
private final ByteBuffer byteBuffer;

private AvroRecordBuffer(final DivolteIdentifier partyId,
final DivolteIdentifier sessionId,
final String eventId,
final Instant timestamp,
final GenericRecord record) throws IOException {
this.partyId = Objects.requireNonNull(partyId);
this.sessionId = Objects.requireNonNull(sessionId);
this.eventId = Objects.requireNonNull(eventId);
this.timestamp = Objects.requireNonNull(timestamp);

/*
Expand Down Expand Up @@ -79,17 +82,22 @@ public DivolteIdentifier getSessionId() {
return sessionId;
}

public String getEventId() {
return eventId;
}

public Instant getTimestamp() {
return timestamp;
}

public static AvroRecordBuffer fromRecord(final DivolteIdentifier partyId,
final DivolteIdentifier sessionId,
final String eventId,
final Instant timestamp,
final GenericRecord record) {
for (;;) {
try {
return new AvroRecordBuffer(partyId, sessionId, timestamp, record);
return new AvroRecordBuffer(partyId, sessionId, eventId, timestamp, record);
} catch (final BufferOverflowException boe) {
// Increase the buffer size by about 10%
// Because we only ever increase the buffer size, we discard the
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/divolte/server/Mapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public Optional<Item<AvroRecordBuffer>> map(final Item<UndertowEvent> originalIe
final GenericRecord avroRecord = mapper.newRecordFromExchange(parsedEvent);
final AvroRecordBuffer avroBuffer = AvroRecordBuffer.fromRecord(parsedEvent.partyId,
parsedEvent.sessionId,
parsedEvent.eventId,
parsedEvent.requestStartTime,
avroRecord);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.concurrent.NotThreadSafe;
import java.security.NoSuchAlgorithmException;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
Expand All @@ -42,6 +43,8 @@
public final class GoogleCloudPubSubFlusher extends TopicFlusher<PubsubMessage> {
private final static Logger logger = LoggerFactory.getLogger(GoogleCloudPubSubFlusher.class);
private final static String MESSAGE_ATTRIBUTE_PARTYID = "partyIdentifier";
private final static String MESSAGE_ATTRIBUTE_EVENTID = "eventIdentifier";
private final static String MESSAGE_ATTRIBUTE_TIMESTAMP = "timestamp";
private final static String MESSAGE_ATTRIBUTE_SCHEMA_CONFLUENT_ID = "schemaConfluentId";
private final static String MESSAGE_ATTRIBUTE_SCHEMA_FINGERPRINT = "schemaFingerprint";

Expand Down Expand Up @@ -76,6 +79,8 @@ protected PubsubMessage buildRecord(final AvroRecordBuffer record) {
final PubsubMessage.Builder builder = PubsubMessage.newBuilder()
.putAttributes(MESSAGE_ATTRIBUTE_SCHEMA_FINGERPRINT, schemaFingerprint)
.putAttributes(MESSAGE_ATTRIBUTE_PARTYID, record.getPartyId().toString())
.putAttributes(MESSAGE_ATTRIBUTE_EVENTID, record.getEventId())
.putAttributes(MESSAGE_ATTRIBUTE_TIMESTAMP, DateTimeFormatter.ISO_INSTANT.format(record.getTimestamp()))
.setData(ByteString.copyFrom(record.getByteBuffer()));
return schemaConfluentId
.map(id -> builder.putAttributes(MESSAGE_ATTRIBUTE_SCHEMA_CONFLUENT_ID, id))
Expand All @@ -93,9 +98,9 @@ protected ImmutableList<PubsubMessage> sendBatch(final List<PubsubMessage> batch
// (This will serialize them, determine the partition and then assign them to a per-partition buffer.)
final int batchSize = batch.size();
final List<ApiFuture<String>> sendResults =
batch.stream()
.map(publisher::publish)
.collect(Collectors.toCollection(() -> new ArrayList<>(batchSize)));
batch.stream()
.map(publisher::publish)
.collect(Collectors.toCollection(() -> new ArrayList<>(batchSize)));

// At this point the messages are in flight, and we assume being flushed.
// When they eventually complete, each message can be in one of several states:
Expand All @@ -109,26 +114,35 @@ protected ImmutableList<PubsubMessage> sendBatch(final List<PubsubMessage> batch
final String messageId = pendingResult.get();
if (logger.isDebugEnabled()) {
final PubsubMessage message = batch.get(i);
logger.debug("Finished sending event (partyId={}) to Pub/Sub: messageId = {}",
message.getAttributesOrThrow(MESSAGE_ATTRIBUTE_PARTYID), messageId);
logger.debug("Finished sending event (partyId={}, eventId={}) to Pub/Sub: messageId = {}",
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_PARTYID, "N/A"),
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_EVENTID, "N/A"),
messageId);
}
} catch (final ExecutionException e) {
final PubsubMessage message = batch.get(i);
// The Pub/Sub publisher internally has a retry policy, but outside that we also
// retry indefinitely unless it's a cause that we don't understand.
final Throwable cause = e.getCause();
if (cause instanceof ApiException) {
final ApiException apiException = (ApiException)cause;
final ApiException apiException = (ApiException) cause;
if (apiException.isRetryable()) {
if (logger.isDebugEnabled()) {
logger.debug("Transient error sending event (partyId=" + message.getAttributesOrThrow(MESSAGE_ATTRIBUTE_PARTYID) + ") to Pub/Sub; retrying.", cause);
}
logger.debug("Transient error sending event (partyId={}, eventId={}) to Pub/Sub; retrying.",
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_PARTYID, "N/A"),
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_EVENTID, "N/A"),
cause);
remaining.add(message);
} else {
logger.warn("Permanent error sending event (partyId=" + message.getAttributesOrThrow(MESSAGE_ATTRIBUTE_PARTYID) + ") to Pub/Sub; abandoning.", cause);
logger.warn("Permanent error sending event (partyId={}, eventId={}) to Pub/Sub; abandoning.",
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_PARTYID, "N/A"),
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_EVENTID, "N/A"),
cause);
}
} else {
logger.error("Unknown error sending event (partyId=" + message.getAttributesOrThrow(MESSAGE_ATTRIBUTE_PARTYID) + ") to Pub/Sub; abandoning.", cause);
logger.error("Unknown error sending event (partyId={}, eventId={}) to Pub/Sub; abandoning.",
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_PARTYID, "N/A"),
message.getAttributesOrDefault(MESSAGE_ATTRIBUTE_EVENTID, "N/A"),
cause);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ private AvroRecordBuffer newAvroRecordBuffer() {

return AvroRecordBuffer.fromRecord(DivolteIdentifier.generate(),
DivolteIdentifier.generate(),
"anEventId",
now,
record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ private void processRecords() {
records.stream().map(
(record) -> AvroRecordBuffer.fromRecord(DivolteIdentifier.generate(),
DivolteIdentifier.generate(),
"anEventId",
Instant.ofEpochMilli((long)record.get("ts")),
record))
.forEach((arb) -> flusher.process(Item.of(0, arb.getPartyId().value, arb)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private static AvroRecordBuffer generateAvroRecord() {
.build();
return AvroRecordBuffer.fromRecord(DivolteIdentifier.generate(0L),
DivolteIdentifier.generate(1L),
"-",
Instant.EPOCH,
record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -68,6 +70,8 @@ public class GoogleCloudPubSubFlusherTest {
.requiredLong("counter")
.endRecord();

private static final Instant EVENT_TIMESTAMP = ZonedDateTime.of(2018, 9, 14, 13, 32, 10, 34261025, ZoneOffset.UTC).toInstant();

private Optional<DivolteIdentifier> partyId = Optional.empty();
private Optional<DivolteIdentifier> sessionId = Optional.empty();
private long generatedEventCounter;
Expand Down Expand Up @@ -101,12 +105,13 @@ public void clearIdentifiers() {
private AvroRecordBuffer generateMessage() {
final DivolteIdentifier partyId = this.partyId.orElseThrow(IllegalStateException::new);
final DivolteIdentifier sessionId = this.sessionId.orElseThrow(IllegalStateException::new);
final String eventId = sessionId.toString() + '-' + Long.toHexString(generatedEventCounter);
final GenericRecord record = new GenericRecordBuilder(MINIMAL_SCHEMA)
.set("partyId", partyId.toString())
.set("sessionId", sessionId.toString())
.set("counter", generatedEventCounter++)
.build();
return AvroRecordBuffer.fromRecord(partyId, sessionId, Instant.EPOCH, record);
return AvroRecordBuffer.fromRecord(partyId, sessionId, eventId, EVENT_TIMESTAMP, record);
}

private Item<AvroRecordBuffer> itemFromAvroRecordBuffer(final AvroRecordBuffer message) {
Expand Down Expand Up @@ -199,6 +204,21 @@ public void testMessagesHavePartyIdAttribute() {
deliveredMessage.getAttributesOrThrow("partyIdentifier"));
}

@Test
public void testMessagesHaveEventIdAttribute() {
processSingleMessage();
final PubsubMessage deliveredMessage = getFirstPublishedMessage();
assertEquals(sessionId.orElseThrow(IllegalStateException::new).toString() + "-0",
deliveredMessage.getAttributesOrThrow("eventIdentifier"));
}

@Test
public void testMessagesHaveTimestampAttribute() {
processSingleMessage();
final PubsubMessage deliveredMessage = getFirstPublishedMessage();
assertEquals("2018-09-14T13:32:10.034261025Z", deliveredMessage.getAttributesOrThrow("timestamp"));
}

@Test
public void testMessagesHaveSchemaFingerprint() {
processSingleMessage();
Expand Down

0 comments on commit dd93c6e

Please sign in to comment.