Skip to content

Commit

Permalink
Fix out of order writes with async reactive calls (#10579)
Browse files Browse the repository at this point in the history
* Fix out of order writes with async reactive calls
In a few places, I used the pattern where reactive calls would be dispatched to the event loop if they weren't already on the loop. However, if there is a reactive call outside the event loop, and then immediately a reactive call *on* the event loop, the calls can run in the wrong order. The second call on the event loop was not delayed with an execute() call, so it could run before the first call runs.

This fix introduces a central EventLoopSerializer that takes care of this problem. In the above problem scenario, it takes care to also delay the second call even though it was submitted on the event loop.

* fix concurrency bug in RoutingInBoundHandler

* checkstyle

* rename

* revert

* rename
  • Loading branch information
yawkat authored Mar 8, 2024
1 parent af79a87 commit 58fc948
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package io.micronaut.http.client.netty;

import io.micronaut.core.annotation.Internal;
import io.micronaut.http.netty.EventLoopFlow;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.LastHttpContent;
import org.reactivestreams.Publisher;
Expand All @@ -34,7 +34,7 @@
@Internal
final class ReactiveClientWriter extends ChannelInboundHandlerAdapter implements Subscriber<HttpContent> {
private final Publisher<HttpContent> source;
private EventLoop eventLoop;
private EventLoopFlow flow;
private ChannelHandlerContext ctx;
private Subscription subscription;
private boolean writtenLast;
Expand All @@ -45,7 +45,7 @@ final class ReactiveClientWriter extends ChannelInboundHandlerAdapter implements

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
this.eventLoop = ctx.channel().eventLoop();
this.flow = new EventLoopFlow(ctx.channel().eventLoop());
this.ctx = ctx;
source.subscribe(this);
}
Expand All @@ -68,11 +68,12 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio

@Override
public void onSubscribe(Subscription s) {
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> onSubscribe(s));
return;
if (flow.executeNow(() -> onSubscribe0(s))) {
onSubscribe0(s);
}
}

