Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update samples for new Event Hubs core classes #6546

Merged
merged 17 commits into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 115 additions & 104 deletions sdk/eventhubs/azure-messaging-eventhubs/README.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import static com.azure.core.util.FluxUtil.monoError;

/**
* A consumer responsible for reading {@link EventData} from a specific Event Hub partition in the context of a specific
* consumer group.
* An <b>asynchronous</b> consumer responsible for reading {@link EventData} from either a specific Event Hub partition
* or all partitions in the context of a specific consumer group.
*
* <p><strong>Creating an {@link EventHubConsumerAsyncClient}</strong></p>
* <p>Required parameters are {@code consumerGroup}, and credentials are required when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
* A consumer responsible for reading {@link EventData} from either a specific Event Hub partition or all partitions in
* the context of a consumer group.
* A <b>synchronous</b> consumer responsible for reading {@link EventData} from an Event Hub partition in the context of
* a specific consumer group.
*
* <p><strong>Creating a synchronous consumer</strong></p>
* <p>Required parameters are {@code consumerGroup} and credentials when creating a consumer.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@
import static com.azure.messaging.eventhubs.implementation.ClientConstants.MAX_MESSAGE_LENGTH_BYTES;

/**
* A producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped together in batches.
* Depending on the options specified at creation, the producer may be created to allow event data to be automatically
* routed to an available partition or specific to a partition.
* An <b>asynchronous</b> producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped
* together in batches. Depending on the options specified at creation, the producer may be created to allow event data
* to be automatically routed to an available partition or specific to a partition.
*
* <p>
* Allowing automatic routing of partitions is recommended when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import java.util.Objects;

/**
* A producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped together in batches.
* Depending on the options specified at creation, the producer may be created to allow event data to be automatically
* routed to an available partition or specific to a partition.
* A <b>synchronous</b> producer responsible for transmitting {@link EventData} to a specific Event Hub, grouped
* together in batches. Depending on the options specified at creation, the producer may be created to allow event data
* to be automatically routed to an available partition or specific to a partition.
*
* <p>
* Allowing automatic routing of partitions is recommended when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public final class EventHubProperties {
}

/**
* Gets the Event Hub name
* Gets the name of the Event Hub.
*
* @return Name of the Event Hub.
*/
Expand All @@ -50,7 +50,7 @@ public String getName() {
}

