Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4.x: Refactor Níma connection context #6109

Merged
merged 2 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions docs-internal/nima-connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Connections
# Server Connection Providers

Níma server uses SPI to discover how to handle a specific incoming connection.
The main entry point is `ServerConnectionProvider` - a service loader provider interface to discover
The main entry point is `ServerConnectionProvider` - a service loader provider interface to discover
which connections are supported.

We have the following implementations currently:
Expand All @@ -15,7 +15,6 @@ We have the following implementations currently:
The connection provider creates a configured `ServerConnectionSelector`, which is then used at runtime.
The selector works based on initial bytes of the connection.


# HTTP/1.1 Upgrade Providers

HTTP/1.1 supports the concept of upgrading a connection. This is supported through
Expand All @@ -33,23 +32,61 @@ the provider with higher weight will be used.

# Configurability

ALl of connection providers, HTTP/1.1 upgrade providers and HTTP/2 subprotocols are configured under `server.connection-providers`, to have a single configuration of a protocol regardless whether this is accessed directly or through upgrade mechanism.
All of connection providers, HTTP/1.1 upgrade providers and HTTP/2 subprotocols are configured
under `server.connection-providers`, to have a single configuration of a protocol regardless whether this is accessed directly or
through upgrade mechanism.

The configuration key is the one provided by the Connection provider, HTTP/1.1 Upgrade provider, or HTTP/2 SubProtocol provider `configKey()` or `configKeys()` method.
The configuration key is the one provided by the Connection provider, HTTP/1.1 Upgrade provider, or HTTP/2 SubProtocol
provider `configKey()` or `configKeys()` method.

As all providers are configured on the same leave, each provider should have a descriptive and unique configuration key
relevant to its purpose.

Example of such configuration (Tyrus and Níma WebSocket both use `websocket`, as only one of them can be active):

```yaml
server:
connection-providers:
http_1_1:
max-prologue-length: 4096
websocket:
origins: ["origin1"]
origins: [ "origin1" ]
http_2:
max-frame-size: 128000
grpc:
something: "value"
```

Configuration levels:

- `io.helidon.nima.webserver.WebServer`
- `io.helidon.nima.webserver.http.DirectHandlers` // should be configurable per socket, also HTTP only
- `io.helidon.common.context.Context`
- `io.helidon.nima.http.media.MediaContext` - should be configurable per socket, HTTP only
- `io.helidon.nima.http.encoding.ContentEncodingContext` - should be configurable per socket, HTTP only
- `io.helidon.nima.webserver.ServerListener` (ListenerConfiguration)

# Server Context and other contexts

To access context related information of server and connection, the following types are available for connections:

- `io.helidon.nima.webserver.ListenerContext` - listener specific
- Helidon Context
- Media context
- Content encoding context
- `io.helidon.common.socket.SocketContext` - socket specific (as in TCP socket)
- remote and local peers
- socket ids (client and server)
- methods to log socket related information in a single format
- `io.helidon.nima.webserver.ConnectionContext extends SocketContext` - connection specific (established connection for a
listener)
- listener context
- executor service
- data writer
- data reader
- router

what we need

- access to server config/socket configuration (so we do not need to repeat the configuration again on contexts)
- should we use composition or inheritance?
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ private void ackSettings() {
stream.prologue(upgradePrologue);
stream.headers(upgradeHeaders, !hasEntity);
upgradeHeaders = null;
ctx.sharedExecutor()
ctx.executor()
.submit(new StreamRunnable(streams, stream, stream.streamId()));
}
}
Expand Down Expand Up @@ -562,7 +562,7 @@ private void doHeaders() {
state = State.READ_FRAME;

// we now have all information needed to execute
ctx.sharedExecutor()
ctx.executor()
.submit(new StreamRunnable(streams, stream, stream.streamId()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import io.helidon.nima.http.media.ReadableEntity;
import io.helidon.nima.http2.Http2Headers;
import io.helidon.nima.webserver.ConnectionContext;
import io.helidon.nima.webserver.ServerContext;
import io.helidon.nima.webserver.ListenerContext;
import io.helidon.nima.webserver.http.HttpSecurity;
import io.helidon.nima.webserver.http.RoutingRequest;

Expand Down Expand Up @@ -76,7 +76,7 @@ class Http2ServerRequest implements RoutingRequest {
it -> entitySupplier.get(),
NO_OP_RUNNABLE,
this.headers,
ctx.serverContext().mediaContext()));
ctx.listenerContext().mediaContext()));
}

