Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calling HttpClient.shutdown() causes problem to later created clients #195

Closed
spodila opened this issue Jul 30, 2014 · 2 comments
Closed
Labels
Milestone

Comments

@spodila
Copy link

spodila commented Jul 30, 2014

Here's the code that reproduces the problem:

import io.netty.buffer.ByteBuf;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.SingleNioLoopProvider;
import io.reactivex.netty.protocol.http.client.HttpClient;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import rx.Observable;
import rx.functions.Action0;

public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        RxNetty.useEventLoopProvider(new SingleNioLoopProvider(1));


        final HttpClient<ByteBuf,ByteBuf> client1 = RxNetty.createHttpClient("www.google.com", 80);
        client1.submit(HttpClientRequest.createGet("/")).toBlocking().first();

        Thread.sleep(10000);
        client1.shutdown();
        Thread.sleep(10000);

        final HttpClient<ByteBuf,ByteBuf> client2 = RxNetty.createHttpClient("localhost", 1099);
        client2.submit(HttpClientRequest.createGet("/")).finallyDo(new Action0() {
            @Override
            public void call() {
                client2.shutdown();
            }
        }).toBlocking().first();

    }
}

The stack trace is:

2014-07-30 15:20:54 WARN  AbstractChannel:146 main - Force-closing a channel whose registration task was not accepted by an event loop: [id: 0xb3c7e1ac]
java.util.concurrent.RejectedExecutionException: event executor terminated
    at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:735)
    at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:312)
    at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:718)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:416)
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:60)
    at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:48)
    at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:64)
    at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:306)
    at io.netty.bootstrap.Bootstrap.doConnect(Bootstrap.java:133)
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:115)
    at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:96)
    at io.reactivex.netty.client.ClientChannelFactoryImpl.connect(ClientChannelFactoryImpl.java:63)
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:139)
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:115)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable.subscribe(Observable.java:6922)
    at rx.internal.operators.BlockingOperatorToIterator.toIterator(BlockingOperatorToIterator.java:49)
    at rx.observables.BlockingObservable.getIterator(BlockingObservable.java:151)
    at rx.observables.BlockingObservable$2.iterator(BlockingObservable.java:435)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:348)
    at rx.observables.BlockingObservable.first(BlockingObservable.java:165)
    at TestMain.main(TestMain.java:31)
2014-07-30 15:20:54 ERROR rejectedExecution:181 main - Failed to submit a listener notification task. Event loop shut down?
java.util.concurrent.RejectedExecutionException: event executor terminated
    at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:735)
    at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:312)
    at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:718)
    at io.netty.util.concurrent.DefaultPromise.execute(DefaultPromise.java:670)
    at io.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:640)
    at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:138)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93)
    at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28)
    at io.reactivex.netty.client.ClientChannelFactoryImpl.connect(ClientChannelFactoryImpl.java:74)
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:139)
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:115)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable$2.call(Observable.java:159)
    at rx.Observable$2.call(Observable.java:155)
    at rx.Observable.subscribe(Observable.java:6922)
    at rx.internal.operators.BlockingOperatorToIterator.toIterator(BlockingOperatorToIterator.java:49)
    at rx.observables.BlockingObservable.getIterator(BlockingObservable.java:151)
    at rx.observables.BlockingObservable$2.iterator(BlockingObservable.java:435)
    at rx.observables.BlockingObservable.single(BlockingObservable.java:348)
    at rx.observables.BlockingObservable.first(BlockingObservable.java:165)
    at TestMain.main(TestMain.java:31)
@tbak
Copy link
Collaborator

tbak commented Jul 31, 2014

new SingleNioLoopProvider(1) object is a reference counted object with initial value set to 0. The sequence of action is:

  • first client gets event loop for the newly configured SingleNioLoopProvider, which increments its refCount value to 1
  • on shutdown, the refCount is decreased to 0, which shuts down the event loop instance
  • clients created afterwards fail immediately

To prevent from closing the even loop, increment its refCount, like in the example below:

public class TestMain {

    public static void main(String[] args) throws InterruptedException {
        SingleNioLoopProvider provider = new SingleNioLoopProvider(1);
        EventLoopGroup newEventLoop = provider.globalClientEventLoop();
        RxEventLoopProvider originalEventLoopProvider = RxNetty.useEventLoopProvider(provider);
        try {
            final HttpClient<ByteBuf, ByteBuf> client1 = RxNetty.createHttpClient("www.google.com", 80);
            client1.submit(HttpClientRequest.createGet("/")).toBlocking().first();

            Thread.sleep(1000);
            client1.shutdown();
            Thread.sleep(1000);

            final HttpClient<ByteBuf, ByteBuf> client2 = RxNetty.createHttpClient("gazeta.pl", 80);
            client2.submit(HttpClientRequest.createGet("/")).finallyDo(new Action0() {
                @Override
                public void call() {
                    client2.shutdown();
                }
            }).toBlocking().first();
        } finally {
            newEventLoop.shutdownGracefully();
            RxNetty.useEventLoopProvider(originalEventLoopProvider);
        }
    }
}

If you look into RxNetty source code, the default provider implementation is also SingleNioLoopProvider. Because it is used by the RxNetty.globalClient, its reference counter is always at least 1.

@tbak tbak closed this as completed Jul 31, 2014
@NiteshKant
Copy link
Member

Had a chat with @tbak .. the current code is wrong in shutting down the eventloop as it assumes that the following scenario is not valid:

-- Create a bunch of clients
-- Shutdown all of them
-- Recreate and use another client.

The current global client holding the eventloop from shutdown is more of a side-effect than a design. So, I will remove the eventloop shutdown from the client for now. Since, the eventloops are always daemon threads, they will be alive only for the lifetime of the application.

@NiteshKant NiteshKant reopened this Jul 31, 2014
@NiteshKant NiteshKant added this to the 0.3.11 milestone Jul 31, 2014
@NiteshKant NiteshKant added the bug label Jul 31, 2014
@NiteshKant NiteshKant self-assigned this Jul 31, 2014
NiteshKant added a commit that referenced this issue Aug 1, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants