Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Track 2 SB sendMessages() API for messages are being sent at a slow pace #21014

Merged
merged 37 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1c86776
[BUG] Track 2 SB API (Batch Send Messages) doesn't seem to work. Mess…
v-hongli1 Apr 28, 2021
65e5032
[BUG] Track 2 SB API (Batch Send Messages) doesn't seem to work. Mess…
v-hongli1 Apr 28, 2021
381357d
[BUG] Track 2 SB API (Batch Send Messages) doesn't seem to work. Mess…
v-hongli1 Apr 28, 2021
0c4190e
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 29, 2021
2d21d73
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 29, 2021
12fabc7
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 29, 2021
af0b688
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 29, 2021
f67e6c1
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 30, 2021
f718be4
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 30, 2021
3bf18e5
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 30, 2021
7dc3f5a
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 Apr 30, 2021
a601d4e
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 6, 2021
097dfbf
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 6, 2021
e483dc1
Merge remote-tracking branch 'origin/#16127_Messages_are_being_sent_a…
v-hongli1 May 6, 2021
3df6a86
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 6, 2021
f9d33a3
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 6, 2021
d9ac345
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 6, 2021
531d07e
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
be32316
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
10719af
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
ac15cfa
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
cbc967d
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
c305e46
Merge branch 'master' into #16127_Messages_are_being_sent_at_a_slow_pace
v-xuto May 7, 2021
26bda28
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
aa8d1cf
Merge remote-tracking branch 'origin/#16127_Messages_are_being_sent_a…
v-hongli1 May 7, 2021
b8c4c19
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
f0f5924
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 7, 2021
67e75c8
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 8, 2021
667f2e2
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 8, 2021
ba78a3c
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 8, 2021
04840fd
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 8, 2021
cf9a712
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 19, 2021
8a44b07
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 19, 2021
17d055b
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 19, 2021
7403baa
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 20, 2021
752ffe2
Fix ci exception for issue#16127 Change Track 2 SB sendMessages() API…
v-hongli1 May 20, 2021
084966f
Fix issue#16127 Change Track 2 SB sendMessages() API for messages are…
v-hongli1 May 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import com.azure.core.util.logging.ClientLogger;

import java.nio.BufferOverflowException;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static com.azure.messaging.servicebus.implementation.MessageUtils.traceMessageSpan;

