Skip to content

Commit

Permalink
foo
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Jun 16, 2015
1 parent bdf1674 commit 32a8c90
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 67 deletions.
238 changes: 174 additions & 64 deletions src/main/java/io/vertx/core/http/impl/HttpServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ private void applyConnectionOptions(ServerBootstrap bootstrap) {
}
if (options.isUsePooledBuffers()) {
//ByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
///FIXME need to make num arenas properly configurable
ByteBufAllocator allocator = new PooledByteBufAllocator(40, 40, 8192, 11);
System.out.println("Using pooled buffers: " + allocator + " direct preferred? " + PlatformDependent.directBufferPreferred());
bootstrap.childOption(ChannelOption.ALLOCATOR, allocator);
Expand Down Expand Up @@ -460,89 +461,198 @@ private void sendError(CharSequence err, HttpResponseStatus status, Channel ch)

FullHttpRequest wsRequest;


@Override
protected void doMessageReceived(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (expectingWebsockets) {
handleExpectWebsockets(conn, ctx, msg);
public final void channelRead(ChannelHandlerContext chctx, Object msg) throws Exception {
Object message = safeObject(msg, chctx.alloc());
ServerConnection connection = getConnection(chctx.channel());

ContextImpl context;
if (connection != null) {
context = getContext(connection);
connection.startRead();
} else {
// Separate out into a separate method where we involve instanceof checks and checks for websocket handshake headers
handleHttp(conn, ch, msg);
context = null;
}
if (connection != null) {
context.executeFromIO(() -> doMessageReceived(connection, chctx, message));
} else {
// We execute this directly as we don't have a context yet, the context will have to be set manually
// inside doMessageReceived();
try {
doMessageReceived(null, chctx, message);
} catch (Throwable t) {
chctx.pipeline().fireExceptionCaught(t);
}
}
}

private void handleExpectWebsockets(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
// @Override
// protected void channelRead(final ServerConnection connection, final ContextImpl context, final ChannelHandlerContext chctx, final Object msg) throws Exception {
//
// if (connection != null) {
// context.executeFromIO(() -> doMessageReceived(connection, chctx, msg));
// } else {
// // We execute this directly as we don't have a context yet, the context will have to be set manually
// // inside doMessageReceived();
// try {
// doMessageReceived(null, chctx, msg);
// } catch (Throwable t) {
// chctx.pipeline().fireExceptionCaught(t);
// }
// }
// }

if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
@Override
protected final void doMessageReceived(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (expectingWebsockets) {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;

if (log.isTraceEnabled()) log.trace("Server received request: " + request.getUri());
if (log.isTraceEnabled()) log.trace("Server received request: " + request.getUri());

if (request.headers().contains(io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
if (wsRequest == null) {
if (request instanceof FullHttpRequest) {
handshake((FullHttpRequest) request, ch, ctx);
} else {
wsRequest = new DefaultFullHttpRequest(request.getProtocolVersion(), request.getMethod(), request.getUri());
wsRequest.headers().set(request.headers());
if (request.headers().contains(io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
if (wsRequest == null) {
if (request instanceof FullHttpRequest) {
handshake((FullHttpRequest) request, ch, ctx);
} else {
wsRequest = new DefaultFullHttpRequest(request.getProtocolVersion(), request.getMethod(), request.getUri());
wsRequest.headers().set(request.headers());
}
}
}
} else {
handleHttp(conn, ch, msg);
}
} else if (msg instanceof WebSocketFrameInternal) {
//Websocket frame
WebSocketFrameInternal wsFrame = (WebSocketFrameInternal)msg;
switch (wsFrame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
if (conn != null) {
} else {
//HTTP request
if (conn == null) {
createConnAndHandle(ch, msg, null);
} else {
conn.handleMessage(msg);
}
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
ch.writeAndFlush(new WebSocketFrameImpl(FrameType.PONG, wsFrame.getBinaryData()));
break;
case CLOSE:
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
ch.writeAndFlush(wsFrame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
} else if (msg instanceof WebSocketFrameInternal) {
//Websocket frame
WebSocketFrameInternal wsFrame = (WebSocketFrameInternal)msg;
switch (wsFrame.type()) {
case BINARY:
case CONTINUATION:
case TEXT:
if (conn != null) {
conn.handleMessage(msg);
}
break;
case PING:
// Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
ch.writeAndFlush(new WebSocketFrameImpl(FrameType.PONG, wsFrame.getBinaryData()));
break;
case CLOSE:
if (!closeFrameSent) {
// Echo back close frame and close the connection once it was written.
// This is specified in the WebSockets RFC 6455 Section 5.4.1
ch.writeAndFlush(wsFrame).addListener(ChannelFutureListener.CLOSE);
closeFrameSent = true;
}
break;
default:
throw new IllegalStateException("Invalid type: " + wsFrame.type());
}
} else if (msg instanceof HttpContent) {
if (wsRequest != null) {
wsRequest.content().writeBytes(((HttpContent) msg).content());
if (msg instanceof LastHttpContent) {
FullHttpRequest req = wsRequest;
wsRequest = null;
handshake(req, ch, ctx);
return;
}
break;
default:
throw new IllegalStateException("Invalid type: " + wsFrame.type());
}
} else if (msg instanceof HttpContent) {
if (wsRequest != null) {
wsRequest.content().writeBytes(((HttpContent) msg).content());
if (msg instanceof LastHttpContent) {
FullHttpRequest req = wsRequest;
wsRequest = null;
handshake(req, ch, ctx);
return;
}
if (conn != null) {
conn.handleMessage(msg);
}
} else {
throw new IllegalStateException("Invalid message " + msg);
}
if (conn != null) {
} else {
// HTTP request
if (conn == null) {
createConnAndHandle(ch, msg, null);
} else {
conn.handleMessage(msg);
}
} else {
throw new IllegalStateException("Invalid message " + msg);
}
}

private void handleHttp(ServerConnection conn, Channel ch, Object msg) {
//HTTP request
if (conn == null) {
createConnAndHandle(ch, msg, null);
} else {
conn.handleMessage(msg);
}
}
// private void handleExpectWebsockets(ServerConnection conn, ChannelHandlerContext ctx, Object msg) throws Exception {
// Channel ch = ctx.channel();
//
// if (msg instanceof HttpRequest) {
// HttpRequest request = (HttpRequest) msg;
//
// if (log.isTraceEnabled()) log.trace("Server received request: " + request.getUri());
//
// if (request.headers().contains(io.vertx.core.http.HttpHeaders.UPGRADE, io.vertx.core.http.HttpHeaders.WEBSOCKET, true)) {
// if (wsRequest == null) {
// if (request instanceof FullHttpRequest) {
// handshake((FullHttpRequest) request, ch, ctx);
// } else {
// wsRequest = new DefaultFullHttpRequest(request.getProtocolVersion(), request.getMethod(), request.getUri());
// wsRequest.headers().set(request.headers());
// }
// }
// } else {
// handleHttp(conn, ch, msg);
// }
// } else if (msg instanceof WebSocketFrameInternal) {
// //Websocket frame
// WebSocketFrameInternal wsFrame = (WebSocketFrameInternal)msg;
// switch (wsFrame.type()) {
// case BINARY:
// case CONTINUATION:
// case TEXT:
// if (conn != null) {
// conn.handleMessage(msg);
// }
// break;
// case PING:
// // Echo back the content of the PING frame as PONG frame as specified in RFC 6455 Section 5.5.2
// ch.writeAndFlush(new WebSocketFrameImpl(FrameType.PONG, wsFrame.getBinaryData()));
// break;
// case CLOSE:
// if (!closeFrameSent) {
// // Echo back close frame and close the connection once it was written.
// // This is specified in the WebSockets RFC 6455 Section 5.4.1
// ch.writeAndFlush(wsFrame).addListener(ChannelFutureListener.CLOSE);
// closeFrameSent = true;
// }
// break;
// default:
// throw new IllegalStateException("Invalid type: " + wsFrame.type());
// }
// } else if (msg instanceof HttpContent) {
// if (wsRequest != null) {
// wsRequest.content().writeBytes(((HttpContent) msg).content());
// if (msg instanceof LastHttpContent) {
// FullHttpRequest req = wsRequest;
// wsRequest = null;
// handshake(req, ch, ctx);
// return;
// }
// }
// if (conn != null) {
// conn.handleMessage(msg);
// }
// } else {
// throw new IllegalStateException("Invalid message " + msg);
// }
// }
//
// private void handleHttp(ServerConnection conn, Channel ch, Object msg) {
// //HTTP request
// if (conn == null) {
// createConnAndHandle(ch, msg, null);
// } else {
// conn.handleMessage(msg);
// }
// }

private String getWebSocketLocation(ChannelPipeline pipeline, FullHttpRequest req) throws Exception {
String prefix;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/http/impl/VertxHttpHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private static ByteBuf safeBuffer(ByteBufHolder holder, ByteBufAllocator allocat
}

@Override
protected C getConnection(Channel channel) {
protected final C getConnection(Channel channel) {
@SuppressWarnings("unchecked")
VertxNioSocketChannel<C> vch = (VertxNioSocketChannel<C>)channel;
// As an optimisation we store the connection on the channel - this prevents a lookup every time
Expand All @@ -69,7 +69,7 @@ protected C getConnection(Channel channel) {
}

@Override
protected C removeConnection(Channel channel) {
protected final C removeConnection(Channel channel) {
@SuppressWarnings("unchecked")
VertxNioSocketChannel<C> vch = (VertxNioSocketChannel<C>)channel;
vch.conn = null;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/net/impl/ConnectionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected ConnectionBase(VertxInternal vertx, Channel channel, ContextImpl conte
this.metrics = metrics;
}

protected synchronized final void startRead() {
public synchronized final void startRead() {
read = true;
}

Expand Down

0 comments on commit 32a8c90

Please sign in to comment.