Skip to content

Commit

Permalink
Merge pull request #108 from NiteshKant/udp
Browse files Browse the repository at this point in the history
Udp support
  • Loading branch information
NiteshKant committed Apr 29, 2014
2 parents cb29bc4 + 168d069 commit 5ff9d18
Show file tree
Hide file tree
Showing 18 changed files with 606 additions and 171 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.reactivex.netty.examples.java;

import io.netty.channel.socket.DatagramPacket;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ObservableConnection;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

import java.nio.charset.Charset;

/**
* @author Nitesh Kant
*/
public final class HelloUdpClient {

public static void main(String[] args) {
RxNetty.createUdpClient("localhost", HelloUdpServer.PORT).connect()
.flatMap(new Func1<ObservableConnection<DatagramPacket, DatagramPacket>,
Observable<DatagramPacket>>() {
@Override
public Observable<DatagramPacket> call(ObservableConnection<DatagramPacket, DatagramPacket> connection) {
connection.writeStringAndFlush("Is there anybody out there?");
return connection.getInput();
}
}).toBlockingObservable().forEach(new Action1<DatagramPacket>() {
@Override
public void call(DatagramPacket datagramPacket) {
System.out.println("Received a new message: "
+ datagramPacket.content().toString(Charset.defaultCharset()));
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.reactivex.netty.examples.java;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.reactivex.netty.RxNetty;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.ObservableConnection;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import rx.Observable;
import rx.functions.Func1;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
* @author Nitesh Kant
*/
public final class HelloUdpServer {

private static final byte[] WELCOME_MSG_BYTES = "Welcome to the broadcast world!".getBytes(Charset.defaultCharset());
public static final int PORT = 8000;

public static void main(String[] args) {
RxNetty.createUdpServer(PORT, new ConnectionHandler<DatagramPacket, DatagramPacket>() {
@Override
public Observable<Void> handle(final ObservableConnection<DatagramPacket, DatagramPacket> newConnection) {
return newConnection.getInput().flatMap(new Func1<DatagramPacket, Observable<Void>>() {
@Override
public Observable<Void> call(DatagramPacket received) {
InetSocketAddress sender = received.sender();
System.out.println("Received datagram. Sender: " + sender + ", data: "
+ received.content().toString(Charset.defaultCharset()));
ByteBuf data = newConnection.getChannelHandlerContext().alloc().buffer(WELCOME_MSG_BYTES.length);
data.writeBytes(WELCOME_MSG_BYTES);
return newConnection.writeAndFlush(new DatagramPacket(data, sender));
}
});
}
}).startAndWait();
}
}
32 changes: 32 additions & 0 deletions rx-netty/src/main/java/io/reactivex/netty/RxNetty.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
package io.reactivex.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.reactivex.netty.channel.ConnectionHandler;
import io.reactivex.netty.channel.RxEventLoopProvider;
import io.reactivex.netty.channel.SingleNioLoopProvider;
Expand All @@ -31,8 +36,11 @@
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import io.reactivex.netty.protocol.udp.client.UdpClientBuilder;
import io.reactivex.netty.server.RxServer;
import io.reactivex.netty.server.ServerBuilder;
import io.reactivex.netty.protocol.udp.server.UdpServer;
import io.reactivex.netty.protocol.udp.server.UdpServerBuilder;

import static io.reactivex.netty.client.MaxConnectionsBasedStrategy.DEFAULT_MAX_CONNECTIONS;

Expand All @@ -43,6 +51,30 @@ public final class RxNetty {
private RxNetty() {
}

public static <I, O> UdpServer<I, O> createUdpServer(final int port, PipelineConfigurator<I, O> pipelineConfigurator,
ConnectionHandler<I, O> connectionHandler) {
return new UdpServerBuilder<I, O>(port, connectionHandler).pipelineConfigurator(pipelineConfigurator).build();
}

public static <I, O> RxClient<I, O> createUdpClient(String host, int port,
PipelineConfigurator<O, I> pipelineConfigurator) {
return new UdpClientBuilder<I, O>(host, port).channel(NioDatagramChannel.class)
.eventloop(getRxEventLoopProvider().globalClientEventLoop())
.pipelineConfigurator(pipelineConfigurator)
.build();
}

public static UdpServer<DatagramPacket, DatagramPacket> createUdpServer(final int port,
ConnectionHandler<DatagramPacket, DatagramPacket> connectionHandler) {
return new UdpServerBuilder<DatagramPacket, DatagramPacket>(port, connectionHandler).build();
}

public static RxClient<DatagramPacket, DatagramPacket> createUdpClient(String host, int port) {
return new UdpClientBuilder<DatagramPacket, DatagramPacket>(host, port)
.channel(NioDatagramChannel.class)
.eventloop(getRxEventLoopProvider().globalClientEventLoop()).build();
}

public static <I, O> RxServer<I, O> createTcpServer(final int port, PipelineConfigurator<I, O> pipelineConfigurator,
ConnectionHandler<I, O> connectionHandler) {
return new ServerBuilder<I, O>(port, connectionHandler).pipelineConfigurator(pipelineConfigurator).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.reactivex.netty.protocol.http.MultipleFutureListener;
Expand Down Expand Up @@ -93,13 +94,17 @@ public ByteBufAllocator getAllocator() {
return ctx.alloc();
}

public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
}

protected ChannelFuture writeOnChannel(Object msg) {
ChannelFuture writeFuture = ctx.channel().write(msg); // Calling write on context will be wrong as the context will be of a component not necessarily, the head of the pipeline.
ChannelFuture writeFuture = getChannel().write(msg); // Calling write on context will be wrong as the context will be of a component not necessarily, the head of the pipeline.
unflushedWritesListener.listen(writeFuture);
return writeFuture;
}

public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
protected Channel getChannel() {
return ctx.channel();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -38,7 +39,7 @@ public abstract class AbstractClientBuilder<I, O, B extends AbstractClientBuilde
protected final RxClientImpl.ServerInfo serverInfo;
protected final Bootstrap bootstrap;
protected PipelineConfigurator<O, I> pipelineConfigurator;
protected Class<? extends SocketChannel> socketChannel;
protected Class<? extends Channel> socketChannel;
protected EventLoopGroup eventLoopGroup;
protected RxClient.ClientConfig clientConfig;
protected ConnectionPool<O, I> connectionPool;
Expand All @@ -59,11 +60,20 @@ protected AbstractClientBuilder(String host, int port) {
}

public B defaultChannelOptions() {
return channelOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}

public B defaultTcpOptions() {
defaultChannelOptions();
channelOption(ChannelOption.SO_KEEPALIVE, true);
channelOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return channelOption(ChannelOption.TCP_NODELAY, true);
}

public B defaultUdpOptions() {
defaultChannelOptions();
return channelOption(ChannelOption.SO_BROADCAST, true);
}

public B pipelineConfigurator(PipelineConfigurator<O, I> pipelineConfigurator) {
this.pipelineConfigurator = pipelineConfigurator;
return returnBuilder();
Expand All @@ -74,7 +84,7 @@ public <T> B channelOption(ChannelOption<T> option, T value) {
return returnBuilder();
}

public B channel(Class<? extends SocketChannel> socketChannel) {
public B channel(Class<? extends Channel> socketChannel) {
this.socketChannel = socketChannel;
return returnBuilder();
}
Expand Down Expand Up @@ -150,7 +160,7 @@ public C build() {
return createClient();
}

private boolean shouldCreateConnectionPool() {
protected boolean shouldCreateConnectionPool() {
return null == connectionPool && null != limitDeterminationStrategy
|| idleConnectionsTimeoutMillis != PoolConfig.DEFAULT_CONFIG.getMaxIdleTimeMillis();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.reactivex.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
Expand All @@ -24,8 +25,8 @@
public class ClientChannelFactoryImpl<I, O> implements ClientChannelFactory<I,O> {

protected final Bootstrap clientBootstrap;
private final ObservableConnectionFactory<I, O> connectionFactory;
private final RxClient.ServerInfo serverInfo;
protected final ObservableConnectionFactory<I, O> connectionFactory;
protected final RxClient.ServerInfo serverInfo;

public ClientChannelFactoryImpl(Bootstrap clientBootstrap, ObservableConnectionFactory<I, O> connectionFactory,
RxClient.ServerInfo serverInfo) {
Expand All @@ -42,15 +43,15 @@ public ChannelFuture connect(final Subscriber<? super ObservableConnection<I, O>
final PipelineConfigurator<I, O> configurator = getPipelineConfiguratorForAChannel(connHandler,
pipelineConfigurator);
// make the connection
clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
clientBootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
public void initChannel(Channel ch) throws Exception {
configurator.configureNewPipeline(ch.pipeline());
}
});

final ChannelFuture connectFuture = clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort())
.addListener(connHandler);
final ChannelFuture connectFuture = _connect().addListener(connHandler);

subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
Expand All @@ -63,6 +64,10 @@ public void call() {
return connectFuture;
}

protected ChannelFuture _connect() {
return clientBootstrap.connect(serverInfo.getHost(), serverInfo.getPort());
}


protected PipelineConfigurator<I, O> getPipelineConfiguratorForAChannel(ClientConnectionHandler<I, O> connHandler,
PipelineConfigurator<I, O> pipelineConfigurator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@

import io.netty.bootstrap.ServerBootstrap;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.server.AbstractServerBuilder;
import io.reactivex.netty.server.ConnectionBasedServerBuilder;
import io.reactivex.netty.server.RxServer;

/**
* A convenience builder to create instances of {@link HttpServer}
*
* @author Nitesh Kant
*/
public class HttpServerBuilder<I, O> extends AbstractServerBuilder<HttpServerRequest<I>, HttpServerResponse<O>,
HttpServerBuilder<I, O>, HttpServer<I, O>> {
public class HttpServerBuilder<I, O>
extends ConnectionBasedServerBuilder<HttpServerRequest<I>, HttpServerResponse<O>, HttpServerBuilder<I, O>> {

public HttpServerBuilder(int port, RequestHandler<I, O> requestHandler) {
super(port, new HttpConnectionHandler<I, O>(requestHandler));
Expand All @@ -37,6 +38,11 @@ public HttpServerBuilder(ServerBootstrap bootstrap, int port, RequestHandler<I,
pipelineConfigurator(PipelineConfigurators.<I, O>httpServerConfigurator());
}

@Override
public HttpServer<I, O> build() {
return (HttpServer<I, O>) super.build();
}

@Override
protected HttpServer<I, O> createServer() {
return new HttpServer<I, O>(serverBootstrap, port, pipelineConfigurator,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.reactivex.netty.protocol.udp.client;

import io.netty.bootstrap.Bootstrap;
import io.reactivex.netty.channel.ObservableConnectionFactory;
import io.reactivex.netty.client.ClientChannelFactory;
import io.reactivex.netty.client.ClientChannelFactoryImpl;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.client.RxClientImpl;
import io.reactivex.netty.pipeline.PipelineConfigurator;

import java.net.InetSocketAddress;

/**
* An implementation of {@link RxClient} for UDP/IP
*
* @author Nitesh Kant
*/
public class UdpClient<I, O> extends RxClientImpl<I, O> {

public UdpClient(ServerInfo serverInfo, Bootstrap clientBootstrap, ClientConfig clientConfig) {
this(serverInfo, clientBootstrap, null, clientConfig);
}

public UdpClient(ServerInfo serverInfo, Bootstrap clientBootstrap, PipelineConfigurator<O, I> pipelineConfigurator,
ClientConfig clientConfig) {
super(serverInfo, clientBootstrap, pipelineConfigurator, clientConfig);
}

@Override
protected ClientChannelFactory<O, I> _newChannelFactory(ServerInfo serverInfo, Bootstrap clientBootstrap,
ObservableConnectionFactory<O, I> connectionFactory) {
final InetSocketAddress receiverAddress = new InetSocketAddress(serverInfo.getHost(), serverInfo.getPort());
return new ClientChannelFactoryImpl<O, I>(clientBootstrap,
new UdpClientConnectionFactory<O, I>(receiverAddress), serverInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactivex.netty.protocol.udp.client;

import io.netty.bootstrap.Bootstrap;
import io.reactivex.netty.client.AbstractClientBuilder;
import io.reactivex.netty.client.RxClient;
import io.reactivex.netty.client.RxClientImpl;

/**
* A builder to build an instance of {@link RxClientImpl}
*
* @author Nitesh Kant
*/
public class UdpClientBuilder<I, O> extends AbstractClientBuilder<I,O, UdpClientBuilder<I, O>, RxClient<I, O>> {

public UdpClientBuilder(String host, int port) {
this(host, port, new Bootstrap());
}

public UdpClientBuilder(String host, int port, Bootstrap bootstrap) {
super(bootstrap, host, port);
}

@Override
protected RxClient<I, O> createClient() {
return new UdpClient<I, O>(serverInfo, bootstrap, pipelineConfigurator, clientConfig);
}

@Override
protected boolean shouldCreateConnectionPool() {
return false; // No connection pools are needed for UDP.
}
}
Loading

0 comments on commit 5ff9d18

Please sign in to comment.