private void onSubscribe0(Subscription s) {
if (ctx == null) {
s.cancel();
} else {
Expand All @@ -85,11 +86,12 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(HttpContent httpContent) {
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> onNext(httpContent));
return;
if (flow.executeNow(() -> onNext0(httpContent))) {
onNext0(httpContent);
}
}

private void onNext0(HttpContent httpContent) {
if (writtenLast) {
throw new IllegalStateException("Already written a LastHttpContent");
}
Expand All @@ -110,22 +112,24 @@ public void onNext(HttpContent httpContent) {

@Override
public void onError(Throwable t) {
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> onError(t));
return;
if (flow.executeNow(() -> onError0(t))) {
onError0(t);
}
}

private void onError0(Throwable t) {
ctx.fireExceptionCaught(t);
ctx.pipeline().remove(ctx.name());
}

@Override
public void onComplete() {
if (!eventLoop.inEventLoop()) {
eventLoop.execute(this::onComplete);
return;
if (flow.executeNow(this::onComplete0)) {
onComplete0();
}
}

private void onComplete0() {
if (!writtenLast) {
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, ctx.voidPromise());
}
Expand Down
152 changes: 152 additions & 0 deletions http-netty/src/main/java/io/micronaut/http/netty/EventLoopFlow.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.http.netty;

import io.micronaut.core.annotation.Internal;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.OrderedEventExecutor;

/**
* This class forwards reactive operations onto an {@link io.netty.channel.EventLoop} if they are
* called from outside that loop. It avoids unnecessary {@link EventExecutor#execute} calls when
* the reactive methods are called inside the loop. All the while it ensures the operations remain
* serialized (in the same order).
* <p>
* Should be used like this:
* <pre>
* public void onNext(Object item) {
* if (serializer.executeNow(() -> onNext0(item))) {
* onNext0(item);
* }
* }
*
* private void onNext0(Object item) {
* ...
* }
* </pre>
* <p>
* This class is <b>not</b> thread-safe: The invariants for calls to {@link #executeNow} are very
* strict. In particular:
* <ul>
* <li>There must be no concurrent calls to {@link #executeNow}.</li>
* <li>When {@link #executeNow} returns {@code true}, the subsequent execution of the child
* method ({@code onNext0} in the above example) must fully complete before the next
* {@link #executeNow} call. This ensures that there are no concurrent calls to the child
* method.</li>
* </ul>
* Both of these invariants are guaranteed by the reactive spec, but may not apply to other use
* cases.
*
* @since 4.4.0
* @author Jonas Konrad
*/
@Internal
public final class EventLoopFlow {
/**
* This adds some extra checks to find bugs.
*/
private static final boolean STRICT_CHECKING = false;

private final OrderedEventExecutor loop;
/**
* Generation assigned to the next task.
*/
private int submitGeneration = 0;
/**
* Generation of the next task that can be executed immediately. All tasks with a lower
* generation count have been fully executed already, with one exception: If the last task
* was submitted on the event loop, {@link #executeNow} returned true and the caller may not
* have fully executed it yet.
*/
private volatile int runGeneration = 0;

public EventLoopFlow(OrderedEventExecutor loop) {
this.loop = loop;
}

/**
* Determine whether the next step can be executed immediately. Iff this method returns
* {@code true}, {@code delayTask} will be ignored and the caller should call the target method
* manually. Iff this method returns {@code false}, the caller should take no further action as
* {@code delayTask} will be run in the future.
*
* @param delayTask The task to run if it can't be run immediately
* @return {@code true} if the caller should instead run the task immediately
*/
public boolean executeNow(Runnable delayTask) {
// pick a generation ID for this task.
int generation = submitGeneration++;
if (loop.inEventLoop()) {
if (runGeneration == generation) {
if (STRICT_CHECKING) {
runGeneration = generation + 1;
try {
delayTask.run();
} finally {
if (runGeneration != generation + 1 || submitGeneration != generation + 1) {
throw new AssertionError("Nested call?");
}
}
return false;
}

/*
* All previous tasks have run completely, the caller can run the task immediately.
* Technically, we should only increment the runGeneration after the caller has
* finished running the task. However, doing it before is safe because of two
* reasons:
* - Any other task already submitted is executed using EventLoop#execute, so it
* will certainly run after the child method completes
* - No new task can be submitted while this task is running, because we're still
* in the outer reactive method call, and the reactive spec forbids nested or
* concurrent calls
*/
runGeneration = generation + 1;
return true;
}
// another task already submitted, need to delay to stay serialized
}
loop.execute(new Delayed(delayTask, generation));
return false;
}

private final class Delayed implements Runnable {
private final Runnable task;
private final int generation;

private Delayed(Runnable task, int generation) {
this.task = task;
this.generation = generation;
}

@Override
public void run() {
if (runGeneration != generation) {
throw new IllegalStateException("Improper run order. Expected " + generation + ", was " + runGeneration);
}
try {
task.run();
} finally {
if (STRICT_CHECKING) {
if (runGeneration != generation) {
throw new AssertionError("Weird");
}
}
runGeneration = generation + 1;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package io.micronaut.http.netty

import io.micronaut.core.annotation.NonNull
import io.netty.util.concurrent.AbstractEventExecutor
import io.netty.util.concurrent.Future
import io.netty.util.concurrent.OrderedEventExecutor
import spock.lang.Specification

import java.util.concurrent.TimeUnit

class EventLoopFlowSpec extends Specification {
def 'outside simple'() {
given:
def mock = new MockEventExecutor()
def serializer = new EventLoopFlow(mock)
boolean run = false

when:
def now = serializer.executeNow { run = true }
then:
!now
!run
mock.submitted.size() == 1

when:
mock.submitted[0].run()
then:
run
}

def 'inside simple'() {
given:
def mock = new MockEventExecutor()
def serializer = new EventLoopFlow(mock)
boolean run = false

when:
mock.inEventLoop = true
def now = serializer.executeNow { run = true }
then:
now
!run
mock.submitted.isEmpty()
}

def 'serialize on inside call after outside call'() {
given:
def mock = new MockEventExecutor()
def serializer = new EventLoopFlow(mock)
boolean run1 = false
boolean run2 = false

when:
def now1 = serializer.executeNow { run1 = true }
then:
!now1
!run1

when:
mock.inEventLoop = true
def now2 = serializer.executeNow { run2 = true }
then:
!now2
!run2
mock.submitted.size() == 2

when:
mock.submitted[0].run()
then:
run1
!run2

when:
mock.submitted[1].run()
then:
run2

when:
def now3 = serializer.executeNow {}
then:
now3
mock.submitted.size() == 2
}

private static class MockEventExecutor extends AbstractEventExecutor implements OrderedEventExecutor {
boolean inEventLoop = false
List<Runnable> submitted = []

@Override
boolean isShuttingDown() {
return false
}

@Override
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return null
}

@Override
Future<?> terminationFuture() {
return null
}

@Override
void shutdown() {

}

@Override
boolean isShutdown() {
return false
}

@Override
boolean isTerminated() {
return false
}

@Override
boolean awaitTermination(long timeout, @NonNull TimeUnit unit) throws InterruptedException {
return false
}

@Override
boolean inEventLoop(Thread thread) {
return inEventLoop
}

@Override
void execute(@NonNull Runnable command) {
submitted.add(command)
}
}
}
Loading

0 comments on commit 58fc948

Please sign in to comment.