-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Upgrade grpc-gateway from v1 to v2 #16454
Conversation
26eadf9
to
33b2981
Compare
That's what I mentioned. It's API breaking change. |
1d31ada
to
49610d9
Compare
Looks very promising. Will continue to work on the workflow failures tomorrow. Manually verified that all restful APIs work. |
It seems that the grpc-gateway/v2 runtime can't correctly marshal/unmarshal the MarshalFor example, The etcd/tests/e2e/v3_curl_test.go Lines 151 to 172 in 5a54fe6
It's supposed to be marshalled to the following string (the behaviour of the
But it's actually marshalled to the following string (the behaviour of this PR),
Unmarshal
|
For the oneOf, yes, the new API is kind of pain. The protobuf-go won't generate json tag for oneOf filed https://github.com/protocolbuffers/protobuf-go/blob/fc47fdd3d3fca5283fa9428ac94cf730236e4ca3/cmd/protoc-gen-go/internal_gengo/main.go#L814. We have
|
Based on your commit, I think we should maintain our own jsonpb marshaler implementation because we switch protoV1.Message to protoV2.Message. We need to switch it back to protoV1 and use old golang proto marshal/unmarshal to get the json data correctly. For the streaming API call, the marshaler will get Based on grpc-gateway v1.16 jsonpbdiff --git a/pkg/legacygwjsonpb/marshal.go b/pkg/legacygwjsonpb/marshal.go
new file mode 100644
index 000000000..eb9072ef9
--- /dev/null
+++ b/pkg/legacygwjsonpb/marshal.go
@@ -0,0 +1,292 @@
+package legacygwjsonpb
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "reflect"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/golang/protobuf/proto"
+ gw "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
+ protoV2 "google.golang.org/protobuf/proto"
+)
+
+// JSONPb is a Marshaler which marshals/unmarshals into/from JSON
+// with the "github.com/golang/protobuf/jsonpb".
+// It supports fully functionality of protobuf unlike JSONBuiltin.
+//
+// The NewDecoder method returns a DecoderWrapper, so the underlying
+// *json.Decoder methods can be used.
+type JSONPb jsonpb.Marshaler
+
+// ContentType always returns "application/json".
+func (*JSONPb) ContentType(v interface{}) string {
+ return "application/json"
+}
+
+// Marshal marshals "v" into JSON.
+func (j *JSONPb) Marshal(vv interface{}) (ret []byte, retErr error) {
+ var v interface{} = vv
+
+ // For unary api, the gateway always convert the messageV1 into V2.
+ // We should convert it back. And for the streaming api, the input is
+ // kind of map, we can't just call the proto.MessageV1 because it
+ // will panic :)
+ //
+ // REF: github.com/grpc-ecosystem/grpc-gateway/v2@v2.16.2/runtime/handler.goL75
+ if _, ok := vv.(protoV2.Message); ok {
+ v = proto.MessageV1(vv)
+ }
+
+ if _, ok := v.(proto.Message); !ok {
+ return j.marshalNonProtoField(v)
+ }
+
+ var buf bytes.Buffer
+ if err := j.marshalTo(&buf, v); err != nil {
+ return nil, err
+ }
+ return buf.Bytes(), nil
+}
+
+func (j *JSONPb) marshalTo(w io.Writer, v interface{}) error {
+ p, ok := v.(proto.Message)
+ if !ok {
+ buf, err := j.marshalNonProtoField(v)
+ if err != nil {
+ return err
+ }
+ _, err = w.Write(buf)
+ return err
+ }
+ return (*jsonpb.Marshaler)(j).Marshal(w, p)
+}
+
+var (
+ // protoMessageType is stored to prevent constant lookup of the same type at runtime.
+ protoMessageType = reflect.TypeOf((*proto.Message)(nil)).Elem()
+)
+
+// marshalNonProto marshals a non-message field of a protobuf message.
+// This function does not correctly marshals arbitrary data structure into JSON,
+// but it is only capable of marshaling non-message field values of protobuf,
+// i.e. primitive types, enums; pointers to primitives or enums; maps from
+// integer/string types to primitives/enums/pointers to messages.
+func (j *JSONPb) marshalNonProtoField(v interface{}) ([]byte, error) {
+ if v == nil {
+ return []byte("null"), nil
+ }
+ rv := reflect.ValueOf(v)
+ for rv.Kind() == reflect.Ptr {
+ if rv.IsNil() {
+ return []byte("null"), nil
+ }
+ rv = rv.Elem()
+ }
+
+ if rv.Kind() == reflect.Slice {
+ if rv.IsNil() {
+ if j.EmitDefaults {
+ return []byte("[]"), nil
+ }
+ return []byte("null"), nil
+ }
+
+ if rv.Type().Elem().Implements(protoMessageType) {
+ var buf bytes.Buffer
+ err := buf.WriteByte('[')
+ if err != nil {
+ return nil, err
+ }
+ for i := 0; i < rv.Len(); i++ {
+ if i != 0 {
+ err = buf.WriteByte(',')
+ if err != nil {
+ return nil, err
+ }
+ }
+ if err = (*jsonpb.Marshaler)(j).Marshal(&buf, rv.Index(i).Interface().(proto.Message)); err != nil {
+ return nil, err
+ }
+ }
+ err = buf.WriteByte(']')
+ if err != nil {
+ return nil, err
+ }
+
+ return buf.Bytes(), nil
+ }
+ }
+
+ if rv.Kind() == reflect.Map {
+ m := make(map[string]*json.RawMessage)
+ for _, k := range rv.MapKeys() {
+ buf, err := j.Marshal(rv.MapIndex(k).Interface())
+ if err != nil {
+ return nil, err
+ }
+ m[fmt.Sprintf("%v", k.Interface())] = (*json.RawMessage)(&buf)
+ }
+ if j.Indent != "" {
+ return json.MarshalIndent(m, "", j.Indent)
+ }
+ return json.Marshal(m)
+ }
+ if enum, ok := rv.Interface().(protoEnum); ok && !j.EnumsAsInts {
+ return json.Marshal(enum.String())
+ }
+ return json.Marshal(rv.Interface())
+}
+
+// Unmarshal unmarshals JSON "data" into "v"
+func (j *JSONPb) Unmarshal(data []byte, v interface{}) error {
+ return unmarshalJSONPb(data, v)
+}
+
+// NewDecoder returns a Decoder which reads JSON stream from "r".
+func (j *JSONPb) NewDecoder(r io.Reader) gw.Decoder {
+ d := json.NewDecoder(r)
+ return DecoderWrapper{Decoder: d}
+}
+
+// DecoderWrapper is a wrapper around a *json.Decoder that adds
+// support for protos to the Decode method.
+type DecoderWrapper struct {
+ *json.Decoder
+}
+
+// Decode wraps the embedded decoder's Decode method to support
+// protos using a jsonpb.Unmarshaler.
+func (d DecoderWrapper) Decode(v interface{}) error {
+ return decodeJSONPb(d.Decoder, v)
+}
+
+// NewEncoder returns an Encoder which writes JSON stream into "w".
+func (j *JSONPb) NewEncoder(w io.Writer) gw.Encoder {
+ return gw.EncoderFunc(func(vv interface{}) error {
+ v := proto.MessageV1(vv)
+
+ if err := j.marshalTo(w, v); err != nil {
+ return err
+ }
+ // mimic json.Encoder by adding a newline (makes output
+ // easier to read when it contains multiple encoded items)
+ _, err := w.Write(j.Delimiter())
+ return err
+ })
+}
+
+func unmarshalJSONPb(data []byte, v interface{}) error {
+ d := json.NewDecoder(bytes.NewReader(data))
+ return decodeJSONPb(d, v)
+}
+
+func decodeJSONPb(d *json.Decoder, v interface{}) error {
+ p, ok := v.(proto.Message)
+ if !ok {
+ return decodeNonProtoField(d, v)
+ }
+ unmarshaler := &jsonpb.Unmarshaler{AllowUnknownFields: allowUnknownFields}
+ return unmarshaler.UnmarshalNext(d, p)
+}
+
+func decodeNonProtoField(d *json.Decoder, v interface{}) error {
+ rv := reflect.ValueOf(v)
+ if rv.Kind() != reflect.Ptr {
+ return fmt.Errorf("%T is not a pointer", v)
+ }
+ for rv.Kind() == reflect.Ptr {
+ if rv.IsNil() {
+ rv.Set(reflect.New(rv.Type().Elem()))
+ }
+ if rv.Type().ConvertibleTo(typeProtoMessage) {
+ unmarshaler := &jsonpb.Unmarshaler{AllowUnknownFields: allowUnknownFields}
+ return unmarshaler.UnmarshalNext(d, rv.Interface().(proto.Message))
+ }
+ rv = rv.Elem()
+ }
+ if rv.Kind() == reflect.Map {
+ if rv.IsNil() {
+ rv.Set(reflect.MakeMap(rv.Type()))
+ }
+ conv, ok := convFromType[rv.Type().Key().Kind()]
+ if !ok {
+ return fmt.Errorf("unsupported type of map field key: %v", rv.Type().Key())
+ }
+
+ m := make(map[string]*json.RawMessage)
+ if err := d.Decode(&m); err != nil {
+ return err
+ }
+ for k, v := range m {
+ result := conv.Call([]reflect.Value{reflect.ValueOf(k)})
+ if err := result[1].Interface(); err != nil {
+ return err.(error)
+ }
+ bk := result[0]
+ bv := reflect.New(rv.Type().Elem())
+ if err := unmarshalJSONPb([]byte(*v), bv.Interface()); err != nil {
+ return err
+ }
+ rv.SetMapIndex(bk, bv.Elem())
+ }
+ return nil
+ }
+ if _, ok := rv.Interface().(protoEnum); ok {
+ var repr interface{}
+ if err := d.Decode(&repr); err != nil {
+ return err
+ }
+ switch repr.(type) {
+ case string:
+ // TODO(yugui) Should use proto.StructProperties?
+ return fmt.Errorf("unmarshaling of symbolic enum %q not supported: %T", repr, rv.Interface())
+ case float64:
+ rv.Set(reflect.ValueOf(int32(repr.(float64))).Convert(rv.Type()))
+ return nil
+ default:
+ return fmt.Errorf("cannot assign %#v into Go type %T", repr, rv.Interface())
+ }
+ }
+ return d.Decode(v)
+}
+
+type protoEnum interface {
+ fmt.Stringer
+ EnumDescriptor() ([]byte, []int)
+}
+
+var typeProtoMessage = reflect.TypeOf((*proto.Message)(nil)).Elem()
+
+// Delimiter for newline encoded JSON streams.
+func (j *JSONPb) Delimiter() []byte {
+ return []byte("\n")
+}
+
+// allowUnknownFields helps not to return an error when the destination
+// is a struct and the input contains object keys which do not match any
+// non-ignored, exported fields in the destination.
+var allowUnknownFields = true
+
+// DisallowUnknownFields enables option in decoder (unmarshaller) to
+// return an error when it finds an unknown field. This function must be
+// called before using the JSON marshaller.
+func DisallowUnknownFields() {
+ allowUnknownFields = false
+}
+
+var (
+ convFromType = map[reflect.Kind]reflect.Value{
+ reflect.String: reflect.ValueOf(gw.String),
+ reflect.Bool: reflect.ValueOf(gw.Bool),
+ reflect.Float64: reflect.ValueOf(gw.Float64),
+ reflect.Float32: reflect.ValueOf(gw.Float32),
+ reflect.Int64: reflect.ValueOf(gw.Int64),
+ reflect.Int32: reflect.ValueOf(gw.Int32),
+ reflect.Uint64: reflect.ValueOf(gw.Uint64),
+ reflect.Uint32: reflect.ValueOf(gw.Uint32),
+ reflect.Slice: reflect.ValueOf(gw.Bytes),
+ }
+)
diff --git a/scripts/test.sh b/scripts/test.sh
index c8c6550ff..3bb89e800 100755
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -171,12 +171,12 @@ function grpcproxy_pass {
function grpcproxy_integration_pass {
# shellcheck disable=SC2068
- run_for_module "tests" go_test "./integration/..." "fail_fast" : -timeout=30m -tags cluster_proxy ${COMMON_TEST_FLAGS[@]:-} "$@"
+ run_for_module "tests" go_test "./integration/..." "fail_fast" : -timeout=30m -tags cluster_proxy ${COMMON_TEST_FLAGS[@]:-} ${RUN_ARG[@]:-} "$@"
}
function grpcproxy_e2e_pass {
# shellcheck disable=SC2068
- run_for_module "tests" go_test "./e2e" "fail_fast" : -timeout=30m -tags cluster_proxy ${COMMON_TEST_FLAGS[@]:-} "$@"
+ run_for_module "tests" go_test "./e2e" "fail_fast" : -timeout=30m -tags cluster_proxy ${COMMON_TEST_FLAGS[@]:-} ${RUN_ARG[@]:-} "$@"
}
################# COVERAGE #####################################################
diff --git a/server/embed/serve.go b/server/embed/serve.go
index 85cd41989..48841a220 100644
--- a/server/embed/serve.go
+++ b/server/embed/serve.go
@@ -28,6 +28,7 @@ import (
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/pkg/v3/debugutil"
"go.etcd.io/etcd/pkg/v3/httputil"
+ "go.etcd.io/etcd/pkg/v3/legacygwjsonpb"
"go.etcd.io/etcd/server/v3/config"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/v3client"
@@ -301,7 +302,12 @@ func (sctx *serveCtx) registerGateway(dial func(ctx context.Context) (*grpc.Clie
if err != nil {
return nil, err
}
- gwmux := gw.NewServeMux()
+
+ gwmux := gw.NewServeMux(
+ gw.WithMarshalerOption(gw.MIMEWildcard, &gw.HTTPBodyMarshaler{
+ Marshaler: &legacygwjsonpb.JSONPb{OrigName: true},
+ }),
+ )
handlers := []registerHandlerFunc{
etcdservergw.RegisterKVHandler,
And based on my previous investigation in #16049, we need to change the error message in testing, since the error message proto has been changed. Hope it can save your time. diff --git a/tests/e2e/v3_curl_test.go b/tests/e2e/v3_curl_test.go
index b994b48973c..76f7040d176 100644
--- a/tests/e2e/v3_curl_test.go
+++ b/tests/e2e/v3_curl_test.go
@@ -181,7 +181,7 @@ func testV3CurlTxn(cx ctlCtx) {
// was crashing etcd server
malformed := `{"compare":[{"result":0,"target":1,"key":"Zm9v","TargetUnion":null}],"success":[{"Request":{"RequestPut":{"key":"Zm9v","value":"YmFy"}}}]}`
- if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/txn"), Value: malformed, Expected: "error"}); err != nil {
+ if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/txn"), Value: malformed, Expected: `"code":3,"message":"etcdserver: key not found"`}); err != nil {
cx.t.Fatalf("failed testV3CurlTxn put with curl using prefix (%s) (%v)", p, err)
}
@@ -232,7 +232,7 @@ func testV3CurlAuth(cx ctlCtx) {
testutil.AssertNil(cx.t, err)
// fail put no auth
- if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/put"), Value: string(putreq), Expected: "error"}); err != nil {
+ if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/put"), Value: string(putreq), Expected: `"code":3,"message":"etcdserver: user name is empty"`}); err != nil {
cx.t.Fatalf("failed testV3CurlAuth no auth put with curl using prefix (%s) (%v)", p, err)
}
@@ -347,7 +347,7 @@ func testV3CurlProclaimMissiongLeaderKey(cx ctlCtx) {
if err = e2e.CURLPost(cx.epc, e2e.CURLReq{
Endpoint: path.Join(cx.apiPrefix, "/election/proclaim"),
Value: string(pdata),
- Expected: `{"error":"\"leader\" field must be provided","code":2,"message":"\"leader\" field must be provided"}`,
+ Expected: `{"code":2,"message":"\"leader\" field must be provided"}`,
}); err != nil {
cx.t.Fatalf("failed post proclaim request (%s) (%v)", cx.apiPrefix, err)
}
@@ -363,7 +363,7 @@ func testV3CurlResignMissiongLeaderKey(cx ctlCtx) {
if err := e2e.CURLPost(cx.epc, e2e.CURLReq{
Endpoint: path.Join(cx.apiPrefix, "/election/resign"),
Value: `{}`,
- Expected: `{"error":"\"leader\" field must be provided","code":2,"message":"\"leader\" field must be provided"}`,
+ Expected: `{"code":2,"message":"\"leader\" field must be provided"}`,
}); err != nil {
cx.t.Fatalf("failed post resign request (%s) (%v)", cx.apiPrefix, err)
} |
The *.pb.go files are still generated using the legacy
This seems to be the correct solution. Can you attach a patch so that I can have a quick verification? thx |
@ahrtr https://gist.github.com/fuweid/cfdca3119fab4c66d229106c7cc63b2c This is the patch.
|
9f892b6
to
1b47ea3
Compare
@fuweid Would you mind to create a separate PR to fix the following change? It should can be approved & merged right away.
Let's try to get unrelated change included in separate PR. For others reference, the existing
|
The good news is all workflow checks are green now. Many thanks to @fuweid. We are still depending on the
I resolved above compatibility issue by hacking the generatead But there is a copyright "issue" which needs to be clarified. In order to ensure the server side (etcdserver) can correctly marshal/unmarshal both messageV1 (e.g. messages coming from streaming RESTful API) and messageV2 (e.g. unary API, which are always converted to messageV2 mentioned above), we maintain a forked version of marshal_jsonpb.go, which is actually a fork of https://github.com/grpc-ecosystem/grpc-gateway/blob/v1.16.0/runtime/marshal_jsonpb.go with minor modification. Please refer to 1b47ea3 for more detailed info. I am not sure whether it has any copyright issue. My understanding is it should be fine, reasons:
Please let me know if anyone has concern on this, including both license "issue" and the technical approach. @fuweid @mitake @ptabor @serathius @spzala @wenjiaswe @chaochn47 @jmhbnz @johanbrandhorst @liggitt @dims BTW, we also need to add more test cases to cover all RESTful APIs, which can be resolved in separate PR(s). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, I just don't want to miss adding the LICENSE file to be compliant with the grpc-gateway license.
bae1f8d
to
8cd934a
Compare
LGTM |
Is the json marshaling of gateway v2 incompatible with gateway v1? Is there not a way to have a shim json marshaling implementation that delegates to the gateway v2 json marshaling? |
since the output is explicitly supposed to be unstable w.r.t. non-significant whitespace, I think the curl test should normalize whitespace before comparing |
5d7d25d
to
fb57475
Compare
Signed-off-by: Benjamin Wang <wachao@vmware.com>
Good news, all workflow checks are green now based on @fuweid 's patch above and my temporary modification on the test cases. Notes:
Next steps:
|
A formal PR #16595 was just submitted. |
Completed in #16595 |
Please read https://github.com/etcd-io/etcd/blob/main/CONTRIBUTING.md#contribution-flow.