Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
panxiaojun233 committed Dec 14, 2020
1 parent fd7597a commit 7748844
Show file tree
Hide file tree
Showing 31 changed files with 2,000 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting;

import java.io.IOException;

import org.apache.dubbo.common.extension.Adaptive;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;

@SPI
public interface GrpcCodec {

@Adaptive({Constants.CODEC_KEY})
void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

@Adaptive({Constants.CODEC_KEY})
Object decode(GrpcPacket packet) throws IOException;

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.dubbo.remoting;

import java.io.InputStream;
import java.util.Map;

/**
* convert Object from/to InputStream
*/

public interface GrpcPacket {

Map<String, String> getMetadata();

InputStream getInputStream();
}
44 changes: 44 additions & 0 deletions dubbo-remoting/dubbo-remoting-netty4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
</properties>

<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.22.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-api</artifactId>
Expand All @@ -51,4 +61,38 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<runOrder>alphabetical</runOrder>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.14.0:exe:${os.detected.classifier}
</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.dubbo.remoting.transport.netty4;

import io.netty.handler.codec.MessageToMessageCodec;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Codec2;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
Expand All @@ -26,9 +27,17 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.transport.netty4.http2.StreamPayload;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceRepository;

import java.io.IOException;
import java.lang.reflect.Parameter;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* NettyCodecAdapter.
Expand All @@ -39,6 +48,8 @@ final public class NettyCodecAdapter {

private final ChannelHandler decoder = new InternalDecoder();

private final ChannelHandler http2Codec = new InternalDecoder();

private final Codec2 codec;

private final URL url;
Expand Down Expand Up @@ -82,6 +93,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out
// decode object.
do {
int saveReaderIndex = message.readerIndex();
// message to request
Object msg = codec.decode(channel, message);
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
message.readerIndex(saveReaderIndex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package org.apache.dubbo.remoting.transport.netty4.grpc;

import io.netty.util.AsciiString;

public class GrpcElf {
public static final AsciiString GRPC_STATUS = AsciiString.cached("grpc-status");
public static final AsciiString GRPC_MESSAGE = AsciiString.cached("grpc-message");
public static final AsciiString GRPC_ENCODING = AsciiString.cached("grpc-encoding");
public static final AsciiString GRPC_TIMEOUT = AsciiString.cached("grpc-timeout");
public static final AsciiString GRPC_ACCEPT_ENCODING = AsciiString.cached("grpc-accept-encoding");

public static final AsciiString APPLICATION_GRPC = AsciiString.cached("application/grpc");
public static final AsciiString TEXT_MIME = AsciiString.cached("text/plain; encoding=utf-8");
public static final AsciiString GRPC_JSON = AsciiString.cached("application/grpc+json");
public static final AsciiString GRPC_PROTO = AsciiString.cached("application/grpc+proto");


/**
* Indicates whether or not the given value is a valid gRPC content-type.
*/
public static boolean isGrpcContentType(CharSequence contentType) {
if (contentType == null) {
return false;
}

if (APPLICATION_GRPC.length() > contentType.length()) {
return false;
}

if (APPLICATION_GRPC.contentEquals(contentType)) {
return true;
}
if (!AsciiString.of(contentType).startsWith(APPLICATION_GRPC)) {
// Not a gRPC content-type.
return false;
}

if (contentType.length() == APPLICATION_GRPC.length()) {
// The strings match exactly.
return true;
}

// The contentType matches, but is longer than the expected string.
// We need to support variations on the content-type (e.g. +proto, +json) as defined by the
// gRPC wire spec.
char nextChar = contentType.charAt(APPLICATION_GRPC.length());
return nextChar == '+' || nextChar == ';';
}

/**
* All error codes identified by the HTTP/2 spec. Used in GOAWAY and RST_STREAM frames.
*/
//public enum Http2Error {
// /**
// * Servers implementing a graceful shutdown of the connection will send {@code GOAWAY} with
// * {@code NO_ERROR}. In this case it is important to indicate to the application that the
// * request should be retried (i.e. {@link Status#UNAVAILABLE}).
// */
// NO_ERROR(0x0, Status.UNAVAILABLE),
// PROTOCOL_ERROR(0x1, Status.INTERNAL),
// INTERNAL_ERROR(0x2, Status.INTERNAL),
// FLOW_CONTROL_ERROR(0x3, Status.INTERNAL),
// SETTINGS_TIMEOUT(0x4, Status.INTERNAL),
// STREAM_CLOSED(0x5, Status.INTERNAL),
// FRAME_SIZE_ERROR(0x6, Status.INTERNAL),
// REFUSED_STREAM(0x7, Status.UNAVAILABLE),
// CANCEL(0x8, Status.CANCELLED),
// COMPRESSION_ERROR(0x9, Status.INTERNAL),
// CONNECT_ERROR(0xA, Status.INTERNAL),
// ENHANCE_YOUR_CALM(0xB, Status.RESOURCE_EXHAUSTED.withDescription("Bandwidth exhausted")),
// INADEQUATE_SECURITY(0xC, Status.PERMISSION_DENIED.withDescription("Permission denied as "
// + "protocol is not secure enough to call")),
// HTTP_1_1_REQUIRED(0xD, Status.UNKNOWN);
//
// // Populate a mapping of code to enum value for quick look-up.
// private static final Http2Error[] codeMap = buildHttp2CodeMap();
// private final int code;
// // Status is not guaranteed to be deeply immutable. Don't care though, since that's only true
// // when there are exceptions in the Status, which is not true here.
// @SuppressWarnings("ImmutableEnumChecker")
// private final Status status;
//
// Http2Error(int code, Status status) {
// this.code = code;
// this.status = status.augmentDescription("HTTP/2 error code: " + this.name());
// }
//
// private static Http2Error[] buildHttp2CodeMap() {
// Http2Error[] errors = Http2Error.values();
// int size = (int) errors[errors.length - 1].code() + 1;
// Http2Error[] http2CodeMap = new Http2Error[size];
// for (Http2Error error : errors) {
// int index = (int) error.code();
// http2CodeMap[index] = error;
// }
// return http2CodeMap;
// }
//
// /**
// * Looks up the HTTP/2 error code enum value for the specified code.
// *
// * @param code an HTTP/2 error code value.
// * @return the HTTP/2 error code enum or {@code null} if not found.
// */
// public static Http2Error forCode(long code) {
// if (code >= codeMap.length || code < 0) {
// return null;
// }
// return codeMap[(int) code];
// }
//
// /**
// * Looks up the {@link Status} from the given HTTP/2 error code. This is preferred over {@code
// * forCode(code).status()}, to more easily conform to HTTP/2:
// *
// * <blockquote>Unknown or unsupported error codes MUST NOT trigger any special behavior.
// * These MAY be treated by an implementation as being equivalent to INTERNAL_ERROR.</blockquote>
// *
// * @param code the HTTP/2 error code.
// * @return a {@link Status} representing the given error.
// */
// public static Status statusForCode(long code) {
// Http2Error error = forCode(code);
// if (error == null) {
// // This "forgets" the message of INTERNAL_ERROR while keeping the same status code.
// Status.Code statusCode = INTERNAL_ERROR.status().getCode();
// return Status.fromCodeValue(statusCode.value())
// .withDescription("Unrecognized HTTP/2 error code: " + code);
// }
//
// return error.status();
// }
//
// /**
// * Gets the code for this error used on the wire.
// */
// public long code() {
// return code;
// }
//
// /**
// * Gets the {@link Status} associated with this HTTP/2 code.
// */
// public Status status() {
// return status;
// }
//}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.apache.dubbo.remoting.transport.netty4.grpc;

import java.lang.reflect.InvocationTargetException;

import com.google.protobuf.Message;
import com.google.protobuf.Parser;
//import io.netty.buffer.ByteBuf;
//import io.netty.buffer.ByteBufInputStream;

/**
* guohaoice@gmail.com
*/
public class ProtoUtil {
/**
* parse proto from netty {@link ByteBuf}
*
* @param input netty ByteBuf
* @param defaultInstance default instance for proto
* @return proto message
*/
@SuppressWarnings("all")
//public static <T extends MessageLite> T parseFrom(ByteBuf input, int len, T defaultInstance) throws InvalidProtocolBufferException {
// final ByteBuf byteBuf = input.readSlice(len);
// T message = (T) defaultInstance.getParserForType().parseFrom(new ByteBufInputStream(byteBuf));
// return message;
//}

public static Message defaultInst(Class<?> clz) {
Message defaultInst;
try {
defaultInst = (Message) clz.getMethod("getDefaultInstance").invoke(null);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException("Create default protobuf instance failed ", e);
}
return defaultInst;
}

@SuppressWarnings("all")
public static <T> Parser<T> getParser(Class<T> clz) {
Message defaultInst = defaultInst(clz);
return (Parser<T>) defaultInst.getParserForType();
}
}
Loading

0 comments on commit 7748844

Please sign in to comment.