Expand All @@ -30,7 +32,7 @@ public final class ServiceBusMessageBatch {
private final MessageSerializer serializer;
private final List<ServiceBusMessage> serviceBusMessageList;
private final byte[] eventBytes;
private int sizeInBytes;
private final AtomicInteger sizeInBytes;
private final TracerProvider tracerProvider;
private final String entityPath;
private final String hostname;
Expand All @@ -40,8 +42,8 @@ public final class ServiceBusMessageBatch {
this.maxMessageSize = maxMessageSize;
this.contextProvider = contextProvider;
this.serializer = serializer;
this.serviceBusMessageList = new LinkedList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.serviceBusMessageList = Collections.synchronizedList(new LinkedList<>());
this.sizeInBytes = new AtomicInteger((maxMessageSize / 65536) * 1024); // reserve 1KB for every 64KB
this.eventBytes = new byte[maxMessageSize];
this.tracerProvider = tracerProvider;
this.entityPath = entityPath;
Expand Down Expand Up @@ -72,7 +74,7 @@ public int getMaxSizeInBytes() {
* @return The size of the {@link ServiceBusMessageBatch batch} in bytes.
*/
public int getSizeInBytes() {
return this.sizeInBytes;
return this.sizeInBytes.get();
}

/**
Expand All @@ -97,9 +99,9 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
tracerProvider)
: serviceBusMessage;

final int size;
final AtomicInteger size = new AtomicInteger();
try {
size = getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty());
size.set(getSize(serviceBusMessageUpdated, serviceBusMessageList.isEmpty()));
} catch (BufferOverflowException exception) {
final RuntimeException ex = new ServiceBusException(
new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
Expand All @@ -109,12 +111,9 @@ public boolean tryAddMessage(final ServiceBusMessage serviceBusMessage) {
throw logger.logExceptionAsWarning(ex);
}

synchronized (lock) {
if (this.sizeInBytes + size > this.maxMessageSize) {
return false;
}

this.sizeInBytes += size;
if (this.sizeInBytes.addAndGet(size.get()) > this.maxMessageSize) {
this.sizeInBytes.addAndGet(-1 * size.get());
return false;
}

this.serviceBusMessageList.add(serviceBusMessageUpdated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.StreamSupport;

import static com.azure.core.amqp.implementation.RetryUtil.getRetryPolicy;
import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
Expand Down Expand Up @@ -574,7 +575,8 @@ private Mono<Void> sendIterable(Iterable<ServiceBusMessage> messages, ServiceBus
}

return createMessageBatch().flatMap(messageBatch -> {
messages.forEach(message -> messageBatch.tryAddMessage(message));
StreamSupport.stream(messages.spliterator(), true)
.forEach(message -> messageBatch.tryAddMessage(message));
return sendInternal(messageBatch, transaction);
});
}
Expand Down Expand Up @@ -635,32 +637,29 @@ private Mono<Void> sendInternal(ServiceBusMessageBatch batch, ServiceBusTransact

logger.info("Sending batch with size[{}].", batch.getCount());

Context sharedContext = null;
final List<org.apache.qpid.proton.message.Message> messages = new ArrayList<>();

for (int i = 0; i < batch.getMessages().size(); i++) {
final ServiceBusMessage event = batch.getMessages().get(i);
AtomicReference<Context> sharedContext = new AtomicReference<>(Context.NONE);
final List<org.apache.qpid.proton.message.Message> messages = Collections.synchronizedList(new ArrayList<>());
batch.getMessages().parallelStream().forEach(serviceBusMessage -> {
if (isTracingEnabled) {
parentContext.set(event.getContext());
if (i == 0) {
sharedContext = tracerProvider.getSharedSpanBuilder(SERVICE_BASE_NAME, parentContext.get());
parentContext.set(serviceBusMessage.getContext());
if (sharedContext.get().equals(Context.NONE)) {
sharedContext.set(tracerProvider.getSharedSpanBuilder(SERVICE_BASE_NAME, parentContext.get()));
}
tracerProvider.addSpanLinks(sharedContext.addData(SPAN_CONTEXT_KEY, event.getContext()));
tracerProvider.addSpanLinks(sharedContext.get().addData(SPAN_CONTEXT_KEY, serviceBusMessage.getContext()));
}
final org.apache.qpid.proton.message.Message message = messageSerializer.serialize(event);

final org.apache.qpid.proton.message.Message message = messageSerializer.serialize(serviceBusMessage);
final MessageAnnotations messageAnnotations = message.getMessageAnnotations() == null
? new MessageAnnotations(new HashMap<>())
: message.getMessageAnnotations();

message.setMessageAnnotations(messageAnnotations);
messages.add(message);
}
});

if (isTracingEnabled) {
final Context finalSharedContext = sharedContext == null
final Context finalSharedContext = sharedContext.get().equals(Context.NONE)
? Context.NONE
: sharedContext
: sharedContext.get()
.addData(ENTITY_PATH_KEY, entityName)
.addData(HOST_NAME_KEY, connectionProcessor.getFullyQualifiedNamespace())
.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public void testStartStopResume() throws InterruptedException {
*/
@Test
public void testErrorRecovery() throws InterruptedException {

List<ServiceBusMessageContext> messageList = new ArrayList<>();
for (int i = 0; i < 2; i++) {
ServiceBusReceivedMessage serviceBusReceivedMessage =
Expand All @@ -204,6 +203,7 @@ public void testErrorRecovery() throws InterruptedException {
new ServiceBusMessageContext(serviceBusReceivedMessage);
messageList.add(serviceBusMessageContext);
}

final Flux<ServiceBusMessageContext> messageFlux = Flux.generate(() -> 0,
(state, sink) -> {
ServiceBusReceivedMessage serviceBusReceivedMessage =
Expand All @@ -219,11 +219,9 @@ public void testErrorRecovery() throws InterruptedException {
});

ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder = getBuilder(messageFlux);

AtomicInteger messageId = new AtomicInteger();
AtomicReference<CountDownLatch> countDownLatch = new AtomicReference<>();
countDownLatch.set(new CountDownLatch(4));

AtomicBoolean assertionFailed = new AtomicBoolean();
ServiceBusProcessorClient serviceBusProcessorClient = new ServiceBusProcessorClient(receiverBuilder,
messageContext -> {
Expand Down