From 30ac16734de6556ad64e7824986c68a310d0f890 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:13:17 -0700
Subject: [PATCH 01/32] Formatting changes in EventHubAsyncProducer.
---
.../messaging/eventhubs/EventHubAsyncProducer.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncProducer.java
index e4626ddf7045c..ca3eeef4db775 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncProducer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncProducer.java
@@ -90,6 +90,7 @@
* same partition because they all share the same {@link BatchOptions#partitionKey()}.
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncproducer.send#eventDataBatch}
+ *
* @see EventHubAsyncClient#createProducer()
*/
@Immutable
@@ -166,10 +167,13 @@ public Mono createBatch(BatchOptions options) {
/**
* Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
* allowed, an exception will be triggered and the send will fail.
+
*
+ *
* @param event Event to send to the service.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
@@ -183,11 +187,13 @@ public Mono send(EventData event) {
/**
* Sends a single event to the associated Event Hub with the send options. If the size of the single event exceeds
* the maximum size allowed, an exception will be triggered and the send will fail.
+ *
*
* For more information regarding the maximum event size allowed, see
* Azure Event Hubs Quotas and
* Limits.
- * @param event Event to send to the service.
+ *
+ * @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*
* @return A {@link Mono} that completes when the event is pushed to the service.
@@ -217,7 +223,7 @@ public Mono send(Iterable events) {
* Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
* maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
* size is the max amount allowed on the link.
- * @param events Events to send to the service.
+ * @param events Events to send to the service.
* @param options The set of options to consider when sending this batch.
*
* @return A {@link Mono} that completes when all events are pushed to the service.
From 5c041aaeb5b16052c5121ef5ec75b349f5df2da4 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:17:32 -0700
Subject: [PATCH 02/32] Adding EventHubClient, EventHubConsumer, and
EventHubProducer.
---
.../messaging/eventhubs/EventHubClient.java | 167 ++++++++++++++++
.../messaging/eventhubs/EventHubConsumer.java | 7 +
.../messaging/eventhubs/EventHubProducer.java | 188 ++++++++++++++++++
3 files changed, 362 insertions(+)
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
new file mode 100644
index 0000000000000..c16e46b6dec69
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -0,0 +1,167 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.RetryOptions;
+import com.azure.core.implementation.annotation.ReturnType;
+import com.azure.core.implementation.annotation.ServiceMethod;
+import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
+import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
+import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
+import com.azure.messaging.eventhubs.models.EventPosition;
+
+/**
+ * The main point of interaction with Azure Event Hubs, the client offers a connection to a specific Event Hub within
+ * the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
+ * Event Hub.
+ *
+ *
+ * Creating a synchronous {@link EventHubClient} using Event Hubs namespace connection string
+ *
+ * Creating a synchronous {@link EventHubClient} using Event Hub instance connection string
+ *
+ *
+ * {@codesnippet com.azure.messaging.eventhubs.eventhubclientbuilder.connectionstring#string}
+ *
+ * @see EventHubClientBuilder
+ * @see EventHubAsyncClient To communicate with Event Hub using an asynchronous interface.
+ * @see About Azure Event Hubs
+ */
+public class EventHubClient {
+ private final EventHubAsyncClient client;
+ private final RetryOptions retry;
+ private final EventHubProducerOptions defaultProducerOptions;
+ private final EventHubConsumerOptions defaultConsumerOptions;
+
+ EventHubClient(ConnectionOptions connectionOptions, EventHubAsyncClient client) {
+ this.retry = connectionOptions.retry();
+ this.defaultProducerOptions = new EventHubProducerOptions()
+ .retry(connectionOptions.retry());
+ this.defaultConsumerOptions = new EventHubConsumerOptions()
+ .retry(connectionOptions.retry())
+ .scheduler(connectionOptions.scheduler());
+ this.client = client;
+ }
+
+ /**
+ * Retrieves information about an Event Hub, including the number of partitions present and their identifiers.
+ *
+ * @return The set of information for the Event Hub that this client is associated with.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public EventHubProperties getProperties() {
+ return client.getProperties().block(retry.tryTimeout());
+ }
+
+ /**
+ * Retrieves the identifiers for all the partitions of an Event Hub.
+ *
+ * @return The identifiers for all partitions of an Event Hub.
+ */
+ @ServiceMethod(returns = ReturnType.COLLECTION)
+ public Iterable getPartitionIds() {
+ return client.getPartitionIds().collectList().block(retry.tryTimeout());
+ }
+
+ /**
+ * Retrieves information about a specific partition for an Event Hub, including elements that describe the available
+ * events in the partition event stream.
+ *
+ * @param partitionId The unique identifier of a partition associated with the Event Hub.
+ * @return The information for the requested partition under the Event Hub this client is associated with.
+ */
+ @ServiceMethod(returns = ReturnType.SINGLE)
+ public PartitionProperties getPartitionProperties(String partitionId) {
+ return client.getPartitionProperties(partitionId).block(retry.tryTimeout());
+ }
+
+ /**
+ * Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
+ * in batches. Event data is automatically routed to an available partition.
+ *
+ * @return A new {@link EventHubProducer}.
+ */
+ public EventHubProducer createProducer() {
+ return createProducer(defaultProducerOptions);
+ }
+
+ /**
+ * Creates an Event Hub producer responsible for transmitting {@link EventData} to the Event Hub, grouped together
+ * in batches. If {@link EventHubProducerOptions#partitionId() options.partitionId()} is not {@code null}, the
+ * events are routed to that specific partition. Otherwise, events are automatically routed to an available
+ * partition.
+ *
+ * @param options The set of options to apply when creating the producer.
+ * @return A new {@link EventHubProducer}.
+ * @throws NullPointerException if {@code options} is {@code null}.
+ */
+ public EventHubProducer createProducer(EventHubProducerOptions options) {
+ final EventHubAsyncProducer producer = client.createProducer();
+
+ return new EventHubProducer(producer, options.retry().tryTimeout());
+ }
+
+ /**
+ * Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
+ * member of the specified consumer group, and begins reading events from the {@code eventPosition}.
+ *
+ * The consumer created is non-exclusive, allowing multiple consumers from the same consumer group to be actively
+ * reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
+ * Consumers".
+ *
+ * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in
+ * the context of this group. The name of the consumer group that is created by default is {@link
+ * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
+ * @param partitionId The identifier of the Event Hub partition.
+ * @param eventPosition The position within the partition where the consumer should begin reading events.
+ * @return A new {@link EventHubConsumer} that receives events from the partition at the given position.
+ * @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
+ * empty string.
+ */
+ public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
+ final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
+ return new EventHubConsumer();
+ }
+
+ /**
+ * Creates an Event Hub consumer responsible for reading {@link EventData} from a specific Event Hub partition, as a
+ * member of the configured consumer group, and begins reading events from the specified {@code eventPosition}.
+ *
+ *
+ * A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that
+ * only one consumer from that group is reading the from the partition. These exclusive consumers are sometimes
+ * referred to as "Epoch Consumers."
+ *
+ * A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively
+ * reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
+ * Consumers."
+ *
+ * Designating a consumer as exclusive may be specified in the {@code options}, by setting {@link
+ * EventHubConsumerOptions#ownerLevel(Long)} to a non-null value. By default, consumers are created as
+ * non-exclusive.
+ *
+ *
+ * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in
+ * the context of this group. The name of the consumer group that is created by default is {@link
+ * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
+ * @param partitionId The identifier of the Event Hub partition from which events will be received.
+ * @param eventPosition The position within the partition where the consumer should begin reading events.
+ * @param options The set of options to apply when creating the consumer.
+ * @return An new {@link EventHubConsumer} that receives events from the partition with all configured {@link
+ * EventHubConsumerOptions}.
+ * @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
+ * empty string.
+ */
+ public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
+ EventHubConsumerOptions options) {
+ final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition, options);
+ return new EventHubConsumer();
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
new file mode 100644
index 0000000000000..0a998cc07d88e
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
@@ -0,0 +1,7 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+public class EventHubConsumer {
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
new file mode 100644
index 0000000000000..82787356ef74e
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
@@ -0,0 +1,188 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.implementation.annotation.Immutable;
+import com.azure.messaging.eventhubs.models.BatchOptions;
+import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
+import com.azure.messaging.eventhubs.models.SendOptions;
+
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * A producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped together in batches.
+ * Depending on the options specified at creation, the producer may be created to allow event data to be automatically
+ * routed to an available partition or specific to a partition.
+ *
+ *
+ * Allowing automatic routing of partitions is recommended when:
+ *
+ *
The sending of events needs to be highly available.
+ *
The event data should be evenly distributed among all available partitions.
+ *
+ *
+ *
+ *
+ * If no partition is specified, the following rules are used for automatically selecting one:
+ *
+ *
Distribute the events equally amongst all available partitions using a round-robin approach.
+ *
If a partition becomes unavailable, the Event Hubs service will automatically detect it and forward the
+ * message to another available partition.
+ *
+ *
+ *
+ *
Create a producer that routes events to any partition
+ * To allow automatic routing of messages to available partition, do not specify the {@link
+ * EventHubProducerOptions#partitionId() partitionId} when creating the {@link EventHubProducer}.
+ *
Create a producer that publishes events to partition "foo" with a timeout of 45 seconds.
+ *
+ * Developers can push events to a single partition by specifying the {@link EventHubProducerOptions#partitionId(String)
+ * partitionId} when creating an {@link EventHubProducer}.
+ *
Publish events to the same partition, grouped together using {@link SendOptions#partitionKey(String)}.
+ *
+ * If developers want to push similar events to end up at the same partition, but do not require them to go to a
+ * specific partition, they can use {@link SendOptions#partitionKey(String)}.
+ *
+ * In the sample below, all the "sandwiches" end up in the same partition, but it could end up in partition 0, 1, etc.
+ * of the available partitions. All that matters to the end user is that they are grouped together.
+ *
+ * Developers can create an {@link EventDataBatch}, add the events they want into it, and publish these events together.
+ * When creating a {@link EventDataBatch batch}, developers can specify a set of {@link BatchOptions options} to
+ * configure this batch.
+ *
+ * In the scenario below, the developer is creating a networked video game. They want to receive telemetry about their
+ * users' gaming systems, but do not want to slow down the network with telemetry. So they limit the size of their
+ * {@link EventDataBatch batches} to be no larger than 256 bytes. The events within the batch also get hashed to the
+ * same partition because they all share the same {@link BatchOptions#partitionKey()}.
+ *
+ * {@codesnippet com.azure.messaging.eventhubs.eventhubproducer.send#eventDataBatch}
+ *
+ * @see EventHubClient#createProducer()
+ * @see EventHubAsyncProducer To asynchronously generate events to an Event Hub, see EventHubAsyncProducer.
+ */
+@Immutable
+public class EventHubProducer {
+ private final EventHubAsyncProducer producer;
+ private final Duration tryTimeout;
+
+ /**
+ * Creates a new instance of {@link EventHubProducer} that sends messages to an Azure Event Hub.
+ *
+ * @throws NullPointerException if {@code producer} or {@code tryTimeout} is null.
+ */
+ EventHubProducer(EventHubAsyncProducer producer, Duration tryTimeout) {
+ this.producer = Objects.requireNonNull(producer);
+ this.tryTimeout = Objects.requireNonNull(tryTimeout);
+ }
+
+ /**
+ * Creates an {@link EventDataBatch} that can fit as many events as the transport allows.
+ *
+ * @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
+ */
+ public EventDataBatch createBatch() {
+ return producer.createBatch().block(tryTimeout);
+ }
+
+ /**
+ * Creates an {@link EventDataBatch} that can fit as many events as the transport allows.
+ *
+ * @param options A set of options used to configure the {@link EventDataBatch}.
+ * @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
+ */
+ public EventDataBatch createBatch(BatchOptions options) {
+ return producer.createBatch(options).block(tryTimeout);
+ }
+
+ /**
+ * Sends a single event to the associated Event Hub. If the size of the single event exceeds the maximum size
+ * allowed, an exception will be triggered and the send will fail.
+ *
+ *
+ *
+ * @param event Event to send to the service.
+ */
+ public void send(EventData event) {
+ producer.send(event).block();
+ }
+
+ /**
+ * Sends a single event to the associated Event Hub with the send options. If the size of the single event exceeds
+ * the maximum size allowed, an exception will be triggered and the send will fail.
+ *
+ *
+ *
+ * @param event Event to send to the service.
+ * @param options The set of options to consider when sending this event.
+ */
+ public void send(EventData event, SendOptions options) {
+ producer.send(event, options).block();
+ }
+
+ /**
+ * Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
+ * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
+ * size is the max amount allowed on the link.
+ *
+ *
+ *
+ * @param events Events to send to the service.
+ */
+ public void send(Iterable events) {
+ producer.send(events).block();
+ }
+
+ /**
+ * Sends a set of events to the associated Event Hub using a batched approach. If the size of events exceed the
+ * maximum size of a single batch, an exception will be triggered and the send will fail. By default, the message
+ * size is the max amount allowed on the link.
+ *
+ *
+ *
+ * @param events Events to send to the service.
+ * @param options The set of options to consider when sending this batch.
+ */
+ public void send(Iterable events, SendOptions options) {
+ producer.send(events, options).block();
+ }
+
+ /**
+ * Sends the batch to the associated Event Hub.
+ *
+ * @param batch The batch to send to the service.
+ * @throws NullPointerException if {@code batch} is {@code null}.
+ * @see EventHubProducer#createBatch()
+ * @see EventHubProducer#createBatch(BatchOptions)
+ */
+ public void send(EventDataBatch batch) {
+ producer.send(batch).block();
+ }
+}
From 4f3bab3010524364571de57b9d354ca45c62d8a7 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:24:22 -0700
Subject: [PATCH 03/32] Exposing EventHubClient creation in
EventHubClientBuilder.
---
.../messaging/eventhubs/EventHubClient.java | 2 +-
.../eventhubs/EventHubClientBuilder.java | 164 +++++++++++-------
2 files changed, 103 insertions(+), 63 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index c16e46b6dec69..c72ab085843e9 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -38,7 +38,7 @@ public class EventHubClient {
private final EventHubProducerOptions defaultProducerOptions;
private final EventHubConsumerOptions defaultConsumerOptions;
- EventHubClient(ConnectionOptions connectionOptions, EventHubAsyncClient client) {
+ EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) {
this.retry = connectionOptions.retry();
this.defaultProducerOptions = new EventHubProducerOptions()
.retry(connectionOptions.retry());
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index 8fd54fc4b7731..e13177c6c556f 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -277,74 +277,44 @@ public EventHubClientBuilder retry(RetryOptions retryOptions) {
* specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
*/
public EventHubAsyncClient buildAsyncClient() {
- configuration = configuration == null ? ConfigurationManager.getConfiguration().clone() : configuration;
-
- if (credentials == null) {
- final String connectionString = configuration.get(AZURE_EVENT_HUBS_CONNECTION_STRING);
-
- if (ImplUtils.isNullOrEmpty(connectionString)) {
- throw new IllegalArgumentException("Credentials have not been set using 'EventHubClientBuilder.credentials(String)'"
- + "EventHubClientBuilder.credentials(String, String, TokenCredential). And the connection string is"
- + "not set in the '" + AZURE_EVENT_HUBS_CONNECTION_STRING + "' environment variable.");
- }
-
- connectionString(connectionString);
- }
-
- if (retryOptions == null) {
- retryOptions = DEFAULT_RETRY;
- }
-
- // If the proxy has been configured by the user but they have overridden the TransportType with something that
- // is not AMQP_WEB_SOCKETS.
- if (proxyConfiguration != null && proxyConfiguration.isProxyAddressConfigured()
- && transport != TransportType.AMQP_WEB_SOCKETS) {
- throw new IllegalArgumentException("Cannot use a proxy when TransportType is not AMQP.");
- }
-
- if (proxyConfiguration == null) {
- proxyConfiguration = getDefaultProxyConfiguration(configuration);
- }
-
- if (scheduler == null) {
- scheduler = Schedulers.elastic();
- }
-
+ final ConnectionOptions connectionOptions = getConnectionOptions();
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
- final CBSAuthorizationType authorizationType = credentials instanceof EventHubSharedAccessKeyCredential
- ? CBSAuthorizationType.SHARED_ACCESS_SIGNATURE
- : CBSAuthorizationType.JSON_WEB_TOKEN;
- final ConnectionOptions parameters = new ConnectionOptions(host, eventHubName, credentials, authorizationType,
- transport, retryOptions, proxyConfiguration, scheduler);
- return new EventHubAsyncClient(parameters, provider, handlerProvider);
+ return new EventHubAsyncClient(connectionOptions, provider, handlerProvider);
}
- private ProxyConfiguration getDefaultProxyConfiguration(Configuration configuration) {
- ProxyAuthenticationType authentication = ProxyAuthenticationType.NONE;
- if (proxyConfiguration != null) {
- authentication = proxyConfiguration.authentication();
- }
-
- String proxyAddress = configuration.get(BaseConfigurations.HTTP_PROXY);
-
- if (ImplUtils.isNullOrEmpty(proxyAddress)) {
- return ProxyConfiguration.SYSTEM_DEFAULTS;
- }
-
- final String[] hostPort = proxyAddress.split(":");
- if (hostPort.length < 2) {
- throw new IllegalArgumentException("HTTP_PROXY cannot be parsed into a proxy");
- }
-
- final String host = hostPort[0];
- final int port = Integer.parseInt(hostPort[1]);
- final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port));
- final String username = configuration.get(ProxyConfiguration.PROXY_USERNAME);
- final String password = configuration.get(ProxyConfiguration.PROXY_PASSWORD);
+ /**
+ * Creates a new {@link EventHubClient} based on options set on this builder. Every time {@code buildAsyncClient()}
+ * is invoked, a new instance of {@link EventHubAsyncClient} is created.
+ *
+ *
+ * The following options are used if ones are not specified in the builder:
+ *
+ *
+ *
If no configuration is specified, the {@link ConfigurationManager#getConfiguration() global configuration}
+ * is used to provide any shared configuration values. The configuration values read are the {@link
+ * BaseConfigurations#HTTP_PROXY}, {@link ProxyConfiguration#PROXY_USERNAME}, and {@link
+ * ProxyConfiguration#PROXY_PASSWORD}.
+ *
If no retry is specified, the default retry options are used.
+ *
If no proxy is specified, the builder checks the {@link ConfigurationManager#getConfiguration() global
+ * configuration} for a configured proxy, then it checks to see if a system proxy is configured.
+ *
If no timeout is specified, a {@link ClientConstants#OPERATION_TIMEOUT timeout of one minute} is used.
+ *
If no scheduler is specified, an {@link Schedulers#elastic() elastic scheduler} is used.
+ *
+ *
+ * @return A new {@link EventHubClient} instance with all the configured options.
+ * @throws IllegalArgumentException if the credentials have not been set using either {@link
+ * #connectionString(String)} or {@link #credential(String, String, TokenCredential)}. Or, if a proxy is
+ * specified but the transport type is not {@link TransportType#AMQP_WEB_SOCKETS web sockets}.
+ */
+ public EventHubClient buildClient() {
+ final ConnectionOptions connectionOptions = getConnectionOptions();
+ final ReactorProvider provider = new ReactorProvider();
+ final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);
+ final EventHubAsyncClient client = new EventHubAsyncClient(connectionOptions, provider, handlerProvider);
- return new ProxyConfiguration(authentication, proxy, username, password);
+ return new EventHubClient(client, connectionOptions);
}
/**
@@ -429,4 +399,74 @@ public EventProcessor buildEventProcessor() {
return new EventProcessor(buildAsyncClient(), this.consumerGroupName,
this.partitionProcessorFactory, initialEventPosition, partitionManager, eventHubName);
}
+
+ private ConnectionOptions getConnectionOptions() {
+ configuration = configuration == null ? ConfigurationManager.getConfiguration().clone() : configuration;
+
+ if (credentials == null) {
+ final String connectionString = configuration.get(AZURE_EVENT_HUBS_CONNECTION_STRING);
+
+ if (ImplUtils.isNullOrEmpty(connectionString)) {
+ throw new IllegalArgumentException("Credentials have not been set using 'EventHubClientBuilder.credentials(String)'"
+ + "EventHubClientBuilder.credentials(String, String, TokenCredential). And the connection string is"
+ + "not set in the '" + AZURE_EVENT_HUBS_CONNECTION_STRING + "' environment variable.");
+ }
+
+ connectionString(connectionString);
+ }
+
+ if (retryOptions == null) {
+ retryOptions = DEFAULT_RETRY;
+ }
+
+ // If the proxy has been configured by the user but they have overridden the TransportType with something that
+ // is not AMQP_WEB_SOCKETS.
+ if (proxyConfiguration != null && proxyConfiguration.isProxyAddressConfigured()
+ && transport != TransportType.AMQP_WEB_SOCKETS) {
+ throw new IllegalArgumentException("Cannot use a proxy when TransportType is not AMQP.");
+ }
+
+ if (proxyConfiguration == null) {
+ proxyConfiguration = getDefaultProxyConfiguration(configuration);
+ }
+
+ if (scheduler == null) {
+ scheduler = Schedulers.elastic();
+ }
+
+ final CBSAuthorizationType authorizationType = credentials instanceof EventHubSharedAccessKeyCredential
+ ? CBSAuthorizationType.SHARED_ACCESS_SIGNATURE
+ : CBSAuthorizationType.JSON_WEB_TOKEN;
+
+ final ConnectionOptions parameters = new ConnectionOptions(host, eventHubName, credentials, authorizationType,
+ transport, retryOptions, proxyConfiguration, scheduler);
+
+ return parameters;
+ }
+
+ private ProxyConfiguration getDefaultProxyConfiguration(Configuration configuration) {
+ ProxyAuthenticationType authentication = ProxyAuthenticationType.NONE;
+ if (proxyConfiguration != null) {
+ authentication = proxyConfiguration.authentication();
+ }
+
+ String proxyAddress = configuration.get(BaseConfigurations.HTTP_PROXY);
+
+ if (ImplUtils.isNullOrEmpty(proxyAddress)) {
+ return ProxyConfiguration.SYSTEM_DEFAULTS;
+ }
+
+ final String[] hostPort = proxyAddress.split(":");
+ if (hostPort.length < 2) {
+ throw new IllegalArgumentException("HTTP_PROXY cannot be parsed into a proxy");
+ }
+
+ final String host = hostPort[0];
+ final int port = Integer.parseInt(hostPort[1]);
+ final Proxy proxy = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(host, port));
+ final String username = configuration.get(ProxyConfiguration.PROXY_USERNAME);
+ final String password = configuration.get(ProxyConfiguration.PROXY_PASSWORD);
+
+ return new ProxyConfiguration(authentication, proxy, username, password);
+ }
}
From 5e103497838a4c2cb11dfdfdb5f0871586c79e43 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:30:39 -0700
Subject: [PATCH 04/32] EventHubClient, Consumer and Producer implements
Closeable.
---
.../azure/messaging/eventhubs/EventHubClient.java | 12 +++++++++++-
.../azure/messaging/eventhubs/EventHubConsumer.java | 11 ++++++++++-
.../azure/messaging/eventhubs/EventHubProducer.java | 12 +++++++++++-
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index c72ab085843e9..17a97ea966056 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -11,6 +11,8 @@
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.EventPosition;
+import java.io.Closeable;
+
/**
* The main point of interaction with Azure Event Hubs, the client offers a connection to a specific Event Hub within
* the Event Hubs namespace and offers operations for sending event data, receiving events, and inspecting the connected
@@ -32,7 +34,7 @@
* @see EventHubAsyncClient To communicate with Event Hub using an asynchronous interface.
* @see About Azure Event Hubs
*/
-public class EventHubClient {
+public class EventHubClient implements Closeable {
private final EventHubAsyncClient client;
private final RetryOptions retry;
private final EventHubProducerOptions defaultProducerOptions;
@@ -164,4 +166,12 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId,
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition, options);
return new EventHubConsumer();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() {
+ client.close();
+ }
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
index 0a998cc07d88e..8aad149bb5d20 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
@@ -3,5 +3,14 @@
package com.azure.messaging.eventhubs;
-public class EventHubConsumer {
+import java.io.Closeable;
+import java.io.IOException;
+
+public class EventHubConsumer implements Closeable {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() throws IOException {
+ }
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
index 82787356ef74e..d71484492a9f6 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java
@@ -8,6 +8,8 @@
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
+import java.io.Closeable;
+import java.io.IOException;
import java.time.Duration;
import java.util.Objects;
@@ -73,7 +75,7 @@
* @see EventHubAsyncProducer To asynchronously generate events to an Event Hub, see EventHubAsyncProducer.
*/
@Immutable
-public class EventHubProducer {
+public class EventHubProducer implements Closeable {
private final EventHubAsyncProducer producer;
private final Duration tryTimeout;
@@ -185,4 +187,12 @@ public void send(Iterable events, SendOptions options) {
public void send(EventDataBatch batch) {
producer.send(batch).block();
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void close() throws IOException {
+ producer.close();
+ }
}
From 4727715107cab35c5174e8f7c19534d535ce325c Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:31:08 -0700
Subject: [PATCH 05/32] Fixing sample by removing event hub instance from
namespace connection string.
---
.../eventhubs/EventHubAsyncClientJavaDocCodeSamples.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncClientJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncClientJavaDocCodeSamples.java
index 99b1b75dfa203..9b58e26fafdfb 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncClientJavaDocCodeSamples.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncClientJavaDocCodeSamples.java
@@ -19,7 +19,7 @@ public class EventHubAsyncClientJavaDocCodeSamples {
public void instantiation() {
// BEGIN: com.azure.messaging.eventhubs.eventhubasyncclient.connectionString#string-string
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};"
- + "SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";
+ + "SharedAccessKey={sharedAccessKey}";
String eventHubName = "my-event-hub";
EventHubAsyncClient client = new EventHubClientBuilder()
From 4c94da565ee7bad17e9fbb6eabc79c5e2d27f66c Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 13:32:32 -0700
Subject: [PATCH 06/32] Remove unneeded sample in EventHubClientBuilder.
---
.../eventhubs/EventHubClientBuilder.java | 3 +--
...EventHubAsyncClientJavaDocCodeSamples.java | 25 -------------------
2 files changed, 1 insertion(+), 27 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index e13177c6c556f..1bd345350fc93 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -54,8 +54,7 @@
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubasyncclient.retry-timeout-scheduler}
*
- *
Creating an {@link EventProcessor} instance using Event Hub instance connection
- * string
+ *
Creating an {@link EventProcessor} instance using Event Hub instance connection string
- * Creating a synchronous {@link EventHubClient}
+ * Creating a synchronous {@link EventHubClient} using an Event Hub instance connection string
*
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubclient.instantiation}
From e473b67e03f0d18aabd3a5208fbd3bf2b061f05b Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 16:03:00 -0700
Subject: [PATCH 10/32] Add documentation to EventHubConsumer.
---
.../messaging/eventhubs/EventHubConsumer.java | 42 +++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
index 8aad149bb5d20..2543aa7c51f8b 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
@@ -3,10 +3,52 @@
package com.azure.messaging.eventhubs;
+import com.azure.core.http.rest.IterableResponse;
+import com.azure.messaging.eventhubs.models.EventHubConsumerOptions;
+import com.azure.messaging.eventhubs.models.EventPosition;
+import reactor.core.publisher.Flux;
+
import java.io.Closeable;
import java.io.IOException;
+import java.time.Duration;
+/**
+ * A consumer responsible for reading {@link EventData} from a specific Event Hub partition in the context of a specific
+ * consumer group.
+ *
+ *
+ *
If {@link EventHubConsumer} is created where {@link EventHubConsumerOptions#ownerLevel()} has a
+ * value, then Event Hubs service will guarantee only one active consumer exists per partitionId and consumer group
+ * combination. This consumer is sometimes referred to as an "Epoch Consumer."
+ *
Multiple consumers per partitionId and consumer group combination can be created by not setting
+ * {@link EventHubConsumerOptions#ownerLevel()} when creating consumers. This non-exclusive consumer is sometimes
+ * referred to as a "Non-Epoch Consumer."
+ *
+ *
+ * @see EventHubClient#createConsumer(String, String, EventPosition)
+ * @see EventHubClient#createConsumer(String, String, EventPosition, EventHubConsumerOptions)
+ */
public class EventHubConsumer implements Closeable {
+ /**
+ * Receives a batch of EventData from the Event Hub partition.
+ *
+ * @param maximumMessageCount The maximum number of messages to receive in this batch.
+ */
+ IterableResponse receive(int maximumMessageCount) {
+ return new IterableResponse<>(Flux.empty());
+ }
+
+ /**
+ * Receives a batch of EventData from the Event Hub partition
+ *
+ * @param maximumMessageCount The maximum number of messages to receive in this batch.
+ * @param maximumWaitTime The maximum amount of time to wait to build up the requested message count for the
+ * batch; if not specified, the default wait time specified when the consumer was created will be used.
+ */
+ IterableResponse receive(int maximumMessageCount, Duration maximumWaitTime) {
+ return new IterableResponse<>(Flux.empty());
+ }
+
/**
* {@inheritDoc}
*/
From beb0090953b3cfc1db9718de886ef5d2e67639a5 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 21:28:20 -0700
Subject: [PATCH 11/32] Fixing links to EventHubAsyncProducer samples.
---
...entHubAsyncProducerJavaDocCodeSamples.java | 22 ++++++++++---------
1 file changed, 12 insertions(+), 10 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
index 1c00ee7e23e5e..d9fd13357b922 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
@@ -22,7 +22,8 @@ public class EventHubAsyncProducerJavaDocCodeSamples {
private final EventHubAsyncClient client = new EventHubClientBuilder().connectionString("fake-string").buildAsyncClient();
/**
- * Code snippet demonstrating how to create an EventHubProducer that automatically routes events to any partition.
+ * Code snippet demonstrating how to create an {@link EventHubAsyncProducer} that automatically routes events to any
+ * partition.
*
* @throws IOException if the producer cannot be disposed.
*/
@@ -39,7 +40,8 @@ public void instantiate() throws IOException {
}
/**
- * Code snippet demonstrating how to create an EventHubProducer that routes events to a single partition.
+ * Code snippet demonstrating how to create an {@link EventHubAsyncProducer} that routes events to a single
+ * partition.
*
* @throws IOException if the producer cannot be disposed.
*/
@@ -99,14 +101,14 @@ public void sendEventDataBatch() {
// The sample Flux contains three events, but it could be an infinite stream of telemetry events.
telemetryEvents.subscribe(event -> {
- final EventDataBatch batch = currentBatch.get();
- if (!batch.tryAdd(event)) {
- producer.createBatch(options).map(newBatch -> {
- currentBatch.set(newBatch);
- return producer.send(batch);
- }).block();
- }
- }, error -> System.err.println("Error received:" + error),
+ final EventDataBatch batch = currentBatch.get();
+ if (!batch.tryAdd(event)) {
+ producer.createBatch(options).map(newBatch -> {
+ currentBatch.set(newBatch);
+ return producer.send(batch);
+ }).block();
+ }
+ }, error -> System.err.println("Error received:" + error),
() -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
From 17edcedd769ca6d7a77e55d347a45e09df8e4610 Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 21:28:34 -0700
Subject: [PATCH 12/32] Adding EventHubProducer code samples.
---
.../EventHubProducerJavaDocCodeSamples.java | 111 ++++++++++++++++++
1 file changed, 111 insertions(+)
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java
new file mode 100644
index 0000000000000..a525234d4bad4
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java
@@ -0,0 +1,111 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.RetryOptions;
+import com.azure.messaging.eventhubs.models.BatchOptions;
+import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
+import com.azure.messaging.eventhubs.models.SendOptions;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Contains code snippets when generating javadocs through doclets for {@link EventHubProducer}.
+ */
+public class EventHubProducerJavaDocCodeSamples {
+ private final EventHubClient client = new EventHubClientBuilder()
+ .connectionString("fake-string")
+ .buildClient();
+
+ /**
+ * Code snippet demonstrating how to create an {@link EventHubProducer} that automatically routes events to any
+ * partition.
+ *
+ * @throws IOException if the producer cannot be disposed.
+ */
+ public void instantiate() throws IOException {
+ // BEGIN: com.azure.messaging.eventhubs.eventhubproducer.instantiation
+ EventHubClient client = new EventHubClientBuilder()
+ .connectionString("event-hubs-namespace-connection-string", "event-hub-name")
+ .buildClient();
+
+ EventHubProducer producer = client.createProducer();
+ // END: com.azure.messaging.eventhubs.eventhubproducer.instantiation
+
+ producer.close();
+ }
+
+ /**
+ * Code snippet demonstrating how to create an {@link EventHubProducer} that routes events to a single partition.
+ *
+ * @throws IOException if the producer cannot be disposed.
+ */
+ public void instantiatePartitionProducer() throws IOException {
+ // BEGIN: com.azure.messaging.eventhubs.eventhubproducer.instantiation#partitionId
+ RetryOptions retryOptions = new RetryOptions()
+ .tryTimeout(Duration.ofSeconds(45));
+ EventHubProducerOptions options = new EventHubProducerOptions()
+ .partitionId("foo")
+ .retry(retryOptions);
+
+ EventHubProducer producer = client.createProducer(options);
+ // END: com.azure.messaging.eventhubs.eventhubproducer.instantiation#partitionId
+
+ producer.close();
+ }
+
+ /**
+ * Code snippet demonstrating how to send events with a partition key.
+ */
+ public void sendEventsSendOptions() {
+ // BEGIN: com.azure.messaging.eventhubs.eventhubproducer.send#publisher-sendOptions
+ final List events = Arrays.asList(
+ new EventData("sourdough".getBytes(UTF_8)),
+ new EventData("rye".getBytes(UTF_8)),
+ new EventData("wheat".getBytes(UTF_8))
+ );
+
+ final EventHubProducer producer = client.createProducer();
+ final SendOptions options = new SendOptions()
+ .partitionKey("bread");
+
+ producer.send(events, options);
+ // END: com.azure.messaging.eventhubs.eventhubproducer.send#publisher-sendOptions
+ }
+
+ /**
+ * Code snippet demonstrating how to create an {@link EventDataBatch} and send it.
+ */
+ public void sendEventDataBatch() {
+ final EventHubProducer producer = client.createProducer();
+
+ // BEGIN: com.azure.messaging.eventhubs.eventhubproducer.send#eventDataBatch
+ final List telemetryEvents = Arrays.asList(
+ new EventData("92".getBytes(UTF_8)).addProperty("telemetry", "latency"),
+ new EventData("98".getBytes(UTF_8)).addProperty("telemetry", "cpu-temperature"),
+ new EventData("120".getBytes(UTF_8)).addProperty("telemetry", "fps")
+ );
+
+ final BatchOptions options = new BatchOptions()
+ .partitionKey("telemetry")
+ .maximumSizeInBytes(256);
+
+ EventDataBatch currentBatch = producer.createBatch(options);
+
+ // For each telemetry event, we try to add it to the current batch.
+ // When the batch is full, send it then create another batch to add more events to.
+ for (EventData event : telemetryEvents) {
+ if (!currentBatch.tryAdd(event)) {
+ producer.send(currentBatch);
+ currentBatch = producer.createBatch(options);
+ }
+ }
+ // END: com.azure.messaging.eventhubs.eventhubproducer.send#eventDataBatch
+ }
+}
From d4a7871ab41535f93784937fadb6697e7e27e57e Mon Sep 17 00:00:00 2001
From: Connie
Date: Mon, 12 Aug 2019 22:13:56 -0700
Subject: [PATCH 13/32] Fixing spotbug issues.
---
.../messaging/eventhubs/EventHubClient.java | 4 ++--
.../messaging/eventhubs/EventHubConsumer.java | 13 +++++++++++++
.../EventHubAsyncProducerJavaDocCodeSamples.java | 16 ++++++++--------
3 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index a08815d78c07f..4f42a6135e5f0 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -124,7 +124,7 @@ public EventHubProducer createProducer(EventHubProducerOptions options) {
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
- return new EventHubConsumer();
+ return new EventHubConsumer(consumer, defaultConsumerOptions);
}
/**
@@ -160,7 +160,7 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId,
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition, options);
- return new EventHubConsumer();
+ return new EventHubConsumer(consumer, options);
}
/**
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
index 2543aa7c51f8b..3b2199420835d 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java
@@ -11,6 +11,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
+import java.util.Objects;
/**
* A consumer responsible for reading {@link EventData} from a specific Event Hub partition in the context of a specific
@@ -29,6 +30,17 @@
* @see EventHubClient#createConsumer(String, String, EventPosition, EventHubConsumerOptions)
*/
public class EventHubConsumer implements Closeable {
+ private final EventHubAsyncConsumer consumer;
+ private final EventHubConsumerOptions options;
+
+ EventHubConsumer(EventHubAsyncConsumer consumer, EventHubConsumerOptions options) {
+ this.consumer = Objects.requireNonNull(consumer);
+ this.options = Objects.requireNonNull(options);
+
+ //TODO (conniey): Keep track of the last sequence number as each method invoked.
+ this.consumer.receive().windowTimeout(options.prefetchCount(), this.options.retry().tryTimeout());
+ }
+
/**
* Receives a batch of EventData from the Event Hub partition.
*
@@ -54,5 +66,6 @@ IterableResponse receive(int maximumMessageCount, Duration maximumWai
*/
@Override
public void close() throws IOException {
+ consumer.close();
}
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
index d9fd13357b922..f176ebb2825e9 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubAsyncProducerJavaDocCodeSamples.java
@@ -101,14 +101,14 @@ public void sendEventDataBatch() {
// The sample Flux contains three events, but it could be an infinite stream of telemetry events.
telemetryEvents.subscribe(event -> {
- final EventDataBatch batch = currentBatch.get();
- if (!batch.tryAdd(event)) {
- producer.createBatch(options).map(newBatch -> {
- currentBatch.set(newBatch);
- return producer.send(batch);
- }).block();
- }
- }, error -> System.err.println("Error received:" + error),
+ final EventDataBatch batch = currentBatch.get();
+ if (!batch.tryAdd(event)) {
+ producer.createBatch(options).map(newBatch -> {
+ currentBatch.set(newBatch);
+ return producer.send(batch);
+ }).block();
+ }
+ }, error -> System.err.println("Error received:" + error),
() -> {
final EventDataBatch batch = currentBatch.getAndSet(null);
if (batch != null) {
From e4de89d2dda29deb597d7fbdf1778692da58b9e7 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 11:01:16 -0700
Subject: [PATCH 14/32] Update from Iterable to IterableResponse.
---
.../java/com/azure/messaging/eventhubs/EventHubClient.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index 4f42a6135e5f0..016b8d585b2de 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -4,6 +4,7 @@
package com.azure.messaging.eventhubs;
import com.azure.core.amqp.RetryOptions;
+import com.azure.core.http.rest.IterableResponse;
import com.azure.core.implementation.annotation.ReturnType;
import com.azure.core.implementation.annotation.ServiceClient;
import com.azure.core.implementation.annotation.ServiceMethod;
@@ -62,8 +63,8 @@ public EventHubProperties getProperties() {
* @return The identifiers for all partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
- public Iterable getPartitionIds() {
- return client.getPartitionIds().collectList().block(retry.tryTimeout());
+ public IterableResponse getPartitionIds() {
+ return new IterableResponse<>(client.getPartitionIds());
}
/**
From 1f48bf7d3b8b92eea4d5c262fb0b35589f88301a Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 12:41:55 -0700
Subject: [PATCH 15/32] Make test contents package-private.
---
.../eventhubs/EventHubAsyncProducerTest.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerTest.java
index eaf3bc89e17a8..76892d08ac551 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerTest.java
@@ -68,7 +68,7 @@ public void teardown() {
public void sendMultipleMessages() {
// Arrange
final int count = 4;
- final byte[] contents = CONTENTS.getBytes(UTF_8);
+ final byte[] contents = TEST_CONTENTS.getBytes(UTF_8);
final Flux testData = Flux.range(0, count).flatMap(number -> {
final EventData data = new EventData(contents);
return Flux.just(data);
@@ -100,7 +100,7 @@ public void sendMultipleMessages() {
@Test
public void sendSingleMessage() {
// Arrange
- final EventData testData = new EventData(CONTENTS.getBytes(UTF_8));
+ final EventData testData = new EventData(TEST_CONTENTS.getBytes(UTF_8));
when(sendLink.send(any(Message.class))).thenReturn(Mono.empty());
@@ -128,8 +128,8 @@ public void sendSingleMessage() {
public void partitionProducerCannotSendWithPartitionKey() {
// Arrange
final Flux testData = Flux.just(
- new EventData(CONTENTS.getBytes(UTF_8)),
- new EventData(CONTENTS.getBytes(UTF_8)));
+ new EventData(TEST_CONTENTS.getBytes(UTF_8)),
+ new EventData(TEST_CONTENTS.getBytes(UTF_8)));
when(sendLink.send(anyList())).thenReturn(Mono.empty());
@@ -163,7 +163,7 @@ public void sendTooManyMessages() {
// We believe 20 events is enough for that EventDataBatch to be greater than max size.
final Flux testData = Flux.range(0, 20).flatMap(number -> {
- final EventData data = new EventData(CONTENTS.getBytes(UTF_8));
+ final EventData data = new EventData(TEST_CONTENTS.getBytes(UTF_8));
return Flux.just(data);
});
@@ -378,7 +378,7 @@ public void sendsAnEventDataBatch() {
verify(link, times(2)).getLinkSize();
}
- private static final String CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \n"
+ static final String TEST_CONTENTS = "SSLorem ipsum dolor sit amet, consectetur adipiscing elit. Donec vehicula posuere lobortis. Aliquam finibus volutpat dolor, faucibus pellentesque ipsum bibendum vitae. Class aptent taciti sociosqu ad litora torquent per conubia nostra, per inceptos himenaeos. Ut sit amet urna hendrerit, dapibus justo a, sodales justo. Mauris finibus augue id pulvinar congue. Nam maximus luctus ipsum, at commodo ligula euismod ac. Phasellus vitae lacus sit amet diam porta placerat. \n"
+ "Ut sodales efficitur sapien ut posuere. Morbi sed tellus est. Proin eu erat purus. Proin massa nunc, condimentum id iaculis dignissim, consectetur et odio. Cras suscipit sem eu libero aliquam tincidunt. Nullam ut arcu suscipit, eleifend velit in, cursus libero. Ut eleifend facilisis odio sit amet feugiat. Phasellus at nunc sit amet elit sagittis commodo ac in nisi. Fusce vitae aliquam quam. Integer vel nibh euismod, tempus elit vitae, pharetra est. Duis vulputate enim a elementum dignissim. Morbi dictum enim id elit scelerisque, in elementum nulla pharetra. \n"
+ "Aenean aliquet aliquet condimentum. Proin dapibus dui id libero tempus feugiat. Sed commodo ligula a lectus mattis, vitae tincidunt velit auctor. Fusce quis semper dui. Phasellus eu efficitur sem. Ut non sem sit amet enim condimentum venenatis id dictum massa. Nullam sagittis lacus a neque sodales, et ultrices arcu mattis. Aliquam erat volutpat. \n"
+ "Aenean fringilla quam elit, id mattis purus vestibulum nec. Praesent porta eros in dapibus molestie. Vestibulum orci libero, tincidunt et turpis eget, condimentum lobortis enim. Fusce suscipit ante et mauris consequat cursus nec laoreet lorem. Maecenas in sollicitudin diam, non tincidunt purus. Nunc mauris purus, laoreet eget interdum vitae, placerat a sapien. In mi risus, blandit eu facilisis nec, molestie suscipit leo. Pellentesque molestie urna vitae dui faucibus bibendum. \n"
From 49c2aaa8c8b66da4e8cc8ef25d6b69e54490665c Mon Sep 17 00:00:00 2001
From: Connie Yau
Date: Tue, 13 Aug 2019 15:02:35 -0700
Subject: [PATCH 16/32] Update
sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
Co-Authored-By: Srikanta <51379715+srnagar@users.noreply.github.com>
---
.../com/azure/messaging/eventhubs/EventHubClientBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index e40c5027ba93c..5a783ca0259de 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -285,7 +285,7 @@ public EventHubAsyncClient buildAsyncClient() {
}
/**
- * Creates a new {@link EventHubClient} based on options set on this builder. Every time {@code buildAsyncClient()}
+ * Creates a new {@link EventHubClient} based on options set on this builder. Every time {@code buildClient()}
* is invoked, a new instance of {@link EventHubAsyncClient} is created.
*
*
* A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that
- * only one consumer from that group is reading the from the partition. These exclusive consumers are sometimes
+ * only one consumer from that group is reading from the partition. These exclusive consumers are sometimes
* referred to as "Epoch Consumers."
*
* A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively
From 0d40de4182dbd58c9e5c3689a7a5314ec994be74 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:05:46 -0700
Subject: [PATCH 18/32] Fix typo in connection strings.
---
.../com/azure/messaging/eventhubs/EventHubClientBuilder.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index 5a783ca0259de..cdd80c591111c 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -286,7 +286,7 @@ public EventHubAsyncClient buildAsyncClient() {
/**
* Creates a new {@link EventHubClient} based on options set on this builder. Every time {@code buildClient()}
- * is invoked, a new instance of {@link EventHubAsyncClient} is created.
+ * is invoked, a new instance of {@link EventHubClient} is created.
*
*
* The following options are used if ones are not specified in the builder:
From bbb1eb528e1571174b1c77786d759a731840aba2 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:07:42 -0700
Subject: [PATCH 19/32] Fix javadoc errors.
---
.../main/java/com/azure/messaging/eventhubs/EventHubClient.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index da17f2fbf1f12..8b9b4a39f1589 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -21,7 +21,7 @@
* Event Hub.
*
*
- * Creating a synchronous {@link EventHubClient} using Event Hub instance connection string
+ * Creating a synchronous {@link EventHubClient} using an Event Hub instance connection string
*
*
* {@codesnippet com.azure.messaging.eventhubs.eventhubclient.instantiation}
From d4c89b21e306c1cc5fdde0ce0ea14f3c7841b38f Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:10:52 -0700
Subject: [PATCH 20/32] Add Null checks for options.
---
.../messaging/eventhubs/EventHubClient.java | 38 +++++++++++--------
1 file changed, 23 insertions(+), 15 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index 8b9b4a39f1589..625b6865deabb 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -14,6 +14,7 @@
import com.azure.messaging.eventhubs.models.EventPosition;
import java.io.Closeable;
+import java.util.Objects;
/**
* The main point of interaction with Azure Event Hubs, the client offers a connection to a specific Event Hub within
@@ -38,13 +39,15 @@ public class EventHubClient implements Closeable {
private final EventHubConsumerOptions defaultConsumerOptions;
EventHubClient(EventHubAsyncClient client, ConnectionOptions connectionOptions) {
+ Objects.requireNonNull(connectionOptions);
+
+ this.client = Objects.requireNonNull(client);
this.retry = connectionOptions.retry();
this.defaultProducerOptions = new EventHubProducerOptions()
.retry(connectionOptions.retry());
this.defaultConsumerOptions = new EventHubConsumerOptions()
.retry(connectionOptions.retry())
.scheduler(connectionOptions.scheduler());
- this.client = client;
}
/**
@@ -97,9 +100,14 @@ public EventHubProducer createProducer() {
*
* @param options The set of options to apply when creating the producer.
* @return A new {@link EventHubProducer}.
- * @throws NullPointerException if {@code options} is {@code null}.
+ * @throws NullPointerException if {@code options}, {@code options.retry()}, or {@code
+ * options.retry().retryTimeout()} is {@code null}.
*/
public EventHubProducer createProducer(EventHubProducerOptions options) {
+ Objects.requireNonNull(options);
+ Objects.requireNonNull(options.retry(), "'options.retry()' cannot be null.");
+ Objects.requireNonNull(options.retry().tryTimeout(), "'options.retry().tryTimeout()' cannot be null.");
+
final EventHubAsyncProducer producer = client.createProducer();
return new EventHubProducer(producer, options.retry().tryTimeout());
@@ -113,15 +121,15 @@ public EventHubProducer createProducer(EventHubProducerOptions options) {
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
* Consumers".
*
- * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in
- * the context of this group. The name of the consumer group that is created by default is {@link
- * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
+ * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
+ * context of this group. The name of the consumer group that is created by default is {@link
+ * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @return A new {@link EventHubConsumer} that receives events from the partition at the given position.
* @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
- * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
- * empty string.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an empty
+ * string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
@@ -134,8 +142,8 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId,
*
*
* A consumer may be exclusive, which asserts ownership over the partition for the consumer group to ensure that
- * only one consumer from that group is reading from the partition. These exclusive consumers are sometimes
- * referred to as "Epoch Consumers."
+ * only one consumer from that group is reading from the partition. These exclusive consumers are sometimes referred
+ * to as "Epoch Consumers."
*
* A consumer may also be non-exclusive, allowing multiple consumers from the same consumer group to be actively
* reading events from the partition. These non-exclusive consumers are sometimes referred to as "Non-epoch
@@ -146,17 +154,17 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId,
* non-exclusive.
*
*
- * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in
- * the context of this group. The name of the consumer group that is created by default is {@link
- * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
+ * @param consumerGroup The name of the consumer group this consumer is associated with. Events are read in the
+ * context of this group. The name of the consumer group that is created by default is {@link
+ * EventHubAsyncClient#DEFAULT_CONSUMER_GROUP_NAME "$Default"}.
* @param partitionId The identifier of the Event Hub partition from which events will be received.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubConsumer} that receives events from the partition with all configured {@link
- * EventHubConsumerOptions}.
+ * EventHubConsumerOptions}.
* @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
- * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
- * empty string.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an empty
+ * string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
From 104fe4b262babdd3e3c55a7cfed5f9399347ae63 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:12:28 -0700
Subject: [PATCH 21/32] Fix null checks.
---
.../messaging/eventhubs/EventHubAsyncClient.java | 12 +++++++-----
.../azure/messaging/eventhubs/EventHubClient.java | 6 +++---
2 files changed, 10 insertions(+), 8 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
index f0457d565f761..c20d9fe2c14b6 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
@@ -236,20 +236,22 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubAsyncConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
- * @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
- * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an
- * empty string.
+ * @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
+ * options} is {@code null}.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubAsyncConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
Objects.requireNonNull(eventPosition);
Objects.requireNonNull(options);
+ Objects.requireNonNull(consumerGroup);
+ Objects.requireNonNull(partitionId);
if (ImplUtils.isNullOrEmpty(consumerGroup)) {
- throw new IllegalArgumentException("'consumerGroup' cannot be null or empty.");
+ throw new IllegalArgumentException("'consumerGroup' cannot be an empty string.");
}
if (ImplUtils.isNullOrEmpty(partitionId)) {
- throw new IllegalArgumentException("'partitionId' cannot be null or empty.");
+ throw new IllegalArgumentException("'partitionId' cannot be an empty string.");
}
final EventHubConsumerOptions clonedOptions = options.clone();
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index 625b6865deabb..a1099c6a86b82 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -127,9 +127,9 @@ public EventHubProducer createProducer(EventHubProducerOptions options) {
* @param partitionId The identifier of the Event Hub partition.
* @param eventPosition The position within the partition where the consumer should begin reading events.
* @return A new {@link EventHubConsumer} that receives events from the partition at the given position.
- * @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
- * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an empty
- * string.
+ * @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
+ * options} is {@code null}.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition) {
final EventHubAsyncConsumer consumer = client.createConsumer(consumerGroup, partitionId, eventPosition);
From d49d7c8d339a1d1602f3038f34028a12db5e71d5 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:17:51 -0700
Subject: [PATCH 22/32] Remove unneeded log message.
---
.../java/com/azure/messaging/eventhubs/EventHubAsyncClient.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
index c20d9fe2c14b6..a3a6c04861ecc 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java
@@ -270,8 +270,6 @@ public EventHubAsyncConsumer createConsumer(String consumerGroup, String partiti
return connection.createSession(entityPath).cast(EventHubSession.class);
}).flatMap(session -> {
logger.verbose("Creating consumer for path: {}", entityPath);
-
- logger.verbose("Creating producer for {}", entityPath);
final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(clonedOptions.retry());
return session.createConsumer(linkName, entityPath, getExpression(eventPosition),
From e942750f2e778445a2cc32b05552792651b8877a Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:37:32 -0700
Subject: [PATCH 23/32] Adding tests for EventHubProducer.
---
.../eventhubs/EventHubProducerTest.java | 215 ++++++++++++++++++
1 file changed, 215 insertions(+)
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java
new file mode 100644
index 0000000000000..db2fae439f3b4
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java
@@ -0,0 +1,215 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.RetryOptions;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.amqp.exception.ErrorCondition;
+import com.azure.core.amqp.exception.ErrorContext;
+import com.azure.messaging.eventhubs.implementation.AmqpSendLink;
+import com.azure.messaging.eventhubs.models.BatchOptions;
+import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.message.Message;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests to verify functionality of {@link EventHubProducer}.
+ */
+public class EventHubProducerTest {
+ @Mock
+ private AmqpSendLink sendLink;
+ @Captor
+ private ArgumentCaptor singleMessageCaptor;
+ @Captor
+ private ArgumentCaptor> messagesCaptor;
+
+ private EventHubAsyncProducer asyncProducer;
+ private RetryOptions retryOptions = new RetryOptions().tryTimeout(Duration.ofSeconds(30));
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ when(sendLink.getLinkSize()).thenReturn(Mono.just(EventHubAsyncProducer.MAX_MESSAGE_LENGTH_BYTES));
+ when(sendLink.getErrorContext()).thenReturn(new ErrorContext("test-namespace"));
+ when(sendLink.send(anyList())).thenReturn(Mono.empty());
+ when(sendLink.send(any(Message.class))).thenReturn(Mono.empty());
+
+ asyncProducer = new EventHubAsyncProducer(
+ Mono.fromCallable(() -> sendLink),
+ new EventHubProducerOptions().retry(retryOptions));
+ }
+
+ @After
+ public void teardown() {
+ Mockito.framework().clearInlineMocks();
+ sendLink = null;
+ singleMessageCaptor = null;
+ messagesCaptor = null;
+ }
+
+ /**
+ * Verifies can send a single message.
+ */
+ @Test
+ public void sendSingleMessage() {
+ // Arrange
+ final EventHubProducer producer = new EventHubProducer(asyncProducer, retryOptions.tryTimeout());
+ final EventData eventData = new EventData("hello-world".getBytes(UTF_8));
+
+ // Act
+ producer.send(eventData);
+
+ // Assert
+ verify(sendLink, times(1)).send(any(Message.class));
+ verify(sendLink).send(singleMessageCaptor.capture());
+
+ final Message message = singleMessageCaptor.getValue();
+ Assert.assertEquals(Section.SectionType.Data, message.getBody().getType());
+ }
+
+ /**
+ * Verifies we can send multiple messages.
+ */
+ @Test
+ public void sendMultipleMessages() {
+ // Arrange
+ final int count = 4;
+ final Iterable events = Flux.range(0, count).map(number -> {
+ final String contents = "event-data-" + number;
+ return new EventData(contents.getBytes(UTF_8));
+ }).toIterable();
+
+ final SendOptions options = new SendOptions();
+ final EventHubProducer producer = new EventHubProducer(asyncProducer, retryOptions.tryTimeout());
+
+ // Act
+ producer.send(events, options);
+
+ // Assert
+ verify(sendLink).send(messagesCaptor.capture());
+
+ final List messagesSent = messagesCaptor.getValue();
+ Assert.assertEquals(count, messagesSent.size());
+
+ messagesSent.forEach(message -> Assert.assertEquals(Section.SectionType.Data, message.getBody().getType()));
+ }
+
+ /**
+ * Verifies that the producer can create an {@link EventDataBatch} with the size given by the underlying AMQP send
+ * link.
+ */
+ @Test
+ public void createsEventDataBatch() {
+ // Arrange
+ int maxLinkSize = 1024;
+
+ // Overhead when serializing an event, to figure out what the maximum size we can use for an event payload.
+ int eventOverhead = 24;
+ int maxEventPayload = maxLinkSize - eventOverhead;
+
+ final AmqpSendLink link = mock(AmqpSendLink.class);
+ when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
+
+ // This event is 1024 bytes when serialized.
+ final EventData event = new EventData(new byte[maxEventPayload]);
+
+ // This event will be 1025 bytes when serialized.
+ final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]);
+
+ final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(retryOptions);
+ final EventHubAsyncProducer hubAsyncProducer = new EventHubAsyncProducer(Mono.fromCallable(() -> link), producerOptions);
+ final EventHubProducer hubProducer = new EventHubProducer(hubAsyncProducer, retryOptions.tryTimeout());
+
+ // Act
+ final EventDataBatch batch = hubProducer.createBatch();
+
+ // Assert
+ Assert.assertNull(batch.getPartitionKey());
+ Assert.assertFalse(batch.tryAdd(tooLargeEvent));
+ Assert.assertTrue(batch.tryAdd(event));
+
+ verify(link, times(1)).getLinkSize();
+ }
+
+ /**
+ * Verifies we can create an EventDataBatch with partition key and link size.
+ */
+ @Test
+ public void createsEventDataBatchWithPartitionKey() {
+ // Arrange
+ int maxBatchSize = 1024;
+
+ // Overhead when serializing an event, to figure out what the maximum size we can use for an event payload.
+ int eventOverhead = 98;
+ int maxEventPayload = maxBatchSize - eventOverhead;
+
+ // This event is 1024 bytes when serialized.
+ final EventData event = new EventData(new byte[maxEventPayload]);
+
+ // No idea what the overhead for adding partition key is. But we know this will be smaller than the max size.
+ final BatchOptions options = new BatchOptions()
+ .partitionKey("some-key")
+ .maximumSizeInBytes(maxBatchSize);
+ final EventHubProducer producer = new EventHubProducer(asyncProducer, retryOptions.tryTimeout());
+
+ // Act
+ final EventDataBatch batch = producer.createBatch(options);
+
+ // Arrange
+ Assert.assertEquals(options.partitionKey(), batch.getPartitionKey());
+ Assert.assertTrue(batch.tryAdd(event));
+ }
+
+ /**
+ * Verifies we can create an EventDataBatch with partition key and link size.
+ */
+ @Test
+ public void payloadTooLarge() {
+ // Arrange
+ int maxBatchSize = 1024;
+
+ // Overhead when serializing an event, to figure out what the maximum size we can use for an event payload.
+ int eventOverhead = 24;
+ int maxEventPayload = maxBatchSize - eventOverhead;
+
+ // This event is 1025 bytes when serialized.
+ final EventData event = new EventData(new byte[maxEventPayload + 1]);
+
+ // No idea what the overhead for adding partition key is. But we know this will be smaller than the max size.
+ final BatchOptions options = new BatchOptions()
+ .maximumSizeInBytes(maxBatchSize);
+ final EventHubProducer producer = new EventHubProducer(asyncProducer, retryOptions.tryTimeout());
+ final EventDataBatch batch = producer.createBatch(options);
+
+ // Act & Assert
+ try {
+ batch.tryAdd(event);
+ } catch (AmqpException e) {
+ Assert.assertEquals(ErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, e.getErrorCondition());
+ }
+ }
+}
From 5f51b4ebd618d046397e0a4f500fe5dca5d672b6 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:47:50 -0700
Subject: [PATCH 24/32] Simplifying creation of EventHubAsyncProducer
---
.../EventHubAsyncProducerIntegrationTest.java | 25 +++++--------------
1 file changed, 6 insertions(+), 19 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerIntegrationTest.java
index 11b676fdffa5e..1741a14c08c76 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerIntegrationTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncProducerIntegrationTest.java
@@ -3,15 +3,10 @@
package com.azure.messaging.eventhubs;
-import com.azure.core.amqp.TransportType;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.ApiTestBase;
-import com.azure.messaging.eventhubs.implementation.ConnectionOptions;
-import com.azure.messaging.eventhubs.implementation.ConnectionStringProperties;
-import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider;
import com.azure.messaging.eventhubs.models.BatchOptions;
import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
-import com.azure.messaging.eventhubs.models.ProxyConfiguration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -45,13 +40,13 @@ protected String testName() {
@Override
protected void beforeTest() {
- final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider());
- final ConnectionStringProperties properties = new ConnectionStringProperties(getConnectionString());
- final ConnectionOptions connectionOptions = new ConnectionOptions(properties.endpoint().getHost(),
- properties.eventHubName(), getTokenCredential(), getAuthorizationType(), TransportType.AMQP, RETRY_OPTIONS,
- ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.parallel());
+ skipIfNotRecordMode();
- client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider);
+ client = new EventHubClientBuilder()
+ .connectionString(getConnectionString())
+ .retry(RETRY_OPTIONS)
+ .scheduler(Schedulers.parallel())
+ .buildAsyncClient();
}
@Override
@@ -64,8 +59,6 @@ protected void afterTest() {
*/
@Test
public void sendMessageToPartition() throws IOException {
- skipIfNotRecordMode();
-
// Arrange
final EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(PARTITION_ID);
final List events = Arrays.asList(
@@ -86,8 +79,6 @@ public void sendMessageToPartition() throws IOException {
*/
@Test
public void sendMessage() throws IOException {
- skipIfNotRecordMode();
-
// Arrange
final List events = Arrays.asList(
new EventData("Event 1".getBytes(UTF_8)),
@@ -106,8 +97,6 @@ public void sendMessage() throws IOException {
*/
@Test
public void sendBatch() throws IOException {
- skipIfNotRecordMode();
-
// Arrange
final List events = Arrays.asList(
new EventData("Event 1".getBytes(UTF_8)),
@@ -134,8 +123,6 @@ public void sendBatch() throws IOException {
*/
@Test
public void sendBatchWithPartitionKey() throws IOException {
- skipIfNotRecordMode();
-
// Arrange
final List events = Arrays.asList(
new EventData("Event 1".getBytes(UTF_8)),
From 1018908bb529c8837415634735be8931b4102846 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 15:53:05 -0700
Subject: [PATCH 25/32] Remove redundant null check.
---
.../com/azure/messaging/eventhubs/EventHubClient.java | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index a1099c6a86b82..524284975497d 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -100,15 +100,10 @@ public EventHubProducer createProducer() {
*
* @param options The set of options to apply when creating the producer.
* @return A new {@link EventHubProducer}.
- * @throws NullPointerException if {@code options}, {@code options.retry()}, or {@code
- * options.retry().retryTimeout()} is {@code null}.
+ * @throws NullPointerException if {@code options} is {@code null}.
*/
public EventHubProducer createProducer(EventHubProducerOptions options) {
- Objects.requireNonNull(options);
- Objects.requireNonNull(options.retry(), "'options.retry()' cannot be null.");
- Objects.requireNonNull(options.retry().tryTimeout(), "'options.retry().tryTimeout()' cannot be null.");
-
- final EventHubAsyncProducer producer = client.createProducer();
+ final EventHubAsyncProducer producer = client.createProducer(options);
return new EventHubProducer(producer, options.retry().tryTimeout());
}
From 470f4efba0d96dc7b395ddbbf55f71508f17d5d3 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 16:17:37 -0700
Subject: [PATCH 26/32] Select correct retryDuration when constructing
EventHubProducer.
---
.../azure/messaging/eventhubs/EventHubClient.java | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
index 524284975497d..0311132951878 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClient.java
@@ -14,6 +14,7 @@
import com.azure.messaging.eventhubs.models.EventPosition;
import java.io.Closeable;
+import java.time.Duration;
import java.util.Objects;
/**
@@ -103,9 +104,15 @@ public EventHubProducer createProducer() {
* @throws NullPointerException if {@code options} is {@code null}.
*/
public EventHubProducer createProducer(EventHubProducerOptions options) {
+ Objects.requireNonNull(options);
+
final EventHubAsyncProducer producer = client.createProducer(options);
- return new EventHubProducer(producer, options.retry().tryTimeout());
+ final Duration tryTimeout = options.retry() != null && options.retry().tryTimeout() != null
+ ? options.retry().tryTimeout()
+ : defaultProducerOptions.retry().tryTimeout();
+
+ return new EventHubProducer(producer, tryTimeout);
}
/**
@@ -157,9 +164,9 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId,
* @param options The set of options to apply when creating the consumer.
* @return An new {@link EventHubConsumer} that receives events from the partition with all configured {@link
* EventHubConsumerOptions}.
- * @throws NullPointerException If {@code eventPosition}, or {@code options} is {@code null}.
- * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is {@code null} or an empty
- * string.
+ * @throws NullPointerException If {@code eventPosition}, {@code consumerGroup}, {@code partitionId}, or {@code
+ * options} is {@code null}.
+ * @throws IllegalArgumentException If {@code consumerGroup} or {@code partitionId} is an empty string.
*/
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition,
EventHubConsumerOptions options) {
From c648c2e4b923bf9c36a337721574cee87458c428 Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 16:17:53 -0700
Subject: [PATCH 27/32] Adding EventHubProducer tests.
---
.../EventHubProducerIntegrationTest.java | 132 ++++++++++++++++++
1 file changed, 132 insertions(+)
create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
new file mode 100644
index 0000000000000..5c3b5c008912f
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
@@ -0,0 +1,132 @@
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.implementation.ApiTestBase;
+import com.azure.messaging.eventhubs.models.BatchOptions;
+import com.azure.messaging.eventhubs.models.EventHubProducerOptions;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import reactor.core.scheduler.Schedulers;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+public class EventHubProducerIntegrationTest extends ApiTestBase {
+ private static final String PARTITION_ID = "1";
+ private EventHubClient client;
+
+ public EventHubProducerIntegrationTest() {
+ super(new ClientLogger(EventHubProducerIntegrationTest.class));
+ }
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Override
+ protected String testName() {
+ return testName.getMethodName();
+ }
+
+ @Override
+ protected void beforeTest() {
+ skipIfNotRecordMode();
+
+ client = new EventHubClientBuilder()
+ .connectionString(getConnectionString())
+ .retry(RETRY_OPTIONS)
+ .scheduler(Schedulers.parallel())
+ .buildClient();
+ }
+
+ @Override
+ protected void afterTest() {
+ dispose(client);
+ }
+
+ /**
+ * Verifies that we can create and send a message to an Event Hub partition.
+ */
+ @Test
+ public void sendMessageToPartition() throws IOException {
+ // Arrange
+ final EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(PARTITION_ID);
+ final List events = Arrays.asList(
+ new EventData("Event 1".getBytes(UTF_8)),
+ new EventData("Event 2".getBytes(UTF_8)),
+ new EventData("Event 3".getBytes(UTF_8)));
+
+ // Act & Assert
+ try (EventHubProducer producer = client.createProducer(producerOptions)) {
+ producer.send(events);
+ }
+ }
+
+ /**
+ * Verifies that we can create an {@link EventHubProducer} that does not care about partitions and lets the service
+ * distribute the events.
+ */
+ @Test
+ public void sendMessage() throws IOException {
+ // Arrange
+ final List events = Arrays.asList(
+ new EventData("Event 1".getBytes(UTF_8)),
+ new EventData("Event 2".getBytes(UTF_8)),
+ new EventData("Event 3".getBytes(UTF_8)));
+
+ // Act & Assert
+ try (EventHubProducer producer = client.createProducer()) {
+ producer.send(events);
+ }
+ }
+
+ /**
+ * Verifies we can create an {@link EventDataBatch} and send it using our EventHubProducer.
+ */
+ @Test
+ public void sendBatch() throws IOException {
+ // Arrange
+ final List events = Arrays.asList(
+ new EventData("Event 1".getBytes(UTF_8)),
+ new EventData("Event 2".getBytes(UTF_8)),
+ new EventData("Event 3".getBytes(UTF_8)));
+
+ // Act & Assert
+ try (EventHubProducer producer = client.createProducer()) {
+ EventDataBatch batch = producer.createBatch();
+ events.forEach(event -> {
+ Assert.assertTrue(batch.tryAdd(event));
+ });
+
+ producer.send(batch);
+ }
+ }
+
+ /**
+ * Verifies we can create an {@link EventDataBatch} with a partition key and send it using our EventHubProducer.
+ */
+ @Test
+ public void sendBatchWithPartitionKey() throws IOException {
+ // Arrange
+ final List events = Arrays.asList(
+ new EventData("Event 1".getBytes(UTF_8)),
+ new EventData("Event 2".getBytes(UTF_8)),
+ new EventData("Event 3".getBytes(UTF_8)));
+
+ // Act & Assert
+ try (EventHubProducer producer = client.createProducer()) {
+ final BatchOptions options = new BatchOptions().partitionKey("my-partition-key");
+ final EventDataBatch batch = producer.createBatch(options);
+
+ events.forEach(event -> {
+ Assert.assertTrue(batch.tryAdd(event));
+ });
+
+ producer.send(batch);
+ }
+ }
+}
From cfb1d11595b2f0f13bfb2d91769ce30ed14a1e8e Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 16:20:05 -0700
Subject: [PATCH 28/32] Add correct header file.
---
.../messaging/eventhubs/EventHubProducerIntegrationTest.java | 3 +++
1 file changed, 3 insertions(+)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
index 5c3b5c008912f..524d7981611e6 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java
@@ -1,3 +1,6 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
package com.azure.messaging.eventhubs;
import com.azure.core.util.logging.ClientLogger;
From f7cf4e4d85fc5d000fb421c48092461c735f0e9e Mon Sep 17 00:00:00 2001
From: Connie
Date: Tue, 13 Aug 2019 16:22:56 -0700
Subject: [PATCH 29/32] Rename EventHubClientIntegrationTest ->
EventHubAsyncClientIntegrationTests
---
...ionTest.java => EventHubAsyncClientIntegrationTest.java} | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
rename sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/{EventHubClientIntegrationTest.java => EventHubAsyncClientIntegrationTest.java} (97%)
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java
similarity index 97%
rename from sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java
rename to sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java
index 83eb91c703269..c6f2c028eeeaa 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubAsyncClientIntegrationTest.java
@@ -41,7 +41,7 @@
* Tests scenarios on {@link EventHubAsyncClient}.
*/
@RunWith(Parameterized.class)
-public class EventHubClientIntegrationTest extends ApiTestBase {
+public class EventHubAsyncClientIntegrationTest extends ApiTestBase {
private static final int NUMBER_OF_EVENTS = 5;
@Parameterized.Parameters(name = "{index}: transportType={0}")
@@ -59,8 +59,8 @@ public static Iterable