From 37c34146027a208490dc9d4c18357fa4384fa05f Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Thu, 17 Jan 2019 14:59:48 +0800 Subject: [PATCH 1/8] reative support --- dubbo-rpc/dubbo-rpc-rsocket/pom.xml | 128 ++++++ .../rpc/protocol/rsocket/MetadataCodec.java | 26 ++ .../protocol/rsocket/RSocketConstants.java | 16 + .../rpc/protocol/rsocket/RSocketExporter.java | 30 ++ .../rpc/protocol/rsocket/RSocketInvoker.java | 205 +++++++++ .../rpc/protocol/rsocket/RSocketProtocol.java | 421 ++++++++++++++++++ .../rsocket/ReferenceCountRsocketClient.java | 7 + .../internal/org.apache.dubbo.rpc.Protocol | 1 + .../src/test/resources/log4j.xml | 46 ++ dubbo-rpc/pom.xml | 1 + 10 files changed, 881 insertions(+) create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/pom.xml create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml diff --git a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml new file mode 100644 index 00000000000..5e6b8d85a49 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + org.apache.dubbo + dubbo-rpc + 2.7.0-SNAPSHOT + + dubbo-rpc-rsocket + jar + ${project.artifactId} + The default rpc module of dubbo project + + false + + + + io.rsocket + rsocket-core + 0.11.14 + + + io.rsocket + rsocket-transport-netty + 0.11.14 + + + com.alibaba + fastjson + 1.2.54 + + + org.apache.dubbo + dubbo-rpc-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-remoting-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-config-api + ${project.version} + + + org.apache.dubbo + dubbo-container-api + ${project.parent.version} + + + org.eclipse.jetty + jetty-server + + + org.eclipse.jetty + jetty-servlet + + + + + org.apache.dubbo + dubbo-configcenter-api + ${project.parent.version} + + + org.apache.dubbo + dubbo-remoting-netty4 + ${project.parent.version} + test + + + org.apache.dubbo + dubbo-remoting-mina + ${project.parent.version} + test + + + io.netty + netty-all + test + + + org.apache.dubbo + dubbo-serialization-hessian2 + ${project.parent.version} + test + + + org.apache.dubbo + dubbo-serialization-jdk + ${project.parent.version} + test + + + + javax.validation + validation-api + test + + + org.hibernate + hibernate-validator + test + + + org.glassfish + javax.el + test + + + diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java new file mode 100644 index 00000000000..ca7080adfbf --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java @@ -0,0 +1,26 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.apache.dubbo.rpc.Invocation; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +/** + * @author sixie.xyn on 2019/1/3. + */ +public class MetadataCodec { + + public static Map decodeMetadata(byte[] bytes) throws IOException { + return JSON.parseObject(new String(bytes, StandardCharsets.UTF_8), Map.class); + } + + public static byte[] encodeMetadata(Map metadata) throws IOException { + String jsonStr = JSON.toJSONString(metadata); + return jsonStr.getBytes(StandardCharsets.UTF_8); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java new file mode 100644 index 00000000000..7e1e703419e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java @@ -0,0 +1,16 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +/** + * @author sixie.xyn on 2019/1/3. + */ +public class RSocketConstants { + + public static final String VERSION_KEY = "_version"; + public static final String SERVICE_NAME_KEY = "_service_name"; + public static final String SERVICE_VERSION_KEY = "_service_version"; + public static final String METHOD_NAME_KEY = "_method_name"; + public static final String PARAM_TYPE_KEY = "_param_type"; + public static final String SERIALIZE_TYPE_KEY = "_serialize_type"; + public static final String TIMEOUT_KEY = "_timeout"; + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java new file mode 100644 index 00000000000..4d419c5d30f --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java @@ -0,0 +1,30 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.protocol.AbstractExporter; + +import java.util.Map; + +/** + * @author sixie.xyn on 2019/1/2. + */ +public class RSocketExporter extends AbstractExporter { + + private final String key; + + private final Map> exporterMap; + + public RSocketExporter(Invoker invoker, String key, Map> exporterMap) { + super(invoker); + this.key = key; + this.exporterMap = exporterMap; + } + + @Override + public void unexport() { + super.unexport(); + exporterMap.remove(key); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java new file mode 100644 index 00000000000..a8f35687b6f --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -0,0 +1,205 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; +import org.apache.dubbo.common.serialize.Cleanable; +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.Serialization; +import org.apache.dubbo.common.utils.AtomicPositiveInteger; +import org.apache.dubbo.common.utils.ReflectUtils; +import org.apache.dubbo.remoting.RemotingException; +import org.apache.dubbo.remoting.TimeoutException; +import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream; +import org.apache.dubbo.remoting.exchange.ExchangeClient; +import org.apache.dubbo.remoting.exchange.ResponseFuture; +import org.apache.dubbo.remoting.transport.CodecSupport; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.RpcResult; +import org.apache.dubbo.rpc.SimpleAsyncRpcResult; +import org.apache.dubbo.rpc.protocol.AbstractInvoker; +import org.apache.dubbo.rpc.support.RpcUtils; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.util.DefaultPayload; +import reactor.core.publisher.Mono; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author sixie.xyn on 2019/1/2. + */ +public class RSocketInvoker extends AbstractInvoker { + + private final RSocket[] clients; + + private final AtomicPositiveInteger index = new AtomicPositiveInteger(); + + private final String version; + + private final ReentrantLock destroyLock = new ReentrantLock(); + + private final Set> invokers; + + public RSocketInvoker(Class serviceType, URL url, RSocket[] clients, Set> invokers) { + super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); + this.clients = clients; + // get version. + this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); + this.invokers = invokers; + } + + @Override + protected Result doInvoke(final Invocation invocation) throws Throwable { + RpcInvocation inv = (RpcInvocation) invocation; + final String methodName = RpcUtils.getMethodName(invocation); + inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); + inv.setAttachment(Constants.VERSION_KEY, version); + + RSocket currentClient; + if (clients.length == 1) { + currentClient = clients[0]; + } else { + currentClient = clients[index.getAndIncrement() % clients.length]; + } + try { + boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); + boolean isAsyncFuture = RpcUtils.isFutureReturnType(inv); + boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); + int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); + if (isOneway) { +// boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); +// currentClient.send(inv, isSent); +// RpcContext.getContext().setFuture(null); +// return new RpcResult(); + } else if (isAsync) { + //encode metadata and data + byte[] metadataBytes = encodeMetadata(invocation); + byte[] dataBytes = encodeData(invocation); + Payload requestPayload = DefaultPayload.create(dataBytes, metadataBytes); + + //ResponseFuture future = currentClient.request(inv, timeout); + Mono responseMono = currentClient.requestResponse(requestPayload); + + // For compatibility + FutureAdapter futureAdapter = new FutureAdapter<>(future); + RpcContext.getContext().setFuture(futureAdapter); + + Result result; + if (isAsyncFuture) { + // register resultCallback, sometimes we need the async result being processed by the filter chain. + result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); + } else { + result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); + } + return result; + } else { + RpcContext.getContext().setFuture(null); + + return (Result) currentClient.request(inv, timeout).get(); + } + } catch (TimeoutException e) { + throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); + } catch (RemotingException e) { + throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); + } + } + + @Override + public boolean isAvailable() { + if (!super.isAvailable()) { + return false; + } + for (RSocket client : clients) { + if (client.availability() > 0) { + return true; + } + } + return false; + } + + @Override + public void destroy() { + // in order to avoid closing a client multiple times, a counter is used in case of connection per jvm, every + // time when client.close() is called, counter counts down once, and when counter reaches zero, client will be + // closed. + if (super.isDestroyed()) { + return; + } else { + // double check to avoid dup close + destroyLock.lock(); + try { + if (super.isDestroyed()) { + return; + } + super.destroy(); + if (invokers != null) { + invokers.remove(this); + } + for (RSocket client : clients) { + try { + client.dispose(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + + } finally { + destroyLock.unlock(); + } + } + } + + private byte[] encodeMetadata(Invocation invocation) throws IOException { + Map metadataMap = new HashMap(); + metadataMap.put(RSocketConstants.VERSION_KEY, version); + metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY)); + metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY)); + metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName()); + metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes())); + return MetadataCodec.encodeMetadata(metadataMap); + } + + private byte[] encodeData(Invocation invocation) throws IOException { + ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream(); + Serialization serialization = CodecSupport.getSerialization(getUrl()); + ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream); + + RpcInvocation inv = (RpcInvocation) invocation; +// out.writeUTF(version); +// out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); +// out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); +// out.writeUTF(inv.getMethodName()); +// out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); + + Object[] args = inv.getArguments(); + if (args != null) { + for (int i = 0; i < args.length; i++) { + out.writeObject(args[i]); + } + } + out.writeObject(RpcUtils.getNecessaryAttachments(inv)); + + //clean + out.flushBuffer(); + if (out instanceof Cleanable) { + ((Cleanable) out).cleanup(); + } + return dataOutputStream.toByteArray(); + } + + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java new file mode 100644 index 00000000000..acf9be14f11 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -0,0 +1,421 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.utils.NetUtils; +import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.remoting.RemotingException; +import org.apache.dubbo.remoting.exchange.ExchangeChannel; +import org.apache.dubbo.remoting.exchange.ExchangeHandler; +import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; +import org.apache.dubbo.rpc.AsyncContextImpl; +import org.apache.dubbo.rpc.AsyncRpcResult; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Protocol; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.protocol.AbstractProtocol; + +import io.rsocket.AbstractRSocket; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @author sixie.xyn on 2019/1/2. + */ +public class RSocketProtocol extends AbstractProtocol { +// @Override +// public int getDefaultPort() { +// return 0; +// } +// +// @Override +// public Exporter export(Invoker invoker) throws RpcException { +// return null; +// } +// +// @Override +// public Invoker refer(Class type, URL url) throws RpcException { +// return null; +// } + + public static final String NAME = "rsocket"; + + public static final int DEFAULT_PORT = 30880; + + private static RSocketProtocol INSTANCE; + + // + private final Map serverMap = new ConcurrentHashMap(); + + // + private final Map referenceClientMap = new ConcurrentHashMap(); + + private final ConcurrentMap locks = new ConcurrentHashMap(); + + private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { + + @Override + public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { + if (message instanceof Invocation) { + Invocation inv = (Invocation) message; + Invoker invoker = getInvoker(channel, inv); + // need to consider backward-compatibility if it's a callback + + RpcContext rpcContext = RpcContext.getContext(); + boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false); + if (supportServerAsync) { + CompletableFuture future = new CompletableFuture<>(); + rpcContext.setAsyncContext(new AsyncContextImpl(future)); + } + rpcContext.setRemoteAddress(channel.getRemoteAddress()); + Result result = invoker.invoke(inv); + + if (result instanceof AsyncRpcResult) { + return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); + } else { + return CompletableFuture.completedFuture(result); + } + } + throw new RemotingException(channel, "Unsupported request: " + + (message == null ? null : (message.getClass().getName() + ": " + message)) + + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); + } + + @Override + public void received(Channel channel, Object message) throws RemotingException { + if (message instanceof Invocation) { + reply((ExchangeChannel) channel, message); + } else { + super.received(channel, message); + } + } + + @Override + public void connected(Channel channel) throws RemotingException { + invoke(channel, Constants.ON_CONNECT_KEY); + } + + @Override + public void disconnected(Channel channel) throws RemotingException { + if (logger.isInfoEnabled()) { + logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); + } + invoke(channel, Constants.ON_DISCONNECT_KEY); + } + + private void invoke(Channel channel, String methodKey) { + Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); + if (invocation != null) { + try { + received(channel, invocation); + } catch (Throwable t) { + logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); + } + } + } + + private Invocation createInvocation(Channel channel, URL url, String methodKey) { + String method = url.getParameter(methodKey); + if (method == null || method.length() == 0) { + return null; + } + RpcInvocation invocation = new RpcInvocation(method, new Class[0], new Object[0]); + invocation.setAttachment(Constants.PATH_KEY, url.getPath()); + invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); + invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); + invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); + if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { + invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); + } + return invocation; + } + }; + + public RSocketProtocol() { + INSTANCE = this; + } + + public static RSocketProtocol getRSocketProtocol() { + if (INSTANCE == null) { + ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(RSocketProtocol.NAME); // load + } + return INSTANCE; + } + + public Collection> getExporters() { + return Collections.unmodifiableCollection(exporterMap.values()); + } + + Map> getExporterMap() { + return exporterMap; + } + + private boolean isClientSide(Channel channel) { + InetSocketAddress address = channel.getRemoteAddress(); + URL url = channel.getUrl(); + return url.getPort() == address.getPort() && + NetUtils.filterLocalHost(channel.getUrl().getIp()) + .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); + } + + Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException { + int port = channel.getLocalAddress().getPort(); + String path = inv.getAttachments().get(Constants.PATH_KEY); + String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + RSocketExporter exporter = (RSocketExporter) exporterMap.get(serviceKey); + if (exporter == null) { + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + } + + return exporter.getInvoker(); + } + + public Collection> getInvokers() { + return Collections.unmodifiableCollection(invokers); + } + + @Override + public int getDefaultPort() { + return DEFAULT_PORT; + } + + @Override + public Exporter export(Invoker invoker) throws RpcException { + URL url = invoker.getUrl(); + + // export service. + String key = serviceKey(url); + RSocketExporter exporter = new RSocketExporter(invoker, key, exporterMap); + exporterMap.put(key, exporter); + + //export an stub service for dispatching event +// Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); +// Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); +// if (isStubSupportEvent && !isCallbackservice) { +// String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); +// if (stubServiceMethods == null || stubServiceMethods.length() == 0) { +// if (logger.isWarnEnabled()) { +// logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + +// "], has set stubproxy support event ,but no stub methods founded.")); +// } +// } else { +// stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); +// } +// } + + openServer(url); + return exporter; + } + + private void openServer(URL url) { + String key = url.getAddress(); + //client can export a service which's only for server to invoke + boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); + if (isServer) { + CloseableChannel server = serverMap.get(key); + if (server == null) { + synchronized (this) { + server = serverMap.get(key); + if (server == null) { + serverMap.put(key, createServer(url)); + } + } + } + } + } + + private CloseableChannel createServer(URL url) { + try { + String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost()); + int bindPort = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); + if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { + bindIp = NetUtils.ANYHOST; + } + return RSocketFactory.receive() + .acceptor(new SocketAcceptorImpl()) + .transport(TcpServerTransport.create(bindIp, bindPort)) + .start() + .block(); + } catch (Throwable e) { + throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); + } + } + + + @Override + public Invoker refer(Class serviceType, URL url) throws RpcException { + // create rpc invoker. + RSocketInvoker invoker = new RSocketInvoker(serviceType, url, getClients(url), invokers); + invokers.add(invoker); + return invoker; + } + + private RSocket[] getClients(URL url) { + // whether to share connection + boolean service_share_connect = false; + int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); + // if not configured, connection is shared, otherwise, one connection for one service + if (connections == 0) { + service_share_connect = true; + connections = 1; + } + + RSocket[] clients = new RSocket[connections]; + for (int i = 0; i < clients.length; i++) { + if (service_share_connect) { + clients[i] = getSharedClient(url); + } else { + clients[i] = initClient(url); + } + } + return clients; + } + + /** + * Get shared connection + */ + private RSocket getSharedClient(URL url) { + String key = url.getAddress(); + RSocket client = referenceClientMap.get(key); + if (client != null) { + return client; + } + + locks.putIfAbsent(key, new Object()); + synchronized (locks.get(key)) { + if (referenceClientMap.containsKey(key)) { + return referenceClientMap.get(key); + } + + client = initClient(url); + referenceClientMap.put(key, client); + locks.remove(key); + return client; + } + } + + /** + * Create new connection + */ + private RSocket initClient(URL url) { + try { + InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()),url.getPort()); + RSocket client = RSocketFactory.connect().acceptor( + rSocket -> + new AbstractRSocket() { + public Mono requestResponse(Payload payload) { + ByteBuffer metadata = payload.getMetadata(); + ByteBuffer data = payload.getData(); + payload.release(); + return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + } + + @Override + public Flux requestStream(Payload payload) { + return Flux.interval(Duration.ofSeconds(1)) + .map(aLong -> DefaultPayload.create("Bi-di Response => " + aLong)); + } + }) + .transport(TcpClientTransport.create(serverAddress)) + .start() + .block(); + return client; + } catch (Throwable e) { + throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); + } + + } + + @Override + public void destroy() { + for (String key : new ArrayList(serverMap.keySet())) { + CloseableChannel server = serverMap.remove(key); + if (server != null) { + try { + if (logger.isInfoEnabled()) { + logger.info("Close dubbo server: " + server.address()); + } + server.dispose(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + } + + for (String key : new ArrayList(referenceClientMap.keySet())) { + RSocket client = referenceClientMap.remove(key); + if (client != null) { + try { +// if (logger.isInfoEnabled()) { +// logger.info("Close dubbo connect: " + client. + "-->" + client.getRemoteAddress()); +// } + client.dispose(); + } catch (Throwable t) { + logger.warn(t.getMessage(), t); + } + } + } + super.destroy(); + } + + + //server process logic + private static class SocketAcceptorImpl implements SocketAcceptor { + @Override + public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) { + return Mono.just( + new AbstractRSocket() { + + public Mono requestResponse(Payload payload) { + ByteBuffer metadata = payload.getMetadata(); + ByteBuffer data = payload.getData(); + + payload.release(); + return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + } + + public Flux requestStream(Payload payload) { + payload.release(); + return Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + } + + + @Override + public Flux requestChannel(Publisher payloads) { + return Flux.from(payloads) + .map(Payload::getDataUtf8) + .map(s -> "Echo: " + s) + .map(DefaultPayload::create); + } + + }); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java new file mode 100644 index 00000000000..6b1e832b126 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java @@ -0,0 +1,7 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +/** + * @author sixie.xyn on 2019/1/2. + */ +public class ReferenceCountRsocketClient { +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol new file mode 100644 index 00000000000..4f03810c978 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol @@ -0,0 +1 @@ +rsocket=org.apache.dubbo.rpc.protocol.rsocket.RSocketProtocol \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml new file mode 100644 index 00000000000..3c5d2ba218e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/log4j.xml @@ -0,0 +1,46 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dubbo-rpc/pom.xml b/dubbo-rpc/pom.xml index 9a2cb64b038..184bc30dc4b 100644 --- a/dubbo-rpc/pom.xml +++ b/dubbo-rpc/pom.xml @@ -40,5 +40,6 @@ dubbo-rpc-memcached dubbo-rpc-redis dubbo-rpc-rest + dubbo-rpc-rsocket From dc37c07634875478caf25295b9ba69ec2c689e10 Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Thu, 7 Mar 2019 11:28:15 +0800 Subject: [PATCH 2/8] rsocket support. support using Mono and Flux as return value. --- dubbo-rpc/dubbo-rpc-rsocket/pom.xml | 68 ++- .../protocol/rsocket/FutureSubscriber.java | 81 ++++ .../protocol/rsocket/RSocketConstants.java | 5 +- .../rpc/protocol/rsocket/RSocketInvoker.java | 133 ++++-- .../rpc/protocol/rsocket/RSocketProtocol.java | 430 +++++++++++------- .../rsocket/ReferenceCountRsocketClient.java | 7 - .../rpc/protocol/rsocket/ConsumerDemo.java | 35 ++ .../rpc/protocol/rsocket/ProviderDemo.java | 13 + .../protocol/rsocket/RSocketProtocolTest.java | 204 +++++++++ .../dubbo/rpc/service/DemoException.java | 24 + .../apache/dubbo/rpc/service/DemoService.java | 49 ++ .../dubbo/rpc/service/DemoServiceImpl.java | 139 ++++++ .../dubbo/rpc/service/RemoteService.java | 7 + .../dubbo/rpc/service/RemoteServiceImpl.java | 10 + .../spring/dubbo-rsocket-consumer.xml | 36 ++ .../spring/dubbo-rsocket-provider.xml | 40 ++ 16 files changed, 1018 insertions(+), 263 deletions(-) create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java delete mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml diff --git a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml index 5e6b8d85a49..7a754d85e41 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml +++ b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml @@ -29,6 +29,16 @@ false + + org.springframework + spring-context + 4.3.16.RELEASE + + + org.apache.dubbo + dubbo-registry-multicast + ${project.version} + io.rsocket rsocket-core @@ -49,6 +59,7 @@ dubbo-rpc-api ${project.parent.version} + org.apache.dubbo dubbo-remoting-api @@ -59,6 +70,11 @@ dubbo-config-api ${project.version} + + org.apache.dubbo + dubbo-config-spring + ${project.version} + org.apache.dubbo dubbo-container-api @@ -74,28 +90,6 @@ - - org.apache.dubbo - dubbo-configcenter-api - ${project.parent.version} - - - org.apache.dubbo - dubbo-remoting-netty4 - ${project.parent.version} - test - - - org.apache.dubbo - dubbo-remoting-mina - ${project.parent.version} - test - - - io.netty - netty-all - test - org.apache.dubbo dubbo-serialization-hessian2 @@ -109,20 +103,20 @@ test - - javax.validation - validation-api - test - - - org.hibernate - hibernate-validator - test - - - org.glassfish - javax.el - test - + + + + + + + + + + + + + + + diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java new file mode 100644 index 00000000000..2dcfb8f9501 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java @@ -0,0 +1,81 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import io.rsocket.Payload; +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.Serialization; +import org.apache.dubbo.rpc.RpcResult; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class FutureSubscriber extends CompletableFuture implements Subscriber { + + private final Serialization serialization; + + private final Class retType; + + public FutureSubscriber(Serialization serialization, Class retType){ + this.serialization = serialization; + this.retType = retType; + } + + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(1); + } + + @Override + public void onNext(Payload payload) { + try { + RpcResult rpcResult = new RpcResult(); + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = serialization.deserialize(null, dataInputStream); + + int flag = in.readByte(); + if((flag & RSocketConstants.FLAG_ERROR) != 0){ + Throwable t = (Throwable) in.readObject(); + rpcResult.setException(t); + }else{ + Object value = null; + if((flag & RSocketConstants.FLAG_NULL_VALUE) == 0){ + if(retType == null) { + value = in.readObject(); + }else{ + value = in.readObject(retType); + } + rpcResult.setValue(value); + } + } + + if((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) !=0 ){ + Map attachment = in.readObject(Map.class); + rpcResult.setAttachments(attachment); + + } + + this.complete(rpcResult); + + + }catch (Throwable t){ + this.completeExceptionally(t); + } + } + + @Override + public void onError(Throwable throwable) { + this.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + } +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java index 7e1e703419e..51e2e972cb3 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java @@ -5,7 +5,6 @@ */ public class RSocketConstants { - public static final String VERSION_KEY = "_version"; public static final String SERVICE_NAME_KEY = "_service_name"; public static final String SERVICE_VERSION_KEY = "_service_version"; public static final String METHOD_NAME_KEY = "_method_name"; @@ -13,4 +12,8 @@ public class RSocketConstants { public static final String SERIALIZE_TYPE_KEY = "_serialize_type"; public static final String TIMEOUT_KEY = "_timeout"; + + public static final int FLAG_ERROR = 0x01; + public static final int FLAG_NULL_VALUE = 0x02; + public static final int FLAG_HAS_ATTACHMENT = 0x04; } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index a8f35687b6f..d81d1208fd1 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -4,6 +4,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.serialize.Cleanable; +import org.apache.dubbo.common.serialize.ObjectInput; import org.apache.dubbo.common.serialize.ObjectOutput; import org.apache.dubbo.common.serialize.Serialization; import org.apache.dubbo.common.utils.AtomicPositiveInteger; @@ -29,15 +30,20 @@ import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.util.DefaultPayload; -import reactor.core.publisher.Mono; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.Exceptions; +import reactor.core.publisher.*; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; +import java.io.*; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; +import java.util.function.Function; /** * @author sixie.xyn on 2019/1/2. @@ -54,12 +60,16 @@ public class RSocketInvoker extends AbstractInvoker { private final Set> invokers; + private final Serialization serialization; + public RSocketInvoker(Class serviceType, URL url, RSocket[] clients, Set> invokers) { super(serviceType, url, new String[]{Constants.INTERFACE_KEY, Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY}); this.clients = clients; // get version. this.version = url.getParameter(Constants.VERSION_KEY, "0.0.0"); this.invokers = invokers; + + this.serialization = CodecSupport.getSerialization(getUrl()); } @Override @@ -76,45 +86,76 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { currentClient = clients[index.getAndIncrement() % clients.length]; } try { - boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); - boolean isAsyncFuture = RpcUtils.isFutureReturnType(inv); - boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); + //TODO support timeout int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); - if (isOneway) { -// boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); -// currentClient.send(inv, isSent); -// RpcContext.getContext().setFuture(null); -// return new RpcResult(); - } else if (isAsync) { - //encode metadata and data - byte[] metadataBytes = encodeMetadata(invocation); - byte[] dataBytes = encodeData(invocation); - Payload requestPayload = DefaultPayload.create(dataBytes, metadataBytes); - - //ResponseFuture future = currentClient.request(inv, timeout); - Mono responseMono = currentClient.requestResponse(requestPayload); - // For compatibility - FutureAdapter futureAdapter = new FutureAdapter<>(future); - RpcContext.getContext().setFuture(futureAdapter); + RpcContext.getContext().setFuture(null); + //encode inv: metadata and data(arg,attachment) + Payload requestPayload = encodeInvocation(invocation); - Result result; - if (isAsyncFuture) { - // register resultCallback, sometimes we need the async result being processed by the filter chain. - result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); - } else { - result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); - } - return result; + Class retType = RpcUtils.getReturnType(invocation); + + if (retType!=null && retType.isAssignableFrom(Mono.class)) { + Mono responseMono = currentClient.requestResponse(requestPayload); + Mono bizMono = responseMono.map(new Function() { + @Override + public Object apply(Payload payload) { + return decodeData(payload); + } + }); + RpcResult rpcResult = new RpcResult(); + rpcResult.setValue(bizMono); + return rpcResult; + } else if (retType!=null && retType.isAssignableFrom(Flux.class)) { + return requestStream(currentClient, requestPayload); } else { - RpcContext.getContext().setFuture(null); + //request-reponse + Mono responseMono = currentClient.requestResponse(requestPayload); + FutureSubscriber futureSubscriber = new FutureSubscriber(serialization, retType); + responseMono.subscribe(futureSubscriber); + return (Result) futureSubscriber.get(); + } + + //TODO support stream arg + } catch (Throwable t) { + throw new RpcException(t); + } + } + + + private Result requestStream(RSocket currentClient, Payload requestPayload) { + Flux responseFlux = currentClient.requestStream(requestPayload); + Flux retFlux = responseFlux.map(new Function() { - return (Result) currentClient.request(inv, timeout).get(); + @Override + public Object apply(Payload payload) { + return decodeData(payload); } - } catch (TimeoutException e) { - throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); - } catch (RemotingException e) { - throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); + }); + + RpcResult rpcResult = new RpcResult(); + rpcResult.setValue(retFlux); + return rpcResult; + } + + + private Object decodeData(Payload payload){ + try { + //TODO save the copy + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = serialization.deserialize(null, dataInputStream); + int flag = in.readByte(); + if ((flag & RSocketConstants.FLAG_ERROR) != 0) { + Throwable t = (Throwable) in.readObject(); + throw t; + } else { + return in.readObject(); + } + }catch (Throwable t){ + throw Exceptions.propagate(t); } } @@ -163,28 +204,28 @@ public void destroy() { } } + private Payload encodeInvocation(Invocation invocation) throws IOException { + byte[] metadata = encodeMetadata(invocation); + byte[] data = encodeData(invocation); + return DefaultPayload.create(data, metadata); + } + private byte[] encodeMetadata(Invocation invocation) throws IOException { Map metadataMap = new HashMap(); - metadataMap.put(RSocketConstants.VERSION_KEY, version); metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY)); metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY)); metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName()); metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes())); + metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId()); return MetadataCodec.encodeMetadata(metadataMap); } + private byte[] encodeData(Invocation invocation) throws IOException { ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream(); Serialization serialization = CodecSupport.getSerialization(getUrl()); ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream); - RpcInvocation inv = (RpcInvocation) invocation; -// out.writeUTF(version); -// out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); -// out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); -// out.writeUTF(inv.getMethodName()); -// out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); - Object[] args = inv.getArguments(); if (args != null) { for (int i = 0; i < args.length; i++) { @@ -200,6 +241,4 @@ private byte[] encodeData(Invocation invocation) throws IOException { } return dataOutputStream.toByteArray(); } - - } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java index acf9be14f11..7cbe4f311f0 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -1,40 +1,33 @@ package org.apache.dubbo.rpc.protocol.rsocket; +import io.rsocket.*; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.CloseableChannel; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.serialize.ObjectOutput; import org.apache.dubbo.common.utils.NetUtils; -import org.apache.dubbo.remoting.Channel; +import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.remoting.RemotingException; -import org.apache.dubbo.remoting.exchange.ExchangeChannel; -import org.apache.dubbo.remoting.exchange.ExchangeHandler; -import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter; -import org.apache.dubbo.rpc.AsyncContextImpl; -import org.apache.dubbo.rpc.AsyncRpcResult; -import org.apache.dubbo.rpc.Exporter; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Protocol; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.remoting.transport.CodecSupport; +import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.protocol.AbstractProtocol; - -import io.rsocket.AbstractRSocket; -import io.rsocket.ConnectionSetupPayload; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.RSocketFactory; -import io.rsocket.SocketAcceptor; -import io.rsocket.transport.netty.client.TcpClientTransport; -import io.rsocket.transport.netty.server.CloseableChannel; -import io.rsocket.transport.netty.server.TcpServerTransport; -import io.rsocket.util.DefaultPayload; +import org.apache.dubbo.rpc.support.RpcUtils; import org.reactivestreams.Publisher; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.time.Duration; @@ -42,28 +35,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; /** * @author sixie.xyn on 2019/1/2. */ public class RSocketProtocol extends AbstractProtocol { -// @Override -// public int getDefaultPort() { -// return 0; -// } -// -// @Override -// public Exporter export(Invoker invoker) throws RpcException { -// return null; -// } -// -// @Override -// public Invoker refer(Class type, URL url) throws RpcException { -// return null; -// } + + private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class); public static final String NAME = "rsocket"; @@ -79,85 +60,6 @@ public class RSocketProtocol extends AbstractProtocol { private final ConcurrentMap locks = new ConcurrentHashMap(); - private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { - - @Override - public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - Invocation inv = (Invocation) message; - Invoker invoker = getInvoker(channel, inv); - // need to consider backward-compatibility if it's a callback - - RpcContext rpcContext = RpcContext.getContext(); - boolean supportServerAsync = invoker.getUrl().getMethodParameter(inv.getMethodName(), Constants.ASYNC_KEY, false); - if (supportServerAsync) { - CompletableFuture future = new CompletableFuture<>(); - rpcContext.setAsyncContext(new AsyncContextImpl(future)); - } - rpcContext.setRemoteAddress(channel.getRemoteAddress()); - Result result = invoker.invoke(inv); - - if (result instanceof AsyncRpcResult) { - return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r); - } else { - return CompletableFuture.completedFuture(result); - } - } - throw new RemotingException(channel, "Unsupported request: " - + (message == null ? null : (message.getClass().getName() + ": " + message)) - + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); - } - - @Override - public void received(Channel channel, Object message) throws RemotingException { - if (message instanceof Invocation) { - reply((ExchangeChannel) channel, message); - } else { - super.received(channel, message); - } - } - - @Override - public void connected(Channel channel) throws RemotingException { - invoke(channel, Constants.ON_CONNECT_KEY); - } - - @Override - public void disconnected(Channel channel) throws RemotingException { - if (logger.isInfoEnabled()) { - logger.info("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); - } - invoke(channel, Constants.ON_DISCONNECT_KEY); - } - - private void invoke(Channel channel, String methodKey) { - Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); - if (invocation != null) { - try { - received(channel, invocation); - } catch (Throwable t) { - logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); - } - } - } - - private Invocation createInvocation(Channel channel, URL url, String methodKey) { - String method = url.getParameter(methodKey); - if (method == null || method.length() == 0) { - return null; - } - RpcInvocation invocation = new RpcInvocation(method, new Class[0], new Object[0]); - invocation.setAttachment(Constants.PATH_KEY, url.getPath()); - invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); - invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); - invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); - if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { - invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); - } - return invocation; - } - }; - public RSocketProtocol() { INSTANCE = this; } @@ -177,21 +79,13 @@ Map> getExporterMap() { return exporterMap; } - private boolean isClientSide(Channel channel) { - InetSocketAddress address = channel.getRemoteAddress(); - URL url = channel.getUrl(); - return url.getPort() == address.getPort() && - NetUtils.filterLocalHost(channel.getUrl().getIp()) - .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); - } - - Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException { - int port = channel.getLocalAddress().getPort(); - String path = inv.getAttachments().get(Constants.PATH_KEY); - String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + Invoker getInvoker(int port, Map metadataMap) throws RemotingException { + String path = (String) metadataMap.get(RSocketConstants.SERVICE_NAME_KEY); + String serviceKey = serviceKey(port, path, (String) metadataMap.get(RSocketConstants.SERVICE_VERSION_KEY), (String) metadataMap.get(Constants.GROUP_KEY)); RSocketExporter exporter = (RSocketExporter) exporterMap.get(serviceKey); if (exporter == null) { - throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + //throw new Throwable("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + throw new RuntimeException("Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch "); } return exporter.getInvoker(); @@ -215,21 +109,6 @@ public Exporter export(Invoker invoker) throws RpcException { RSocketExporter exporter = new RSocketExporter(invoker, key, exporterMap); exporterMap.put(key, exporter); - //export an stub service for dispatching event -// Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); -// Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); -// if (isStubSupportEvent && !isCallbackservice) { -// String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); -// if (stubServiceMethods == null || stubServiceMethods.length() == 0) { -// if (logger.isWarnEnabled()) { -// logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + -// "], has set stubproxy support event ,but no stub methods founded.")); -// } -// } else { -// stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); -// } -// } - openServer(url); return exporter; } @@ -259,7 +138,7 @@ private CloseableChannel createServer(URL url) { bindIp = NetUtils.ANYHOST; } return RSocketFactory.receive() - .acceptor(new SocketAcceptorImpl()) + .acceptor(new SocketAcceptorImpl(bindPort)) .transport(TcpServerTransport.create(bindIp, bindPort)) .start() .block(); @@ -326,21 +205,19 @@ private RSocket getSharedClient(URL url) { */ private RSocket initClient(URL url) { try { - InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()),url.getPort()); - RSocket client = RSocketFactory.connect().acceptor( + InetSocketAddress serverAddress = new InetSocketAddress(NetUtils.filterLocalHost(url.getHost()), url.getPort()); + RSocket client = RSocketFactory.connect().keepAliveTickPeriod(Duration.ZERO).keepAliveAckTimeout(Duration.ZERO).acceptor( rSocket -> new AbstractRSocket() { public Mono requestResponse(Payload payload) { - ByteBuffer metadata = payload.getMetadata(); - ByteBuffer data = payload.getData(); - payload.release(); - return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + //TODO support Mono arg + throw new UnsupportedOperationException(); } @Override public Flux requestStream(Payload payload) { - return Flux.interval(Duration.ofSeconds(1)) - .map(aLong -> DefaultPayload.create("Bi-di Response => " + aLong)); + //TODO support Flux arg + throw new UnsupportedOperationException(); } }) .transport(TcpClientTransport.create(serverAddress)) @@ -387,34 +264,245 @@ public void destroy() { //server process logic - private static class SocketAcceptorImpl implements SocketAcceptor { + private class SocketAcceptorImpl implements SocketAcceptor { + + private final int port; + + public SocketAcceptorImpl(int port) { + this.port = port; + } + @Override public Mono accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) { return Mono.just( new AbstractRSocket() { - public Mono requestResponse(Payload payload) { - ByteBuffer metadata = payload.getMetadata(); - ByteBuffer data = payload.getData(); - - payload.release(); - return Mono.error(new UnsupportedOperationException("Request-Response not implemented.")); + try { + Map metadata = decodeMetadata(payload); + Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue(); + Invocation inv = decodeInvocation(payload, metadata, serializeId); + + Result result = inv.getInvoker().invoke(inv); + + Class retType = RpcUtils.getReturnType(inv); + //ok + if (retType != null && Mono.class.isAssignableFrom(retType)) { + Throwable th = result.getException(); + if (th == null) { + Mono bizMono = (Mono) result.getValue(); + Mono retMono = bizMono.map(new Function() { + @Override + public Payload apply(Object o) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) 0); + out.writeObject(o); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload responsePayload = DefaultPayload.create(bos.toByteArray()); + return responsePayload; + } catch (Throwable t) { + throw Exceptions.propagate(t); + } + } + }).onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) RSocketConstants.FLAG_ERROR); + out.writeObject(throwable); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload errorPayload = DefaultPayload.create(bos.toByteArray()); + return Flux.just(errorPayload); + } catch (Throwable t) { + throw Exceptions.propagate(t); + } + } + }); + + return retMono; + } else { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) RSocketConstants.FLAG_ERROR); + out.writeObject(th); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload errorPayload = DefaultPayload.create(bos.toByteArray()); + return Mono.just(errorPayload); + } + + } else { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + int flag = RSocketConstants.FLAG_HAS_ATTACHMENT; + + Throwable th = result.getException(); + if (th == null) { + Object ret = result.getValue(); + if (ret == null) { + flag |= RSocketConstants.FLAG_NULL_VALUE; + out.writeByte((byte) flag); + } else { + out.writeByte((byte) flag); + out.writeObject(ret); + } + } else { + flag |= RSocketConstants.FLAG_ERROR; + out.writeByte((byte) flag); + out.writeObject(th); + } + out.writeObject(result.getAttachments()); + out.flushBuffer(); + bos.flush(); + bos.close(); + + Payload responsePayload = DefaultPayload.create(bos.toByteArray()); + return Mono.just(responsePayload); + } + } catch (Throwable t) { + //application error + return Mono.error(t); + } finally { + payload.release(); + } } public Flux requestStream(Payload payload) { - payload.release(); - return Flux.error(new UnsupportedOperationException("Request-Stream not implemented.")); + try { + Map metadata = decodeMetadata(payload); + Byte serializeId = ((Integer) metadata.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue(); + Invocation inv = decodeInvocation(payload, metadata, serializeId); + + Result result = inv.getInvoker().invoke(inv); + //Class retType = RpcUtils.getReturnType(inv); + + Throwable th = result.getException(); + if (th != null) { + Payload errorPayload = encodeError(th, serializeId); + return Flux.just(errorPayload); + } + + Flux flux = (Flux) result.getValue(); + Flux retFlux = flux.map(new Function() { + @Override + public Payload apply(Object o) { + try { + return encodeData(o, serializeId); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }).onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + try { + Payload errorPayload = encodeError(throwable,serializeId); + return Flux.just(errorPayload); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }); + return retFlux; + } catch (Throwable t) { + return Flux.error(t); + } finally { + payload.release(); + } } + private Payload encodeData(Object data, byte serializeId) throws Throwable{ + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) 0); + out.writeObject(data); + out.flushBuffer(); + bos.flush(); + bos.close(); + return DefaultPayload.create(bos.toByteArray()); + } - @Override - public Flux requestChannel(Publisher payloads) { - return Flux.from(payloads) - .map(Payload::getDataUtf8) - .map(s -> "Echo: " + s) - .map(DefaultPayload::create); + private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable{ + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) RSocketConstants.FLAG_ERROR); + out.writeObject(throwable); + out.flushBuffer(); + bos.flush(); + bos.close(); + return DefaultPayload.create(bos.toByteArray()); } + private Map decodeMetadata(Payload payload) throws IOException { + ByteBuffer metadataBuffer = payload.getMetadata(); + byte[] metadataBytes = new byte[metadataBuffer.remaining()]; + metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining()); + return MetadataCodec.decodeMetadata(metadataBytes); + } + + private Invocation decodeInvocation(Payload payload, Map metadata, Byte serializeId) throws RemotingException, IOException, ClassNotFoundException { + Invoker invoker = getInvoker(port, metadata); + + String serviceName = (String) metadata.get(RSocketConstants.SERVICE_NAME_KEY); + String version = (String) metadata.get(RSocketConstants.SERVICE_VERSION_KEY); + String methodName = (String) metadata.get(RSocketConstants.METHOD_NAME_KEY); + String paramType = (String) metadata.get(RSocketConstants.PARAM_TYPE_KEY); + + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + + + //TODO how to get remote address + //RpcContext rpcContext = RpcContext.getContext(); + //rpcContext.setRemoteAddress(channel.getRemoteAddress()); + + + RpcInvocation inv = new RpcInvocation(); + inv.setInvoker(invoker); + inv.setAttachment(Constants.PATH_KEY, serviceName); + inv.setAttachment(Constants.VERSION_KEY, version); + inv.setMethodName(methodName); + + + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream); + + Object[] args; + Class[] pts; + String desc = paramType; + if (desc.length() == 0) { + pts = new Class[0]; + args = new Object[0]; + } else { + pts = ReflectUtils.desc2classArray(desc); + args = new Object[pts.length]; + for (int i = 0; i < args.length; i++) { + try { + args[i] = in.readObject(pts[i]); + } catch (Exception e) { + if (log.isWarnEnabled()) { + log.warn("Decode argument failed: " + e.getMessage(), e); + } + } + } + } + inv.setParameterTypes(pts); + inv.setArguments(args); + Map map = (Map) in.readObject(Map.class); + if (map != null && map.size() > 0) { + inv.addAttachments(map); + } + return inv; + } }); } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java deleted file mode 100644 index 6b1e832b126..00000000000 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ReferenceCountRsocketClient.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.dubbo.rpc.protocol.rsocket; - -/** - * @author sixie.xyn on 2019/1/2. - */ -public class ReferenceCountRsocketClient { -} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java new file mode 100644 index 00000000000..79142de6c0e --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java @@ -0,0 +1,35 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.apache.dubbo.rpc.service.DemoService; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import reactor.core.publisher.Mono; + +import java.util.function.Consumer; + +public class ConsumerDemo { + + public static void main(String[] args) { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-consumer.xml"}); + context.start(); + DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy + + while (true) { + try { + Thread.sleep(1000); + Mono resultMono = demoService.requestMono("world"); // call remote method + resultMono.doOnNext(new Consumer() { + @Override + public void accept(String s) { + System.out.println(s); // get result + } + }).block(); + } catch (Throwable throwable) { + throwable.printStackTrace(); + } + + + } + + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java new file mode 100644 index 00000000000..e67133e3f4f --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java @@ -0,0 +1,13 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import org.springframework.context.support.ClassPathXmlApplicationContext; + +public class ProviderDemo { + + public static void main(String[] args) throws Exception { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-rsocket-provider.xml"}); + context.start(); + System.in.read(); // press any key to exit + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java new file mode 100644 index 00000000000..378dec75e79 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java @@ -0,0 +1,204 @@ +package org.apache.dubbo.rpc.protocol.rsocket; + +import io.rsocket.exceptions.ApplicationErrorException; +import org.apache.dubbo.common.Constants; +import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.remoting.exchange.ExchangeServer; +import org.apache.dubbo.rpc.Protocol; +import org.apache.dubbo.rpc.ProxyFactory; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.service.*; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; + +public class RSocketProtocolTest { + + private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); + private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); + + @AfterClass + public static void after() { + RSocketProtocol.getRSocketProtocol().destroy(); + } + + @Test + public void testDemoProtocol() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + assertEquals(service.getSize(new String[]{"", "", ""}), 3); + } + + @Test + public void testDubboProtocol() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + + assertEquals(service.getSize(null), -1); + assertEquals(service.getSize(new String[]{"", "", ""}), 3); + + + Map map = new HashMap(); + map.put("aa", "bb"); + Set set = service.keys(map); + assertEquals(set.size(), 1); + assertEquals(set.iterator().next(), "aa"); + service.invoke("rsocket://127.0.0.1:9010/" + DemoService.class.getName() + "", "invoke"); + + StringBuffer buf = new StringBuffer(); + for (int i = 0; i < 1024 * 32 + 32; i++) + buf.append('A'); + System.out.println(service.stringLength(buf.toString())); + + // cast to EchoService + EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + assertEquals(echo.$echo(buf.toString()), buf.toString()); + assertEquals(echo.$echo("test"), "test"); + assertEquals(echo.$echo("abcdefg"), "abcdefg"); + assertEquals(echo.$echo(1234), 1234); + } + + + @Test + public void testDubboProtocolThrowable() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + try { + service.errorTest("mike"); + } catch (Throwable t) { + assertEquals(t.getClass(), ArithmeticException.class); + } + } + + @Test + public void testDubboProtocolMultiService() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + + RemoteService remote = new RemoteServiceImpl(); + protocol.export(proxy.getInvoker(remote, RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName()))); + remote = proxy.getProxy(protocol.refer(RemoteService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + RemoteService.class.getName()).addParameter("timeout", 3000l))); + + service.sayHello("world"); + + // test netty client + assertEquals("world", service.echo("world")); + assertEquals("hello world", remote.sayHello("world")); + + EchoService serviceEcho = (EchoService) service; + assertEquals(serviceEcho.$echo("test"), "test"); + + EchoService remoteEecho = (EchoService) remote; + assertEquals(remoteEecho.$echo("ok"), "ok"); + } + + + @Test + public void testRequestMono() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + Mono result = service.requestMono("mike"); + + result.doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(s, "hello mike"); + System.out.println(s); + } + }).block(); + + Mono result2 = service.requestMonoOnError("mike"); + result2.onErrorResume(DemoException.class, new Function>() { + @Override + public Mono apply(DemoException e) { + return Mono.just(e.getClass().getName()); + } + }).doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(DemoException.class.getName(), s); + } + }).block(); + + Mono result3 = service.requestMonoBizError("mike"); + result3.onErrorResume(ArithmeticException.class, new Function>() { + @Override + public Mono apply(ArithmeticException e) { + return Mono.just(e.getClass().getName()); + } + }).doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(ArithmeticException.class.getName(), s); + } + }).block(); + + } + + @Test + public void testRequestFlux() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + + { + Flux result = service.requestFlux("mike"); + result.doOnNext(new Consumer() { + @Override + public void accept(String s) { + System.out.println(s); + } + }).blockLast(); + } + + + { + Flux result2 = service.requestFluxOnError("mike"); + result2.onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + return Flux.just(throwable.getClass().getName()); + } + }).takeLast(1).doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(DemoException.class.getName(), s); + } + }).blockLast(); + } + + { + Flux result3 = service.requestFluxBizError("mike"); + result3.onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + return Flux.just(throwable.getClass().getName()); + } + }).takeLast(1).doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(ArithmeticException.class.getName(), s); + } + }).blockLast(); + } + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java new file mode 100644 index 00000000000..7b3c2208cce --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java @@ -0,0 +1,24 @@ +package org.apache.dubbo.rpc.service; + +public class DemoException extends Exception { + + private static final long serialVersionUID = -8213943026163641747L; + + public DemoException() { + super(); + } + + public DemoException(String message, Throwable cause) { + super(message, cause); + } + + public DemoException(String message) { + super(message); + } + + public DemoException(Throwable cause) { + super(cause); + } + +} + diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java new file mode 100644 index 00000000000..15dbbe9167b --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java @@ -0,0 +1,49 @@ +package org.apache.dubbo.rpc.service; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public interface DemoService { + String sayHello(String name); + + Set keys(Map map); + + String echo(String text); + + Map echo(Map map); + + long timestamp(); + + String getThreadName(); + + int getSize(String[] strs); + + int getSize(Object[] os); + + Object invoke(String service, String method) throws Exception; + + int stringLength(String str); + + byte getbyte(byte arg); + + long add(int a, long b); + + String errorTest(String name); + + Mono requestMono(String name); + + Mono requestMonoOnError(String name); + + Mono requestMonoBizError(String name); + + Flux requestFlux(String name); + Flux requestFluxOnError(String name); + Flux requestFluxBizError(String name); + + + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java new file mode 100644 index 00000000000..2b5a3e4d6af --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java @@ -0,0 +1,139 @@ +package org.apache.dubbo.rpc.service; + +import org.apache.dubbo.rpc.RpcContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; + +import java.lang.reflect.Type; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +public class DemoServiceImpl implements DemoService { + public DemoServiceImpl() { + super(); + } + + public String sayHello(String name) { + return "hello " + name; + } + + public String echo(String text) { + return text; + } + + public Map echo(Map map) { + return map; + } + + public long timestamp() { + return System.currentTimeMillis(); + } + + public String getThreadName() { + return Thread.currentThread().getName(); + } + + public int getSize(String[] strs) { + if (strs == null) + return -1; + return strs.length; + } + + public int getSize(Object[] os) { + if (os == null) + return -1; + return os.length; + } + + public Object invoke(String service, String method) throws Exception { + System.out.println("RpcContext.getContext().getRemoteHost()=" + RpcContext.getContext().getRemoteHost()); + return service + ":" + method; + } + + public int stringLength(String str) { + return str.length(); + } + + + public byte getbyte(byte arg) { + return arg; + } + + + public Set keys(Map map) { + return map == null ? null : map.keySet(); + } + + + public long add(int a, long b) { + return a + b; + } + + @Override + public String errorTest(String name) { + int a = 1 / 0; + return null; + } + + public Mono requestMono(String name) { + return Mono.just("hello " + name); + } + + public Mono requestMonoOnError(String name) { + return Mono.error(new DemoException(name)); + } + + public Mono requestMonoBizError(String name) { + int a = 1 / 0; + return Mono.just("hello " + name); + } + + @Override + public Flux requestFlux(String name) { + + return Flux.create(new Consumer>() { + @Override + public void accept(FluxSink fluxSink) { + for (int i = 0; i < 5; i++) { + fluxSink.next(name + " " + i); + } + fluxSink.complete(); + } + }); + + } + + @Override + public Flux requestFluxOnError(String name) { + + return Flux.create(new Consumer>() { + @Override + public void accept(FluxSink fluxSink) { + for (int i = 0; i < 5; i++) { + fluxSink.next(name + " " + i); + } + fluxSink.error(new DemoException()); + } + }); + + } + + @Override + public Flux requestFluxBizError(String name) { + int a = 1 / 0; + return Flux.create(new Consumer>() { + @Override + public void accept(FluxSink fluxSink) { + for (int i = 0; i < 5; i++) { + fluxSink.next(name + " " + i); + } + fluxSink.error(new DemoException()); + } + }); + } + +} + diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java new file mode 100644 index 00000000000..426f8de2365 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java @@ -0,0 +1,7 @@ +package org.apache.dubbo.rpc.service; + +import java.rmi.RemoteException; + +public interface RemoteService { + String sayHello(String name) throws RemoteException; +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java new file mode 100644 index 00000000000..fe7257e9f49 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java @@ -0,0 +1,10 @@ +package org.apache.dubbo.rpc.service; + +import java.rmi.RemoteException; + +public class RemoteServiceImpl implements RemoteService { + @Override + public String sayHello(String name) throws RemoteException { + return "hello "+name; + } +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml new file mode 100644 index 00000000000..f0f25cf9239 --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-consumer.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml new file mode 100644 index 00000000000..e84fb70824f --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/resources/spring/dubbo-rsocket-provider.xml @@ -0,0 +1,40 @@ + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file From 4a502dd1d09e867705344cd98a5faab283908c4e Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Thu, 7 Mar 2019 15:15:45 +0800 Subject: [PATCH 3/8] reformat code, remove unused import, add license --- dubbo-rpc/dubbo-rpc-rsocket/pom.xml | 21 ++++--- .../protocol/rsocket/FutureSubscriber.java | 34 ++++++++--- .../rpc/protocol/rsocket/MetadataCodec.java | 19 +++++- .../protocol/rsocket/RSocketConstants.java | 16 +++++ .../rpc/protocol/rsocket/RSocketExporter.java | 18 +++++- .../rpc/protocol/rsocket/RSocketInvoker.java | 61 ++++++++++--------- .../rpc/protocol/rsocket/RSocketProtocol.java | 27 +++++--- .../rpc/protocol/rsocket/ConsumerDemo.java | 16 +++++ .../rpc/protocol/rsocket/ProviderDemo.java | 16 +++++ .../protocol/rsocket/RSocketProtocolTest.java | 24 +++++--- .../dubbo/rpc/service/DemoException.java | 16 +++++ .../apache/dubbo/rpc/service/DemoService.java | 22 ++++++- .../dubbo/rpc/service/DemoServiceImpl.java | 18 +++++- .../dubbo/rpc/service/RemoteService.java | 16 +++++ .../dubbo/rpc/service/RemoteServiceImpl.java | 18 +++++- 15 files changed, 269 insertions(+), 73 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml index 7a754d85e41..b38f74d5fea 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/pom.xml +++ b/dubbo-rpc/dubbo-rpc-rsocket/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> - + 4.0.0 org.apache.dubbo @@ -104,19 +105,19 @@ - - - + + + - - - + + + - - - + + + diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java index 2dcfb8f9501..8f12ba84bd5 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/FutureSubscriber.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; import io.rsocket.Payload; @@ -19,7 +35,7 @@ public class FutureSubscriber extends CompletableFuture implements Su private final Class retType; - public FutureSubscriber(Serialization serialization, Class retType){ + public FutureSubscriber(Serialization serialization, Class retType) { this.serialization = serialization; this.retType = retType; } @@ -41,23 +57,23 @@ public void onNext(Payload payload) { ObjectInput in = serialization.deserialize(null, dataInputStream); int flag = in.readByte(); - if((flag & RSocketConstants.FLAG_ERROR) != 0){ + if ((flag & RSocketConstants.FLAG_ERROR) != 0) { Throwable t = (Throwable) in.readObject(); rpcResult.setException(t); - }else{ + } else { Object value = null; - if((flag & RSocketConstants.FLAG_NULL_VALUE) == 0){ - if(retType == null) { + if ((flag & RSocketConstants.FLAG_NULL_VALUE) == 0) { + if (retType == null) { value = in.readObject(); - }else{ + } else { value = in.readObject(retType); } rpcResult.setValue(value); } } - if((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) !=0 ){ - Map attachment = in.readObject(Map.class); + if ((flag & RSocketConstants.FLAG_HAS_ATTACHMENT) != 0) { + Map attachment = in.readObject(Map.class); rpcResult.setAttachments(attachment); } @@ -65,7 +81,7 @@ public void onNext(Payload payload) { this.complete(rpcResult); - }catch (Throwable t){ + } catch (Throwable t) { this.completeExceptionally(t); } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java index ca7080adfbf..ce2e1b52dd6 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java @@ -1,9 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; -import org.apache.dubbo.rpc.Invocation; - import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import java.io.IOException; import java.nio.charset.StandardCharsets; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java index 51e2e972cb3..e6ad98ae3bc 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; /** diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java index 4d419c5d30f..074085eaec6 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; import org.apache.dubbo.rpc.Exporter; @@ -9,7 +25,7 @@ /** * @author sixie.xyn on 2019/1/2. */ -public class RSocketExporter extends AbstractExporter { +public class RSocketExporter extends AbstractExporter { private final String key; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index d81d1208fd1..483741d0b17 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -1,48 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.util.DefaultPayload; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; -import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.serialize.Cleanable; import org.apache.dubbo.common.serialize.ObjectInput; import org.apache.dubbo.common.serialize.ObjectOutput; import org.apache.dubbo.common.serialize.Serialization; import org.apache.dubbo.common.utils.AtomicPositiveInteger; import org.apache.dubbo.common.utils.ReflectUtils; -import org.apache.dubbo.remoting.RemotingException; -import org.apache.dubbo.remoting.TimeoutException; -import org.apache.dubbo.remoting.buffer.ChannelBufferOutputStream; -import org.apache.dubbo.remoting.exchange.ExchangeClient; -import org.apache.dubbo.remoting.exchange.ResponseFuture; import org.apache.dubbo.remoting.transport.CodecSupport; -import org.apache.dubbo.rpc.AsyncRpcResult; -import org.apache.dubbo.rpc.Invocation; -import org.apache.dubbo.rpc.Invoker; -import org.apache.dubbo.rpc.Result; -import org.apache.dubbo.rpc.RpcContext; -import org.apache.dubbo.rpc.RpcException; -import org.apache.dubbo.rpc.RpcInvocation; -import org.apache.dubbo.rpc.RpcResult; -import org.apache.dubbo.rpc.SimpleAsyncRpcResult; +import org.apache.dubbo.rpc.*; import org.apache.dubbo.rpc.protocol.AbstractInvoker; import org.apache.dubbo.rpc.support.RpcUtils; - -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.util.DefaultPayload; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import reactor.core.Exceptions; -import reactor.core.publisher.*; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; -import java.io.*; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.Future; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -95,7 +96,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { Class retType = RpcUtils.getReturnType(invocation); - if (retType!=null && retType.isAssignableFrom(Mono.class)) { + if (retType != null && retType.isAssignableFrom(Mono.class)) { Mono responseMono = currentClient.requestResponse(requestPayload); Mono bizMono = responseMono.map(new Function() { @Override @@ -106,7 +107,7 @@ public Object apply(Payload payload) { RpcResult rpcResult = new RpcResult(); rpcResult.setValue(bizMono); return rpcResult; - } else if (retType!=null && retType.isAssignableFrom(Flux.class)) { + } else if (retType != null && retType.isAssignableFrom(Flux.class)) { return requestStream(currentClient, requestPayload); } else { //request-reponse @@ -129,7 +130,7 @@ private Result requestStream(RSocket currentClient, Payload requestPayload) { @Override public Object apply(Payload payload) { - return decodeData(payload); + return decodeData(payload); } }); @@ -139,7 +140,7 @@ public Object apply(Payload payload) { } - private Object decodeData(Payload payload){ + private Object decodeData(Payload payload) { try { //TODO save the copy ByteBuffer dataBuffer = payload.getData(); @@ -154,7 +155,7 @@ private Object decodeData(Payload payload){ } else { return in.readObject(); } - }catch (Throwable t){ + } catch (Throwable t) { throw Exceptions.propagate(t); } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java index 7cbe4f311f0..1cb36b03ed7 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; import io.rsocket.*; @@ -44,12 +60,9 @@ */ public class RSocketProtocol extends AbstractProtocol { - private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class); - public static final String NAME = "rsocket"; - public static final int DEFAULT_PORT = 30880; - + private static final Logger log = LoggerFactory.getLogger(RSocketProtocol.class); private static RSocketProtocol INSTANCE; // @@ -404,7 +417,7 @@ public Payload apply(Object o) { @Override public Publisher apply(Throwable throwable) { try { - Payload errorPayload = encodeError(throwable,serializeId); + Payload errorPayload = encodeError(throwable, serializeId); return Flux.just(errorPayload); } catch (Throwable t) { throw new RuntimeException(t); @@ -419,7 +432,7 @@ public Publisher apply(Throwable throwable) { } } - private Payload encodeData(Object data, byte serializeId) throws Throwable{ + private Payload encodeData(Object data, byte serializeId) throws Throwable { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); out.writeByte((byte) 0); @@ -430,7 +443,7 @@ private Payload encodeData(Object data, byte serializeId) throws Throwable{ return DefaultPayload.create(bos.toByteArray()); } - private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable{ + private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable { ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); out.writeByte((byte) RSocketConstants.FLAG_ERROR); diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java index 79142de6c0e..d73bbae6249 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ConsumerDemo.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; import org.apache.dubbo.rpc.service.DemoService; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java index e67133e3f4f..2e7466ddbf1 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/ProviderDemo.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; import org.springframework.context.support.ClassPathXmlApplicationContext; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java index 378dec75e79..c91efdce84b 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java @@ -1,22 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.protocol.rsocket; -import io.rsocket.exceptions.ApplicationErrorException; -import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.extension.ExtensionLoader; -import org.apache.dubbo.remoting.exchange.ExchangeServer; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.ProxyFactory; -import org.apache.dubbo.rpc.RpcException; import org.apache.dubbo.rpc.service.*; import org.junit.AfterClass; -import org.junit.Assert; import org.junit.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -86,7 +96,7 @@ public void testDubboProtocolThrowable() throws Exception { } } - @Test + @Test public void testDubboProtocolMultiService() throws Exception { DemoService service = new DemoServiceImpl(); protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9010/" + DemoService.class.getName()))); diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java index 7b3c2208cce..33f3a2ea8eb 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoException.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.service; public class DemoException extends Exception { diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java index 15dbbe9167b..b2b37b4c1d5 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java @@ -1,13 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.List; import java.util.Map; import java.util.Set; -public interface DemoService { +public interface DemoService { String sayHello(String name); Set keys(Map map); @@ -41,9 +56,10 @@ public interface DemoService { Mono requestMonoBizError(String name); Flux requestFlux(String name); + Flux requestFluxOnError(String name); - Flux requestFluxBizError(String name); + Flux requestFluxBizError(String name); } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java index 2b5a3e4d6af..b67e3e09be4 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.service; import org.apache.dubbo.rpc.RpcContext; @@ -5,8 +21,6 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import java.lang.reflect.Type; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Consumer; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java index 426f8de2365..d3e21dca1ec 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteService.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.service; import java.rmi.RemoteException; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java index fe7257e9f49..afb5788bde5 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/RemoteServiceImpl.java @@ -1,3 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.dubbo.rpc.service; import java.rmi.RemoteException; @@ -5,6 +21,6 @@ public class RemoteServiceImpl implements RemoteService { @Override public String sayHello(String name) throws RemoteException { - return "hello "+name; + return "hello " + name; } } From fbd6700e3d6a71b6bf031f2ae69a00205862ca5e Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Thu, 7 Mar 2019 16:37:56 +0800 Subject: [PATCH 4/8] optimize import --- .../rpc/protocol/rsocket/RSocketInvoker.java | 8 +++++++- .../rpc/protocol/rsocket/RSocketProtocol.java | 15 +++++++++++++-- .../rpc/protocol/rsocket/RSocketProtocolTest.java | 7 ++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index 483741d0b17..bc5cf43e285 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -28,7 +28,13 @@ import org.apache.dubbo.common.utils.AtomicPositiveInteger; import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.remoting.transport.CodecSupport; -import org.apache.dubbo.rpc.*; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcContext; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; +import org.apache.dubbo.rpc.RpcResult; import org.apache.dubbo.rpc.protocol.AbstractInvoker; import org.apache.dubbo.rpc.support.RpcUtils; import reactor.core.Exceptions; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java index 1cb36b03ed7..6d480d05856 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -16,7 +16,12 @@ */ package org.apache.dubbo.rpc.protocol.rsocket; -import io.rsocket.*; +import io.rsocket.AbstractRSocket; +import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -32,7 +37,13 @@ import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.remoting.RemotingException; import org.apache.dubbo.remoting.transport.CodecSupport; -import org.apache.dubbo.rpc.*; +import org.apache.dubbo.rpc.Exporter; +import org.apache.dubbo.rpc.Invocation; +import org.apache.dubbo.rpc.Invoker; +import org.apache.dubbo.rpc.Protocol; +import org.apache.dubbo.rpc.Result; +import org.apache.dubbo.rpc.RpcException; +import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.protocol.AbstractProtocol; import org.apache.dubbo.rpc.support.RpcUtils; import org.reactivestreams.Publisher; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java index c91efdce84b..e34a6f76e13 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java @@ -20,7 +20,12 @@ import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.rpc.Protocol; import org.apache.dubbo.rpc.ProxyFactory; -import org.apache.dubbo.rpc.service.*; +import org.apache.dubbo.rpc.service.DemoException; +import org.apache.dubbo.rpc.service.DemoService; +import org.apache.dubbo.rpc.service.DemoServiceImpl; +import org.apache.dubbo.rpc.service.EchoService; +import org.apache.dubbo.rpc.service.RemoteService; +import org.apache.dubbo.rpc.service.RemoteServiceImpl; import org.junit.AfterClass; import org.junit.Test; import org.reactivestreams.Publisher; From 03d043ac83970778bc17965e8410b4a5d4220d24 Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Fri, 8 Mar 2019 20:09:21 +0800 Subject: [PATCH 5/8] remove author --- .../org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java | 3 --- .../apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java | 3 --- .../apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java | 3 --- .../org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java | 4 +--- .../apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java | 3 --- 5 files changed, 1 insertion(+), 15 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java index ce2e1b52dd6..41762caa331 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/MetadataCodec.java @@ -22,9 +22,6 @@ import java.nio.charset.StandardCharsets; import java.util.Map; -/** - * @author sixie.xyn on 2019/1/3. - */ public class MetadataCodec { public static Map decodeMetadata(byte[] bytes) throws IOException { diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java index e6ad98ae3bc..42e26b9a5ee 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketConstants.java @@ -16,9 +16,6 @@ */ package org.apache.dubbo.rpc.protocol.rsocket; -/** - * @author sixie.xyn on 2019/1/3. - */ public class RSocketConstants { public static final String SERVICE_NAME_KEY = "_service_name"; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java index 074085eaec6..5a4665cf6c5 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketExporter.java @@ -22,9 +22,6 @@ import java.util.Map; -/** - * @author sixie.xyn on 2019/1/2. - */ public class RSocketExporter extends AbstractExporter { private final String key; diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index bc5cf43e285..f5614dc73e8 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -52,9 +52,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; -/** - * @author sixie.xyn on 2019/1/2. - */ public class RSocketInvoker extends AbstractInvoker { private final RSocket[] clients; @@ -248,4 +245,5 @@ private byte[] encodeData(Invocation invocation) throws IOException { } return dataOutputStream.toByteArray(); } + } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java index 6d480d05856..1ff2487eb7e 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -66,9 +66,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.function.Function; -/** - * @author sixie.xyn on 2019/1/2. - */ public class RSocketProtocol extends AbstractProtocol { public static final String NAME = "rsocket"; From 111ef46f3db89d2518daa6f7dacae30361937176 Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Wed, 13 Mar 2019 15:42:57 +0800 Subject: [PATCH 6/8] support using Mono/Flux as args --- .../rpc/protocol/rsocket/RSocketInvoker.java | 52 +++-- .../rpc/protocol/rsocket/RSocketProtocol.java | 206 +++++++++++++++++- .../protocol/rsocket/ResourceDirectory.java | 62 ++++++ .../rpc/protocol/rsocket/ResourceInfo.java | 41 ++++ .../protocol/rsocket/RSocketProtocolTest.java | 40 ++++ .../apache/dubbo/rpc/service/DemoService.java | 4 + .../dubbo/rpc/service/DemoServiceImpl.java | 21 ++ 7 files changed, 405 insertions(+), 21 deletions(-) create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceDirectory.java create mode 100644 dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceInfo.java diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index f5614dc73e8..6863d20237f 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -49,6 +49,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; @@ -93,12 +94,12 @@ protected Result doInvoke(final Invocation invocation) throws Throwable { //TODO support timeout int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); + Class retType = RpcUtils.getReturnType(invocation); + RpcContext.getContext().setFuture(null); //encode inv: metadata and data(arg,attachment) Payload requestPayload = encodeInvocation(invocation); - Class retType = RpcUtils.getReturnType(invocation); - if (retType != null && retType.isAssignableFrom(Mono.class)) { Mono responseMono = currentClient.requestResponse(requestPayload); Mono bizMono = responseMono.map(new Function() { @@ -133,7 +134,9 @@ private Result requestStream(RSocket currentClient, Payload requestPayload) { @Override public Object apply(Payload payload) { - return decodeData(payload); + Object o = decodeData(payload); + payload.release(); + return o; } }); @@ -145,12 +148,12 @@ public Object apply(Payload payload) { private Object decodeData(Payload payload) { try { - //TODO save the copy ByteBuffer dataBuffer = payload.getData(); byte[] dataBytes = new byte[dataBuffer.remaining()]; dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); InputStream dataInputStream = new ByteArrayInputStream(dataBytes); ObjectInput in = serialization.deserialize(null, dataInputStream); + //TODO save the copy int flag = in.readByte(); if ((flag & RSocketConstants.FLAG_ERROR) != 0) { Throwable t = (Throwable) in.readObject(); @@ -209,28 +212,41 @@ public void destroy() { } private Payload encodeInvocation(Invocation invocation) throws IOException { - byte[] metadata = encodeMetadata(invocation); - byte[] data = encodeData(invocation); - return DefaultPayload.create(data, metadata); - } + //process stream args + RpcInvocation inv = (RpcInvocation) invocation; + Class[] parameterTypes = invocation.getParameterTypes(); + Object[] args = inv.getArguments(); + if (args != null) { + for (int i = 0; i < args.length; i++) { + if(args[i]!=null) { + Class argClass = args[i].getClass(); + if (Mono.class.isAssignableFrom(argClass)) { + long id = ResourceDirectory.mountResource(args[i]); + args[i] = new ResourceInfo(id, ResourceInfo.RESOURCE_TYPE_MONO); + parameterTypes[i] = ResourceInfo.class; + } else if (Flux.class.isAssignableFrom(argClass)) { + long id = ResourceDirectory.mountResource(args[i]); + args[i] = new ResourceInfo(id, ResourceInfo.RESOURCE_TYPE_FLUX); + parameterTypes[i] = ResourceInfo.class; + } + } + } + } - private byte[] encodeMetadata(Invocation invocation) throws IOException { + //metadata Map metadataMap = new HashMap(); metadataMap.put(RSocketConstants.SERVICE_NAME_KEY, invocation.getAttachment(Constants.PATH_KEY)); metadataMap.put(RSocketConstants.SERVICE_VERSION_KEY, invocation.getAttachment(Constants.VERSION_KEY)); metadataMap.put(RSocketConstants.METHOD_NAME_KEY, invocation.getMethodName()); - metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(invocation.getParameterTypes())); metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, (Byte) serialization.getContentTypeId()); - return MetadataCodec.encodeMetadata(metadataMap); - } + metadataMap.put(RSocketConstants.PARAM_TYPE_KEY, ReflectUtils.getDesc(parameterTypes)); + byte[] metadata = MetadataCodec.encodeMetadata(metadataMap); - private byte[] encodeData(Invocation invocation) throws IOException { + //data ByteArrayOutputStream dataOutputStream = new ByteArrayOutputStream(); Serialization serialization = CodecSupport.getSerialization(getUrl()); ObjectOutput out = serialization.serialize(getUrl(), dataOutputStream); - RpcInvocation inv = (RpcInvocation) invocation; - Object[] args = inv.getArguments(); if (args != null) { for (int i = 0; i < args.length; i++) { out.writeObject(args[i]); @@ -243,7 +259,9 @@ private byte[] encodeData(Invocation invocation) throws IOException { if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } - return dataOutputStream.toByteArray(); - } + byte[] data = dataOutputStream.toByteArray(); + + return DefaultPayload.create(data, metadata); + } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java index 1ff2487eb7e..be537eb48ba 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocol.java @@ -33,6 +33,7 @@ import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.serialize.ObjectInput; import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.Serialization; import org.apache.dubbo.common.utils.NetUtils; import org.apache.dubbo.common.utils.ReflectUtils; import org.apache.dubbo.remoting.RemotingException; @@ -61,6 +62,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -231,15 +233,128 @@ private RSocket initClient(URL url) { rSocket -> new AbstractRSocket() { public Mono requestResponse(Payload payload) { - //TODO support Mono arg - throw new UnsupportedOperationException(); + try { + ByteBuffer metadataBuffer = payload.getMetadata(); + byte[] metadataBytes = new byte[metadataBuffer.remaining()]; + metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining()); + Map metadataMap = MetadataCodec.decodeMetadata(metadataBytes); + Byte serializeId = ((Integer) metadataMap.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue(); + + + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream); + long id = in.readLong(); + + Mono mono = ResourceDirectory.unmountMono(id); + return mono.map(new Function() { + @Override + public Payload apply(Object o) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) 0); + out.writeObject(o); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload responsePayload = DefaultPayload.create(bos.toByteArray()); + return responsePayload; + } catch (Throwable t) { + throw Exceptions.propagate(t); + } + } + }).onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) RSocketConstants.FLAG_ERROR); + out.writeObject(throwable); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload errorPayload = DefaultPayload.create(bos.toByteArray()); + return Flux.just(errorPayload); + } catch (Throwable t) { + throw Exceptions.propagate(t); + } + } + }); + + }catch (Throwable t){ + throw new RuntimeException(t); + } } @Override public Flux requestStream(Payload payload) { - //TODO support Flux arg - throw new UnsupportedOperationException(); + try { + ByteBuffer metadataBuffer = payload.getMetadata(); + byte[] metadataBytes = new byte[metadataBuffer.remaining()]; + metadataBuffer.get(metadataBytes, metadataBuffer.position(), metadataBuffer.remaining()); + Map metadataMap = MetadataCodec.decodeMetadata(metadataBytes); + Byte serializeId = ((Integer) metadataMap.get(RSocketConstants.SERIALIZE_TYPE_KEY)).byteValue(); + + + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = CodecSupport.getSerializationById(serializeId).deserialize(null, dataInputStream); + long id = in.readLong(); + + Flux flux = ResourceDirectory.unmountFlux(id); + return flux.map(new Function() { + @Override + public Payload apply(Object o) { + try { + return encodeData(o, serializeId); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }).onErrorResume(new Function>() { + @Override + public Publisher apply(Throwable throwable) { + try { + Payload errorPayload = encodeError(throwable, serializeId); + return Flux.just(errorPayload); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + }); + }catch (Throwable t){ + throw new RuntimeException(t); + } } + + private Payload encodeData(Object data, byte serializeId) throws Throwable { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) 0); + out.writeObject(data); + out.flushBuffer(); + bos.flush(); + bos.close(); + return DefaultPayload.create(bos.toByteArray()); + } + + private Payload encodeError(Throwable throwable, byte serializeId) throws Throwable { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeByte((byte) RSocketConstants.FLAG_ERROR); + out.writeObject(throwable); + out.flushBuffer(); + bos.flush(); + bos.close(); + return DefaultPayload.create(bos.toByteArray()); + } + }) .transport(TcpClientTransport.create(serverAddress)) .start() @@ -516,6 +631,21 @@ private Invocation decodeInvocation(Payload payload, Map metadat } } } + + //process stream args + for (int i = 0; i < pts.length; i++) { + if (ResourceInfo.class.isAssignableFrom(pts[i])) { + ResourceInfo resourceInfo = (ResourceInfo) args[i]; + if (resourceInfo.getType() == ResourceInfo.RESOURCE_TYPE_MONO) { + pts[i] = Mono.class; + args[i] = getMonoProxy(resourceInfo.getId(), serializeId, reactiveSocket); + } else { + pts[i] = Flux.class; + args[i] = getFluxProxy(resourceInfo.getId(), serializeId, reactiveSocket); + } + } + } + inv.setParameterTypes(pts); inv.setArguments(args); Map map = (Map) in.readObject(Map.class); @@ -526,5 +656,73 @@ private Invocation decodeInvocation(Payload payload, Map metadat } }); } + + private Mono getMonoProxy(long id, Byte serializeId, RSocket rSocket) throws IOException { + Map metadataMap = new HashMap(); + metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, serializeId); + byte[] metadata = MetadataCodec.encodeMetadata(metadataMap); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeLong(id); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload payload = DefaultPayload.create(bos.toByteArray(), metadata); + + Mono payloads = rSocket.requestResponse(payload); + Mono streamArg = payloads.map(new Function() { + @Override + public Object apply(Payload payload) { + return decodeData(serializeId, payload); + } + }); + return streamArg; + } + + private Flux getFluxProxy(long id, Byte serializeId, RSocket rSocket) throws IOException { + Map metadataMap = new HashMap(); + metadataMap.put(RSocketConstants.SERIALIZE_TYPE_KEY, serializeId); + byte[] metadata = MetadataCodec.encodeMetadata(metadataMap); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutput out = CodecSupport.getSerializationById(serializeId).serialize(null, bos); + out.writeLong(id); + out.flushBuffer(); + bos.flush(); + bos.close(); + Payload payload = DefaultPayload.create(bos.toByteArray(), metadata); + + Flux payloads = rSocket.requestStream(payload); + Flux streamArg = payloads.map(new Function() { + @Override + public Object apply(Payload payload) { + return decodeData(serializeId, payload); + } + }); + return streamArg; + } + + private Object decodeData(Byte serializeId, Payload payload) { + try { + Serialization serialization = CodecSupport.getSerializationById(serializeId); + //TODO save the copy + ByteBuffer dataBuffer = payload.getData(); + byte[] dataBytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(dataBytes, dataBuffer.position(), dataBuffer.remaining()); + InputStream dataInputStream = new ByteArrayInputStream(dataBytes); + ObjectInput in = serialization.deserialize(null, dataInputStream); + int flag = in.readByte(); + if ((flag & RSocketConstants.FLAG_ERROR) != 0) { + Throwable t = (Throwable) in.readObject(); + throw t; + } else { + return in.readObject(); + } + } catch (Throwable t) { + throw Exceptions.propagate(t); + } + } + } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceDirectory.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceDirectory.java new file mode 100644 index 00000000000..c1b66ac88db --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceDirectory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.rsocket; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class ResourceDirectory { + + private static AtomicLong idGen = new AtomicLong(1); + + private static ConcurrentHashMap id2ResourceMap = new ConcurrentHashMap(); + + + public static long mountResource(Object resource) { + long id = idGen.getAndIncrement(); + id2ResourceMap.put(id, resource); + return id; + } + + public static Object unmountResource(long id) { + return id2ResourceMap.get(id); + } + + public static long mountMono(Mono mono) { + long id = idGen.getAndIncrement(); + id2ResourceMap.put(id, mono); + return id; + } + + public static long mountFlux(Flux flux) { + long id = idGen.getAndIncrement(); + id2ResourceMap.put(id, flux); + return id; + } + + public static Mono unmountMono(long id) { + return (Mono) id2ResourceMap.get(id); + } + + public static Flux unmountFlux(long id) { + return (Flux) id2ResourceMap.get(id); + } + +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceInfo.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceInfo.java new file mode 100644 index 00000000000..1c1275bd38a --- /dev/null +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/ResourceInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.rpc.protocol.rsocket; + +import java.io.Serializable; + +public class ResourceInfo implements Serializable { + + public static final byte RESOURCE_TYPE_MONO = 1; + public static final byte RESOURCE_TYPE_FLUX = 2; + + private final long id; + private final byte type; + + public ResourceInfo(long id, byte type) { + this.id = id; + this.type = type; + } + + public long getId() { + return id; + } + + public byte getType() { + return type; + } +} diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java index e34a6f76e13..d9733eb4676 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketProtocolTest.java @@ -117,6 +117,7 @@ public void testDubboProtocolMultiService() throws Exception { assertEquals("world", service.echo("world")); assertEquals("hello world", remote.sayHello("world")); + EchoService serviceEcho = (EchoService) service; assertEquals(serviceEcho.$echo("test"), "test"); @@ -216,4 +217,43 @@ public void accept(String s) { } } + + @Test + public void testRequestMonoWithMonoArg() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + + Mono result = service.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B")); + result.doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(s, "A B"); + System.out.println(s); + } + }).block(); + } + + + @Test + public void testRequestFluxWithFluxArg() throws Exception { + DemoService service = new DemoServiceImpl(); + protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()))); + service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("rsocket://127.0.0.1:9020/" + DemoService.class.getName()).addParameter("timeout", 3000l))); + + { + Flux result = service.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3")); + result.doOnNext(new Consumer() { + @Override + public void accept(String s) { + System.out.println(s); + } + }).takeLast(1).doOnNext(new Consumer() { + @Override + public void accept(String s) { + assertEquals(s, "C 3"); + } + }).blockLast(); + } + } } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java index b2b37b4c1d5..8f7a3ddc5ad 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoService.java @@ -61,5 +61,9 @@ public interface DemoService { Flux requestFluxBizError(String name); + Mono requestMonoWithMonoArg(Mono m1, Mono m2); + + Flux requestFluxWithFluxArg(Flux f1, Flux f2); + } diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java index b67e3e09be4..1ba6d39a7b0 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/test/java/org/apache/dubbo/rpc/service/DemoServiceImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Consumer; public class DemoServiceImpl implements DemoService { @@ -149,5 +150,25 @@ public void accept(FluxSink fluxSink) { }); } + @Override + public Mono requestMonoWithMonoArg(Mono m1, Mono m2) { + return m1.zipWith(m2, new BiFunction() { + @Override + public String apply(String s, String s2) { + return s+" "+s2; + } + }); + } + + @Override + public Flux requestFluxWithFluxArg(Flux f1, Flux f2) { + return f1.zipWith(f2, new BiFunction() { + @Override + public String apply(String s, String s2) { + return s+" "+s2; + } + }); + } + } From f0a56244c1da4ac5b693d2513f61b22e76c2a09a Mon Sep 17 00:00:00 2001 From: "ken.lj" Date: Wed, 20 Mar 2019 10:23:53 +0800 Subject: [PATCH 7/8] remove unused import --- .../apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java index 6863d20237f..572f429c0a6 100644 --- a/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java +++ b/dubbo-rpc/dubbo-rpc-rsocket/src/main/java/org/apache/dubbo/rpc/protocol/rsocket/RSocketInvoker.java @@ -16,9 +16,6 @@ */ package org.apache.dubbo.rpc.protocol.rsocket; -import io.rsocket.Payload; -import io.rsocket.RSocket; -import io.rsocket.util.DefaultPayload; import org.apache.dubbo.common.Constants; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.serialize.Cleanable; @@ -37,6 +34,10 @@ import org.apache.dubbo.rpc.RpcResult; import org.apache.dubbo.rpc.protocol.AbstractInvoker; import org.apache.dubbo.rpc.support.RpcUtils; + +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.util.DefaultPayload; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -49,7 +50,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; From 105b94f2c8f9c0096e108b3193b83455af3a6ebd Mon Sep 17 00:00:00 2001 From: "sixie.xyn" Date: Tue, 26 Mar 2019 14:11:00 +0800 Subject: [PATCH 8/8] add rsocket module --- dubbo-bom/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/dubbo-bom/pom.xml b/dubbo-bom/pom.xml index dd87f685970..bafea893ca6 100644 --- a/dubbo-bom/pom.xml +++ b/dubbo-bom/pom.xml @@ -193,6 +193,11 @@ dubbo-rpc-rest ${project.version} + + org.apache.dubbo + dubbo-rpc-rsocket + ${project.version} + org.apache.dubbo dubbo-registry-api