Skip to content

Commit

Permalink
Merge pull request #582 from relaxedCat/feat-grpc-json
Browse files Browse the repository at this point in the history
feat: support grpc json protocol
  • Loading branch information
AlexStocks authored Jun 14, 2020
2 parents 9dee25c + 45a983e commit b9f71f7
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 7 deletions.
59 changes: 55 additions & 4 deletions protocol/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,64 @@ import (
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"
)

import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
)

// Client ...
var (
clientConf *ClientConfig
)

func init() {
// load clientconfig from consumer_config
consumerConfig := config.GetConsumerConfig()

clientConfig := GetClientConfig()
clientConf = &clientConfig

// check client config and decide whether to use the default config
defer func() {
if clientConf == nil || len(clientConf.ContentSubType) == 0 {
defaultClientConfig := GetDefaultClientConfig()
clientConf = &defaultClientConfig
}
if err := clientConf.Validate(); err != nil {
panic(err)
}
}()

if consumerConfig.ApplicationConfig == nil {
return
}
protocolConf := config.GetConsumerConfig().ProtocolConf

if protocolConf == nil {
logger.Info("protocol_conf default use dubbo config")
} else {
grpcConf := protocolConf.(map[interface{}]interface{})[GRPC]
if grpcConf == nil {
logger.Warnf("grpcConf is nil")
return
}
grpcConfByte, err := yaml.Marshal(grpcConf)
if err != nil {
panic(err)
}
err = yaml.Unmarshal(grpcConfByte, clientConf)
if err != nil {
panic(err)
}
}

}

// Client return grpc connection and warp service stub
type Client struct {
*grpc.ClientConn
invoker reflect.Value
Expand All @@ -43,9 +92,11 @@ type Client struct {
func NewClient(url common.URL) *Client {
// if global trace instance was set , it means trace function enabled. If not , will return Nooptracer
tracer := opentracing.GlobalTracer()
conn, err := grpc.Dial(url.Location, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())))
dailOpts := make([]grpc.DialOption, 0, 4)
dailOpts = append(dailOpts, grpc.WithInsecure(), grpc.WithBlock(), grpc.WithUnaryInterceptor(
otgrpc.OpenTracingClientInterceptor(tracer, otgrpc.LogPayloads())),
grpc.WithDefaultCallOptions(grpc.CallContentSubtype(clientConf.ContentSubType)))
conn, err := grpc.Dial(url.Location, dailOpts...)
if err != nil {
panic(err)
}
Expand Down
76 changes: 76 additions & 0 deletions protocol/grpc/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package grpc

import (
"bytes"
"encoding/json"
)

import (
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding"
)

const (
codecJson = "json"
codecProto = "proto"
)

func init() {
encoding.RegisterCodec(grpcJson{
Marshaler: jsonpb.Marshaler{
EmitDefaults: true,
OrigName: true,
},
})
}

type grpcJson struct {
jsonpb.Marshaler
jsonpb.Unmarshaler
}

// Name implements grpc encoding package Codec interface method,
// returns the name of the Codec implementation.
func (_ grpcJson) Name() string {
return codecJson
}

// Marshal implements grpc encoding package Codec interface method,returns the wire format of v.
func (j grpcJson) Marshal(v interface{}) (out []byte, err error) {
if pm, ok := v.(proto.Message); ok {
b := new(bytes.Buffer)
err := j.Marshaler.Marshal(b, pm)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
return json.Marshal(v)
}

// Unmarshal implements grpc encoding package Codec interface method,Unmarshal parses the wire format into v.
func (j grpcJson) Unmarshal(data []byte, v interface{}) (err error) {
if pm, ok := v.(proto.Message); ok {
b := bytes.NewBuffer(data)
return j.Unmarshaler.Unmarshal(b, pm)
}
return json.Unmarshal(data, v)
}
65 changes: 65 additions & 0 deletions protocol/grpc/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package grpc

import (
perrors "github.com/pkg/errors"
)

type (
// ServerConfig currently is empty struct,for future expansion
ServerConfig struct {
}

// ClientConfig wrap client call parameters
ClientConfig struct {
// content type, more information refer by https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
ContentSubType string `default:"proto" yaml:"content_sub_type" json:"content_sub_type,omitempty"`
}
)

// GetDefaultClientConfig return grpc client default call options
func GetDefaultClientConfig() ClientConfig {
return ClientConfig{
ContentSubType: codecProto,
}
}

// GetDefaultServerConfig currently return empty struct,for future expansion
func GetDefaultServerConfig() ServerConfig {
return ServerConfig{}
}

// GetClientConfig return grpc client custom call options
func GetClientConfig() ClientConfig {
return ClientConfig{}
}

// Validate check if custom config encoding is supported in dubbo grpc
func (c *ClientConfig) Validate() error {
if c.ContentSubType != codecJson && c.ContentSubType != codecProto {
return perrors.Errorf(" dubbo-go grpc codec currently only support proto、json, %s isn't supported,"+
" please check protocol content_sub_type config", c.ContentSubType)
}
return nil
}

// Validate currently return empty struct,for future expansion
func (c *ServerConfig) Validate() error {
return nil
}
5 changes: 2 additions & 3 deletions protocol/grpc/protoc-gen-dubbo/plugin/dubbo/dubbo.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func (g *dubboGrpc) GenerateImports(file *generator.FileDescriptor) {
g.P(`dgrpc "github.com/apache/dubbo-go/protocol/grpc"`)
g.P(`"github.com/apache/dubbo-go/protocol/invocation"`)
g.P(`"github.com/apache/dubbo-go/protocol"`)
g.P(`"github.com/apache/dubbo-go/config"`)
g.P(` ) `)
}

Expand Down Expand Up @@ -266,7 +265,7 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *
g.P(`invo := invocation.NewRPCInvocation("`, methName, `", args, nil)`)

g.P("if interceptor == nil {")
g.P("result := base.GetProxyImpl().Invoke(invo)")
g.P("result := base.GetProxyImpl().Invoke(context.Background(), invo)")
g.P("return result.Result(), result.Error()")
g.P("}")

Expand All @@ -276,7 +275,7 @@ func (g *dubboGrpc) generateServerMethod(servName, fullServName string, method *
g.P("}")

g.P("handler := func(ctx ", contextPkg, ".Context, req interface{}) (interface{}, error) {")
g.P("result := base.GetProxyImpl().Invoke(invo)")
g.P("result := base.GetProxyImpl().Invoke(context.Background(), invo)")
g.P("return result.Result(), result.Error()")
g.P("}")

Expand Down

0 comments on commit b9f71f7

Please sign in to comment.