/**
* Gets the instant, in UTC, at which Event Hub was created at.
* Gets the instant, in UTC, at which Event Hub was created.
*
* @return The instant, in UTC, at which the Event Hub was created.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* Sample demonstrates how to receive events from an Azure Event Hub instance.
*/
public class ConsumeEvent {
public class ConsumeEvents {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static final int NUMBER_OF_EVENTS = 10;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,27 @@

import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.SendOptions;
import reactor.core.Disposable;

import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Sample demonstrates how to receive events starting from the specific sequence number position in an Event Hub instance.
* Sample demonstrates how to receive events starting from the specific sequence number position in an Event Hub
* instance. It also demonstrates how to publish events to a specific partition.
*/
public class ConsumeEventsFromKnownSequenceNumberPosition {
private static final Duration OPERATION_TIMEOUT = Duration.ofSeconds(30);
private static long lastEnqueuedSequenceNumber = -1;
private static String lastEnqueuedSequencePartitionId = null;

/**
* Main method to invoke this demo about how to receive event from a known sequence number position in an Azure Event Hub instance.
* Main method to invoke this demo about how to receive event from a known sequence number position in an Azure
* Event Hub instance.
*
* @param args Unused arguments to the program.
* @throws InterruptedException The countdown latch was interrupted while waiting for this sample to
* complete.
*/
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) {
final AtomicBoolean isRunning = new AtomicBoolean(true);

// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
Expand All @@ -36,76 +33,59 @@ public static void main(String[] args) throws InterruptedException {
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

EventHubClientBuilder builder = new EventHubClientBuilder()
.connectionString(connectionString)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME);

EventHubConsumerAsyncClient earliestConsumer = builder.buildAsyncConsumerClient();

earliestConsumer.getPartitionIds().flatMap(partitionId -> earliestConsumer.getPartitionProperties(partitionId))
.subscribe(
properties -> {
if (!properties.isEmpty()) {
lastEnqueuedSequenceNumber = properties.getLastEnqueuedSequenceNumber();
lastEnqueuedSequencePartitionId = properties.getId();
}
},
error -> System.err.println("Error occurred while fetching partition properties: " + error.toString()),
() -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});

System.out.println("Waiting for partition properties to complete...");
// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();
System.out.printf("Last enqueued sequence number: %s%n", lastEnqueuedSequenceNumber);
final EventHubClientBuilder builder = new EventHubClientBuilder()
.connectionString(connectionString);

// The consumer group is required for consuming events.
final EventHubConsumerAsyncClient consumer = builder
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();

// Find the first non-empty partition we can start consuming from.
// Block on it because we don't know what partition to start reading from, yet.
final PartitionProperties nonEmptyPartition = consumer.getPartitionIds()
.flatMap(partitionId -> consumer.getPartitionProperties(partitionId))
.filter(properties -> !properties.isEmpty())
.blockFirst(OPERATION_TIMEOUT);

// Make sure to have at least one non-empty event hub in order to continue the sample execution
// if you don't have an non-empty event hub, try with another example 'SendEvent' in the same directory.
if (lastEnqueuedSequenceNumber == -1 || lastEnqueuedSequencePartitionId == null) {
System.err.println("All event hubs are empty");
if (nonEmptyPartition == null) {
System.err.println("All event hub partitions are empty");
System.exit(0);
}

// Create a consumer.
// The "$Default" consumer group is created by default. This value can be found by going to the Event Hub
// instance you are connecting to, and selecting the "Consumer groups" page. EventPosition.latest() tells the
// service we only want events that are sent to the partition after we begin listening.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.connectionString(connectionString)
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
// ex. The last enqueued sequence number is 99. If isInclusive is true, the received event starting from
// the same event with sequence number of '99'. Otherwise, the event with sequence number of '100' will
// be the first event received.
final EventPosition position = EventPosition.fromSequenceNumber(
nonEmptyPartition.getLastEnqueuedSequenceNumber(), true);

// We start receiving any events that come from `firstPartition`, print out the contents, and decrement the
// countDownLatch.
final EventPosition position = EventPosition.fromSequenceNumber(lastEnqueuedSequenceNumber, false);
Disposable subscription = consumer.receiveFromPartition(lastEnqueuedSequencePartitionId, position).subscribe(partitionEvent -> {
EventData event = partitionEvent.getData();
String contents = new String(event.getBody(), UTF_8);
// ex. The last enqueued sequence number is 99. If isInclusive is true, the received event starting from the same
// event with sequence number of '99'. Otherwise, the event with sequence number of '100' will be the first
// event received.
System.out.println(String.format("Receiving an event starting from the sequence number: %s. Contents: %s",
event.getSequenceNumber(), contents));
// We start receiving any events that come from that non-empty partition, print out the contents.
// We keep receiving events while `takeWhile` resolves to true, that is, the program is still running.
consumer.receiveFromPartition(nonEmptyPartition.getId(), position)
.takeWhile(ignored -> isRunning.get())
.subscribe(partitionEvent -> {
EventData event = partitionEvent.getData();
String contents = new String(event.getBody(), UTF_8);

semaphore.release();
});
System.out.println(String.format("Event sequence number number: %s. Contents: %s%n",
event.getSequenceNumber(), contents));
});

EventHubProducerAsyncClient producer = builder.buildAsyncProducerClient();
// Create a producer.
final EventHubProducerClient producer = builder.buildProducerClient();

// Because the consumer is only listening to new events, we need to send some events to that partition.
// This sends the events to `lastEnqueuedSequencePartitionId`.
SendOptions sendOptions = new SendOptions().setPartitionId(lastEnqueuedSequencePartitionId);
// Because the consumer is only listening to new events after the last enqueued event was received, we need to
// send some events to that partition.
final SendOptions sendOptions = new SendOptions().setPartitionId(nonEmptyPartition.getId());
producer.send(new EventData("Hello world!" .getBytes(UTF_8)), sendOptions);

producer.send(new EventData("Hello world!".getBytes(UTF_8)), sendOptions).block(OPERATION_TIMEOUT);
// Acquiring the semaphore so that this sample does not end before all events are fetched.
semaphore.acquire();
// Set isRunning to false so we stop taking events.
isRunning.set(false);

// Dispose and close of all the resources we've created.
subscription.dispose();
producer.close();
consumer.close();
earliestConsumer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ public void initialization() {
// BEGIN: com.azure.messaging.eventhubs.eventhubconsumerasyncclient.instantiation
// The required parameters are `consumerGroup` and a way to authenticate with Event Hubs using credentials.
EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
.connectionString("event-hub-instance-connection-string")
.connectionString("Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};"
+ "SharedAccessKey={key};EntityPath={eh-name}")
.consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
.buildAsyncConsumerClient();
// END: com.azure.messaging.eventhubs.eventhubconsumerasyncclient.instantiation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,46 @@
// Licensed under the MIT License.
package com.azure.messaging.eventhubs;

import java.util.concurrent.Semaphore;

/**
* Demonstrates how to fetch metadata from an Event Hub's partitions.
* Demonstrates how to fetch metadata from an Event Hub's partitions using synchronous client.
*/
public class GetEventHubMetadata {
/**
* Demonstrates how to get metadata from an Event Hub's partitions.
*
* @param args Unused arguments to the sample.
* @throws InterruptedException if the semaphore could not be acquired.
*/
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);

public static void main(String[] args) {
// The connection string value can be obtained by:
// 1. Going to your Event Hubs namespace in Azure Portal.
// 2. Creating an Event Hub instance.
// 3. Creating a "Shared access policy" for your Event Hub instance.
// 4. Copying the connection string from the policy's properties.
String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};SharedAccessKey={sharedAccessKey};EntityPath={eventHubName}";

// Instantiate a client that will be used to call the service.
EventHubProducerAsyncClient client = new EventHubClientBuilder()
// Instantiate a client that will be used to call the service. Using a try-resource block, so it disposes of
// the client when we are done.
EventHubProducerClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncProducerClient();

// Acquiring the semaphore so that this sample does not end before all the partition properties are fetched.
semaphore.acquire();
.buildProducerClient();

// Querying the partition identifiers for the Event Hub. Then calling client.getPartitionProperties with the
// identifier to get information about each partition.
client.getPartitionIds().flatMap(partitionId -> client.getPartitionProperties(partitionId))
.subscribe(properties -> {
System.out.println("The Event Hub has the following properties:");
System.out.printf(
"Event Hub Name: %s; Partition Id: %s; Is partition empty? %s; First Sequence Number: %s; "
+ "Last Enqueued Time: %s; Last Enqueued Sequence Number: %s; Last Enqueued Offset: %s",
properties.getEventHubName(), properties.getId(), properties.isEmpty(),
properties.getBeginningSequenceNumber(),
properties.getLastEnqueuedTime(),
properties.getLastEnqueuedSequenceNumber(),
properties.getLastEnqueuedOffset());
}, error -> {
System.err.println("Error occurred while fetching partition properties: " + error.toString());
}, () -> {
// Releasing the semaphore now that we've finished querying for partition properties.
semaphore.release();
});
for (String partitionId : client.getPartitionIds()) {
PartitionProperties properties = client.getPartitionProperties(partitionId);
System.out.printf(
"Event Hub Name: %s; Partition Id: %s; Is partition empty? %s; First Sequence Number: %s; "
+ "Last Enqueued Time: %s; Last Enqueued Sequence Number: %s; Last Enqueued Offset: %s%n",
properties.getEventHubName(),
properties.getId(),
properties.isEmpty(),
properties.getBeginningSequenceNumber(),
properties.getLastEnqueuedTime(),
properties.getLastEnqueuedSequenceNumber(),
properties.getLastEnqueuedOffset());
}

System.out.println("Waiting for partition properties to complete...");
semaphore.acquire();
System.out.println("Finished.");
// Dispose of the client.
client.close();
}
}

This file was deleted.

Loading