Merge latest changes from Azure/azure-event-hubs#444 (Azure#3511)
* Update Apache Proton-J dependency (0.29.0 --> 0.31.0) (Azure#407)

* PartitionReceiver - add a method that provides an EventPosition which corresponds to an EventData returned last by the receiver (Azure#408)

* Support IsPartitionEmpty property for PartitionRuntimeInformation (Azure#399)

* Move setPrefetchCount API to the ReceiverOptions class from the PartitionReceiver and update the settings of Default & Max Prefetch count (Azure#410)

This pull request includes two major changes related to Prefetch API.

1) Move the setPrefetchCount API to the ReceiverOptions class so that the prefetch value specified by a user is used, instead of the default value, when communicating with the service during link open and receiver initialization. This change also addresses the issue of the receiver getting stuck when the setPrefetchCount API was called in a race condition.

2) Change the default value and set the upper bound of the prefetch count. Note that the prefetch count should be greater than or equal to maxEventCount, which can be set either by a) calling the receive() API or b) implementing the getMaxEventCount API of the SessionReceiverHandler interface.
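
As a hedged illustration only (the client setup, connection string, and partition id below are assumed for the example, not taken from this repository), the reshaped API is configured before the receiver is created:

```Java
// Sketch: prefetch is now set on ReceiverOptions, before the receive link is opened.
ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
EventHubClient client = EventHubClient.createSync(connectionString, executor); // connectionString assumed

ReceiverOptions options = new ReceiverOptions();
options.setPrefetchCount(500); // must be >= the maxEventCount passed to receive()

PartitionReceiver receiver = client.createReceiverSync(
        EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, "0", EventPosition.fromStartOfStream(), options);
Iterable<EventData> events = receiver.receiveSync(100); // 100 <= prefetch count
```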

* Fixes several issues in the reactor related components (Azure#411)

This pull request contains the following changes.

1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get completed.
2) Fix the session open timeout issue which can result in NPE in proton-J engine.
3) Make session open timeout configurable and use the value of OperationTimeout.
4) Update the message of exceptions and include an entity name in the exception message.
5) API change - use ScheduledExecutorService.
6) Improve tracing.

* Implement comparable on EventData (Azure#395)

* Update receive/send link creation logic and improve tracing (Azure#414)

* Prep for releasing client 2.0.0 and EPH 2.2.0 (Azure#415)

* Ensure that links are closed when a transport error occurs (Azure#417)

* ensure links are recreated on transport/connection failure
* update API document for EventProcessorOptions class
* add traces for link create/close case

* Prep for releasing client 2.1.0 and EPH 2.3.0 (Azure#418)

* Update prefetch sendflow logic and increment version for new release (Azure#420)

* Fix args for proxy auth call to Authenticator (Azure#421)

* Prepare EPH 2.3.4 release (Azure#423)

* Prepare EPH 2.4.0 release (Azure#423) (Azure#424)

* Handle proton:io errors with meaningful error msg (Azure#427)

* Handle proton:io errors with meaningful error msg

* Use Proton-supplied message if present

* Minor changes to lease scanner (Azure#428)

* Add logging if the scanner throws an exception.
* Change logging level to warn when the scanner shuts down for any reason.
* The scanner can call EventProcessorOptions.notifyOfException, which invokes user code. Change notifyOfException to defensively catch any exceptions coming out of user code.

* Make EventData.SystemProperties completely public (Azure#435)

Porting testability changes from .NET Core to Java: provide full access to EventData's SystemProperties so that a complete EventData can be fabricated in tests.

* Digest Support: init first connection with null headers (Azure#431)

Related to Azure/qpid-proton-j-extensions#10

* Fix lease scanner issues when Storage unreachable (Azure#434)

This fix is for issue Azure#432. There are two parts:

AzureStorageCheckpointLeaseManager performs certain Storage actions within a forEach. If those actions fail, the StorageException gets wrapped in a NoSuchElementException. Catch those and strip off the NoSuchElementException, then handle the StorageException in the existing way.

The unexpected NoSuchElementExceptions were not being caught anywhere and the scanner thread was dying without rescheduling itself. Added code in PartitionManager.scan to catch any exceptions that leak out of PartitionScanner and reschedule the scanner unless the host instance is shutting down.
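
For illustration only, and not the repository's actual code: a minimal sketch of the unwrap-and-handle pattern described above, where `leases`, `refreshLease`, and `handleStorageException` are hypothetical stand-ins for the existing lease-scanning and error-handling code:

```Java
try {
    // Storage calls made while iterating can surface a StorageException
    // wrapped inside a NoSuchElementException.
    leases.forEach(lease -> refreshLease(lease));
} catch (NoSuchElementException wrapped) {
    if (wrapped.getCause() instanceof StorageException) {
        // Strip the wrapper and handle the StorageException in the existing way.
        handleStorageException((StorageException) wrapped.getCause());
    } else {
        throw wrapped;
    }
}
```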

* message receiver - fix null pointer error and ensure that receive link is recreated upon a failure (Azure#439)

* message receiver/sender - fix null pointer error and ensure that receive/send link is recreated on a failure.

* Update version numbers for release (Azure#440)

* Update prefetch count for a receiver (Azure#441)

* Fix an issue of creating multiple sessions for $management & $cbs channel for a single connection and improve logging (Azure#443)

* Fix an issue of creating multiple sessions for $management & $cbs for a connection and improve logging

* Update version numbers for new release (Azure#444)

* Update spotbugs.xml report versions
conniey authored May 3, 2019
1 parent 0f05332 commit 9cc2dc8
Showing 10 changed files with 194 additions and 194 deletions.
4 changes: 2 additions & 2 deletions eng/spotbugs-aggregate-report/pom.xml
@@ -16,8 +16,8 @@

<properties>
<azure-batch.version>5.0.1</azure-batch.version>
<azure-eventhubs.version>2.3.0</azure-eventhubs.version>
<azure-eventhubs-eph.version>2.5.0</azure-eventhubs-eph.version>
<azure-eventhubs.version>2.3.1</azure-eventhubs.version>
<azure-eventhubs-eph.version>2.5.1</azure-eventhubs-eph.version>
<azure-keyvault.version>1.2.0</azure-keyvault.version>
<azure-servicebus.version>2.0.0</azure-servicebus.version>
<azure-storage-blob.version>10.5.0</azure-storage-blob.version>
206 changes: 103 additions & 103 deletions eventhubs/data-plane/ConsumingEvents.md

Large diffs are not rendered by default.

130 changes: 65 additions & 65 deletions eventhubs/data-plane/PublishingEvents.md
@@ -1,33 +1,33 @@
# Publishing Events with the Java client for Azure Event Hubs

The vast majority of Event Hub applications using this and the other client libraries are and will be event publishers.
And for most of these publishers, publishing events is extremely simple and handled with just a few API gestures.

## Getting Started

This library is available for use in Maven projects from the Maven Central Repository, and can be referenced using the
following dependency declaration inside of your Maven project file:

```XML
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
</dependency>
```

For different types of build environments, the latest released JAR files can also be [explicitly obtained from the
Maven Central Repository](https://search.maven.org/#search%7Cga%7C1%7Ca%3A%22azure-eventhubs%22) or from [the Release distribution point on GitHub](https://github.com/Azure/azure-event-hubs/releases).

For a simple event publisher, you'll need to import the *com.microsoft.azure.eventhubs* package for the Event Hub client classes.

```Java
import com.microsoft.azure.eventhubs.*;
```

The Event Hubs client library uses the Qpid Proton reactor framework, which exposes AMQP connection and message delivery related
state transitions as reactive events. In the process, the library needs to run many asynchronous tasks while sending messages to and
receiving messages from Event Hubs. So, `EventHubClient` requires an instance of `ScheduledExecutorService` on which all these tasks are run.

```Java
ScheduledExecutorService executor = Executors.newScheduledThreadPool(8);
```

Using an Event Hub connection string, which holds all required connection information including an authorization key or token
(see [Connection Strings](#connection-strings)), you then create an *EventHubClient* instance.

```Java
ConnectionStringBuilder connStr = new ConnectionStringBuilder()
.setNamespaceName("----ServiceBusNamespaceName-----")
.setEventHubName("----EventHubName-----")
.setSasKeyName("-----SharedAccessSignatureKeyName-----")
.setSasKey("---SharedAccessSignatureKey----");

EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executor);
```

Once you have the client in hand, you can package any arbitrary payload as a plain array of bytes and send it. The samples
we use to illustrate the functionality send UTF-8 encoded JSON data, but you can transfer any format you wish.

```Java
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent);
```

The entire client API is built for Java 8's concurrent task model, generally returning
[*CompletableFuture<T>*](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html), so the library also provides methods suffixed with *Sync* as their synchronous counterparts/variants.
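
For illustration (this snippet is not part of the original document), a send composed asynchronously, assuming the `send(EventData)` overload that returns a `CompletableFuture<Void>`:

```Java
CompletableFuture<Void> sendTask = ehClient.send(EventData.create(payloadBytes));

// Chain follow-up work instead of blocking, or call sendTask.get() for a synchronous wait.
sendTask.thenRun(() -> System.out.println("Event published"))
        .exceptionally(error -> {
            System.err.println("Publish failed: " + error);
            return null;
        });
```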

## AMQP 1.0
Azure Event Hubs allows for publishing events using the HTTPS and AMQP 1.0 protocols. The Azure Event Hub endpoints
also support AMQP over the WebSocket protocol, allowing event traffic to leverage the same outbound TCP port as
HTTPS.
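
If outbound TCP port 5671 is blocked, the client can be switched to AMQP over WebSockets. This is a minimal sketch, assuming the `setTransportType` option and the `TransportType.AMQP_WEB_SOCKETS` value exposed by `ConnectionStringBuilder` in this client:

```Java
ConnectionStringBuilder connStr = new ConnectionStringBuilder()
    .setNamespaceName("----ServiceBusNamespaceName-----")
    .setEventHubName("----EventHubName-----")
    .setSasKeyName("-----SharedAccessSignatureKeyName-----")
    .setSasKey("---SharedAccessSignatureKey----")
    .setTransportType(TransportType.AMQP_WEB_SOCKETS); // tunnels AMQP over the HTTPS port (443)

EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executor);
```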

This client library is built on top of the Apache Qpid Proton-J libraries and supports AMQP, which is significantly
more efficient at publishing event streams than HTTPS. AMQP 1.0 is an international standard published as ISO/IEC 19464:2014.

AMQP is session-oriented and sets up the required addressing information and authorization information just once for each
send link, while HTTPS requires doing so with each sent message. AMQP also has a compact binary format to express common
event properties, while HTTPS requires passing message metadata in a verbose text format. AMQP can also keep a significant
number of events "in flight" with asynchronous and robust acknowledgement flow, while HTTPS enforces a strict request-reply
pattern.

AMQP 1.0 is a TCP-based protocol. For Azure Event Hubs, all traffic *must* be protected using TLS (SSL) and uses
TCP port 5671.

This library will provide HTTPS support via WebSockets when Proton-J supports HTTPS.

## Connection Strings

Azure Event Hubs and Azure Service Bus share a common format for connection strings. A connection string holds all required
information to set up a connection with an Event Hub. The format is a simple property/value list of the form
{property}={value} with pairs separated by semicolons (;).

| Property | Description |
|-----------------------|------------------------------------------------------------|
| Endpoint | URI for the Event Hubs namespace. Typically has the form *sb://{namespace}.servicebus.windows.net/* |
| EntityPath | Relative path of the Event Hub in the namespace. Commonly this is just the Event Hub name |
| SharedAccessKeyName | Name of a Shared Access Signature rule configured for the Event Hub or the Event Hub name. For publishers, the rule must include "Send" permissions. |
| SharedAccessKey | Base64-encoded value of the Shared Access Key for the rule |
| SharedAccessSignature | A previously issued Shared Access Signature token (not yet supported; will be soon) |

A connection string will therefore have the following form:

```
Endpoint=sb://{namespace}.servicebus.windows.net/;SharedAccessKeyName={keyName};SharedAccessKey={key};EntityPath={eventHubName}
```

## Advanced Operations

The publisher example shown in the overview above sends an event into the Event Hub without further qualification. This is
the preferred, most flexible, and most reliable option. For specific needs, Event Hubs offers two extra options to
qualify send operations: publisher policies and partition addressing.

### Partition Addressing

Any Event Hub's event store is split up into at least 4 partitions, each maintaining a separate event log. You can think
of partitions like lanes on a highway. The more events the Event Hub needs to handle, the more lanes (partitions) you have
to add. Each partition can handle at most the equivalent of 1 "throughput unit", equivalent to at most 1000 events per
second and at most 1 Megabyte per second.

In some cases, publisher applications need to address partitions directly in order to pre-categorize events for consumption.
A partition is directly addressed either by using the partition's identifier or by using some string (partition key) that gets
consistently hashed to a particular partition.

This capability, paired with a large number of partitions, may appear attractive for implementing a fine-grained, per-publisher
subscription scheme similar to what Topics offer in Service Bus Messaging - but it's not at all how the capability should be used,
and it's likely not going to yield satisfying results.

Partition addressing is designed as a routing capability that consistently assigns events from the same sources to the same partition, allowing
downstream consumer systems to be optimized, but under the assumption that very many such sources (hundreds, thousands) share
the same partition. If you need fine-grained content-based routing, Service Bus Topics might be the better option.

#### Using Partition Keys

Of the two addressing options, the preferable one is to let the hash algorithm map the event to the appropriate partition.
The gesture is a straightforward extra overload of the send operation supplying the partition key:

```Java
EventData sendEvent = EventData.create(payloadBytes);
ehClient.sendSync(sendEvent, partitionKey);
```

#### Using Partition Ids

If you indeed need to target a specific partition, for instance because you must use a particular distribution strategy,
you can send directly to the partition, but doing so requires an extra gesture so that you don't accidentally choose this
option. To send to a partition you explicitly need to create a client object that is tied to the partition as shown below:

@@ -151,24 +151,24 @@
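
The sample itself is collapsed in this diff view. As a rough, non-authoritative sketch assuming the 2.x `createPartitionSenderSync` API, sending to partition "0" could look like this:

```Java
// Sketch only: create a sender bound to a single partition, send, then close it.
PartitionSender partitionSender = ehClient.createPartitionSenderSync("0");
try {
    EventData sendEvent = EventData.create(payloadBytes);
    partitionSender.sendSync(sendEvent);
} finally {
    partitionSender.closeSync();
}
```
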
#### Publisher Policies

Event Hub Publisher Policies are not yet supported by this client and will be supported in a future release.

#### Special considerations for partitions and publisher policies

Using partitions or publisher policies (which are effectively a special kind of partition key) may impact throughput
and availability of your Event Hub solution.

When you do a regular send operation that does not prescribe a particular partition, the Event Hub will choose a
partition at random, ensuring about equal distribution of events across partitions. Sticking with the above analogy,
all highway lanes get the same traffic.

If you explicitly choose the partition key or partition-id, it's up to you to take care that traffic is evenly
distributed, otherwise you may end up with a traffic jam (in the form of throttling) on one partition while there's
little or no traffic on another partition.

Also, like every other aspect of distributed systems, the log storage backing any partition may rarely and briefly slow
down or experience congestion. If you leave choosing the target partition for an event to Event Hubs, it can flexibly
react to such availability blips for publishers.

Generally, you should *not* use partitioning as a traffic prioritization scheme, and you should *not* use it
for fine-grained assignment of particular kinds of events to particular partitions. *Partitions are a load
distribution mechanism, not a filtering model*.
4 changes: 2 additions & 2 deletions eventhubs/data-plane/azure-eventhubs-eph/pom.xml
@@ -7,14 +7,14 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>2.5.0</version>
<version>2.5.1</version>

<name>Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)</name>
<description>EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer</description>
2 changes: 1 addition & 1 deletion eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion eventhubs/data-plane/azure-eventhubs/pom.xml
@@ -7,7 +7,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<version>2.3.1</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -406,7 +406,7 @@ private void parseConnectionString(final String connectionString) {
this.transportType = TransportType.fromString(values[valueIndex]);
} catch (IllegalArgumentException exception) {
throw new IllegalConnectionStringFormatException(
String.format("Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME),
String.format(Locale.US, "Invalid value specified for property '%s' in the ConnectionString.", TRANSPORT_TYPE_CONFIG_NAME),
exception);
}
} else {
@@ -37,7 +37,7 @@ public final class ClientConstants {
public static final String NO_RETRY = "NoRetry";
public static final String DEFAULT_RETRY = "Default";
public static final String PRODUCT_NAME = "MSJavaClient";
public static final String CURRENT_JAVACLIENT_VERSION = "2.3.0";
public static final String CURRENT_JAVACLIENT_VERSION = "2.3.1";
public static final String PLATFORM_INFO = getPlatformInfo();
public static final String FRAMEWORK_INFO = getFrameworkInfo();
public static final String CBS_ADDRESS = "$cbs";
2 changes: 1 addition & 1 deletion eventhubs/data-plane/pom.xml
@@ -15,7 +15,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<packaging>pom</packaging>
<version>2.3.0</version>
<version>2.3.1</version>

<name>Microsoft Azure Event Hubs SDK Parent</name>
<description>Java libraries for talking to Windows Azure Event Hubs</description>