static Http2ServerRequest create(ConnectionContext ctx,
Expand Down Expand Up @@ -174,15 +174,15 @@ public RoutingRequest prologue(HttpPrologue newPrologue) {
}

@Override
public ServerContext serverContext() {
return ctx.serverContext();
public ListenerContext listenerContext() {
return ctx.listenerContext();
}

@Override
public Context context() {
if (context == null) {
context = Contexts.context().orElseGet(() -> Context.builder()
.parent(ctx.serverContext().context())
.parent(ctx.listenerContext().context())
.id("[" + serverSocketId() + " " + socketId() + "] http/1.1: " + requestId)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,7 +35,6 @@
import io.helidon.nima.http2.Http2Headers;
import io.helidon.nima.http2.Http2StreamWriter;
import io.helidon.nima.webserver.ConnectionContext;
import io.helidon.nima.webserver.http.ServerResponse;
import io.helidon.nima.webserver.http.ServerResponseBase;

class Http2ServerResponse extends ServerResponseBase<Http2ServerResponse> {
Expand Down Expand Up @@ -66,7 +65,7 @@ class Http2ServerResponse extends ServerResponseBase<Http2ServerResponse> {
}

@Override
public ServerResponse header(HeaderValue header) {
public Http2ServerResponse header(HeaderValue header) {
headers.set(header);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ public class Http2Stream implements Runnable, io.helidon.nima.http2.Http2Stream
0), BufferData.empty());
private static final System.Logger LOGGER = System.getLogger(Http2Stream.class.getName());

// todo nima - use or remove
//private final ContentEncodingContext contentEncodingContext = ContentEncodingContext.create();
private final FlowControl flowControl;
private final ConnectionContext ctx;
private final Http2Config http2Config;
Expand Down Expand Up @@ -267,7 +265,9 @@ public void run() {
writer.write(rst.toFrameData(serverSettings, streamId, Http2Flag.NoFlags.create()), flowControl);
// no sense in throwing an exception, as this is invoked from an executor service directly
} catch (RequestException e) {
DirectHandler handler = ctx.directHandlers().handler(e.eventType());
DirectHandler handler = ctx.listenerContext()
.directHandlers()
.handler(e.eventType());
DirectHandler.TransportResponse response = handler.handle(e.request(),
e.eventType(),
e.status(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,31 @@
import java.util.List;
import java.util.function.Function;

import org.junit.jupiter.api.Test;

import io.helidon.config.Config;
import io.helidon.nima.http2.Http2Setting;
import io.helidon.nima.webserver.ConnectionContext;
import io.helidon.nima.webserver.ListenerContext;
import io.helidon.nima.webserver.Router;
import io.helidon.nima.webserver.ServerContext;
import io.helidon.nima.webserver.WebServer;
import io.helidon.nima.webserver.spi.ServerConnectionProvider;
import io.helidon.nima.webserver.spi.ServerConnectionSelector;

import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ConnectionConfigTest {

private static ConnectionContext mockContext() {
ConnectionContext ctx = mock(ConnectionContext.class);
when(ctx.router()).thenReturn(Router.empty());
when(ctx.listenerContext()).thenReturn(mock(ListenerContext.class));
return ctx;
}

// Verify that HTTP/2 connection provider is properly configured from config file
@Test
void testConnectionConfig() {
Expand All @@ -61,7 +68,6 @@ void testProviderConfigBuilder() {
.build()
.create(it -> Config.empty());


Http2Connection conn = (Http2Connection) provider.connection(mockContext());
// Verify values to be updated from configuration file
assertThat(conn.config().maxFrameSize(), is(4096L));
Expand All @@ -73,7 +79,7 @@ void testProviderConfigBuilder() {

// Verify that HTTP/2 MAX_CONCURRENT_STREAMS is properly configured from builder
@Test
void testConfigMaxConcurrentStreams() {
void testConfigMaxConcurrentStreams() {
// This will pick up application.yaml from the classpath as default configuration file
TestProvider provider = new TestProvider();
WebServer.builder().addConnectionProvider(provider).build();
Expand Down Expand Up @@ -119,11 +125,4 @@ private boolean isConfig() {

}

private static ConnectionContext mockContext() {
ConnectionContext ctx = mock(ConnectionContext.class);
when(ctx.router()).thenReturn(Router.empty());
when(ctx.serverContext()).thenReturn(mock(ServerContext.class));
return ctx;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,39 @@
package io.helidon.nima.testing.junit5.webserver;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.buffers.BufferData;
import io.helidon.common.buffers.DataReader;
import io.helidon.common.buffers.DataWriter;
import io.helidon.common.context.Context;
import io.helidon.common.socket.PeerInfo;
import io.helidon.config.Config;
import io.helidon.nima.http.encoding.ContentEncodingContext;
import io.helidon.nima.http.media.MediaContext;
import io.helidon.nima.webclient.ClientConnection;
import io.helidon.nima.webserver.ConnectionContext;
import io.helidon.nima.webserver.Router;
import io.helidon.nima.webserver.ServerContext;
import io.helidon.nima.webserver.http.DirectHandlers;
import io.helidon.nima.webserver.http1.Http1ConnectionProvider;
import io.helidon.nima.webserver.spi.ServerConnection;

class DirectClientConnection implements ClientConnection {
private final AtomicBoolean serverStarted = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();

private final Router router;
private final DataReader clientReader;
private final DataWriter clientWriter;
private final DataReader serverReader;
private final DataWriter serverWriter;
private final DirectSocket socket;
private final DirectClientServerContext serverContext;

DirectClientConnection(PeerInfo clientPeer,
PeerInfo localPeer,
Router router,
boolean isTls) {

this.router = router;
this.socket = new DirectSocket(localPeer, clientPeer, isTls);

ArrayBlockingQueue<byte[]> serverToClient = new ArrayBlockingQueue<>(1024);
ArrayBlockingQueue<byte[]> clientToServer = new ArrayBlockingQueue<>(1024);

this.clientReader = reader(serverToClient);
this.clientWriter = writer(clientToServer);
this.serverReader = reader(clientToServer);
this.serverWriter = writer(serverToClient);
this.serverContext = new DirectClientServerContext(router,
new DirectSocket(localPeer, clientPeer, isTls),
reader(clientToServer),
writer(serverToClient));
}

@Override
Expand Down Expand Up @@ -142,32 +130,18 @@ private DataReader reader(ArrayBlockingQueue<byte[]> queue) {
}

private void startServer() {
ExecutorService executorService = Executors.newThreadPerTaskExecutor(Thread.ofVirtual().name("direct-test-server", 1)
.factory());
ConnectionContext ctx = ConnectionContext.create(
ServerContext.create(Context.create(),
MediaContext.create(),
ContentEncodingContext.create()),
executorService,
serverWriter,
serverReader,
router,
"unit-server",
"unit-channel",
DirectHandlers.builder().build(),
socket,
-1);

ServerConnection connection = Http1ConnectionProvider.builder()
.build()
.create(it -> Config.empty())
.connection(ctx);
executorService.submit(() -> {
try {
connection.handle();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
.connection(serverContext);

serverContext.executor()
.submit(() -> {
try {
connection.handle();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
Loading