From d4d331226116d53cfa1c0afc53229adc8f63d57e Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Tue, 8 Oct 2024 16:29:17 +0800 Subject: [PATCH] refactor(rest): convert to byte collector Support multiple formats like csv Signed-off-by: Jiyong Huang --- internal/io/http/client.go | 48 ++++----- internal/io/http/httppull_source.go | 4 +- internal/io/http/rest_sink.go | 35 ++++--- internal/io/http/rest_sink_test.go | 145 +++++++++------------------- internal/pkg/httpx/http.go | 40 +------- internal/service/executors.go | 2 +- 6 files changed, 97 insertions(+), 177 deletions(-) diff --git a/internal/io/http/client.go b/internal/io/http/client.go index e02a4733fb..ed58d5d940 100644 --- a/internal/io/http/client.go +++ b/internal/io/http/client.go @@ -42,7 +42,6 @@ import ( type ClientConf struct { config *RawConf client *http.Client - compressor message.Compressor // compressor used to payload compression when specifies compressAlgorithm decompressor message.Decompressor // decompressor used to payload decompression when specifies compressAlgorithm // auth related @@ -91,7 +90,7 @@ type bodyResp struct { Code int `json:"code"` } -var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""} +var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream"} // newTransport allows EdgeX Foundry, protected by OpenZiti to override and obtain a transport // protected by OpenZiti's zero trust connectivity. See client_edgex.go where this function is @@ -198,11 +197,6 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro cc.config = c // that means payload need compression and decompression, so we need initialize compressor and decompressor if c.Compression != "" { - cc.compressor, err = compressor.GetCompressor(c.Compression) - if err != nil { - return fmt.Errorf("init payload compressor failed, %w", err) - } - cc.decompressor, err = compressor.GetDecompressor(c.Compression) if err != nil { return fmt.Errorf("init payload decompressor failed, %w", err) @@ -222,11 +216,11 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro // initialize the oAuth access token func (cc *ClientConf) auth(ctx api.StreamContext) error { - resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, true, cc.accessConf.Body) + resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, cc.accessConf.Body) if err != nil { return err } - tokens, _, err := cc.parseResponse(ctx, resp, "") + tokens, _, err := cc.parseResponse(ctx, resp, "", true, true) if err != nil { return err } @@ -274,11 +268,11 @@ func (cc *ClientConf) refresh(ctx api.StreamContext) error { } } - resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, true, cc.refreshConf.Body) + resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, cc.refreshConf.Body) if err != nil { return fmt.Errorf("fail to get refresh token: %v", err) } - nt, _, err := cc.parseResponse(ctx, resp, "") + nt, _, err := cc.parseResponse(ctx, resp, "", true, true) if err != nil { return fmt.Errorf("Cannot parse refresh token response to json: %v", err) } @@ -317,9 +311,11 @@ func (cc *ClientConf) responseBodyDecompress(ctx api.StreamContext, resp *http.R } // parse the response status. For rest sink, it will not return the body by default if not need to debug -func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, lastMD5 string) ([]map[string]interface{}, string, error) { +func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, lastMD5 string, returnBody bool, skipDecompression bool) ([]map[string]interface{}, string, error) { if resp.StatusCode < 200 || resp.StatusCode > 299 { return nil, "", fmt.Errorf("%s: %d", CODE_ERR, resp.StatusCode) + } else if !returnBody { // For rest sink who only need to know if the request is successful + return nil, "", nil } c, err := io.ReadAll(resp.Body) @@ -332,7 +328,7 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, }() newMD5 := "" - if cc.config.Incremental { + if returnBody && cc.config.Incremental { newMD5 = getMD5Hash(c) if newMD5 == lastMD5 { return nil, newMD5, nil @@ -341,18 +337,21 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, switch cc.config.ResponseType { case "code": - if cc.config.Compression != "" { - if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil { - return nil, "", fmt.Errorf("try to decompress payload failed, %w", err) + if returnBody { + if cc.config.Compression != "" && !skipDecompression { + if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil { + return nil, "", fmt.Errorf("try to decompress payload failed, %w", err) + } } + m, e := decode(c) + if e != nil { + return nil, "", fmt.Errorf("%s: decode fail for %v", BODY_ERR, e) + } + return m, newMD5, e } - m, e := decode(c) - if e != nil { - return nil, "", fmt.Errorf("%s: decode fail for %v", BODY_ERR, e) - } - return m, newMD5, e + return nil, "", nil case "body": - if cc.config.Compression != "" { + if cc.config.Compression != "" && !skipDecompression { if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil { return nil, "", fmt.Errorf("try to decompress payload failed, %w", err) } @@ -371,7 +370,10 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, return nil, "", fmt.Errorf("%s: %d", CODE_ERR, ro.Code) } } - return payloads, newMD5, nil + if returnBody { + return payloads, newMD5, nil + } + return nil, "", nil default: return nil, "", fmt.Errorf("%s: unsupported response type %s", BODY_ERR, cc.config.ResponseType) } diff --git a/internal/io/http/httppull_source.go b/internal/io/http/httppull_source.go index 8e298950a1..cfcf19d2d4 100644 --- a/internal/io/http/httppull_source.go +++ b/internal/io/http/httppull_source.go @@ -75,11 +75,11 @@ func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string] if err != nil { return nil, "", err } - resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, true, []byte(c.config.Body)) + resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, []byte(c.config.Body)) if err != nil { return nil, "", err } - results, newMD5, err := c.parseResponse(ctx, resp, lastMD5) + results, newMD5, err := c.parseResponse(ctx, resp, lastMD5, true, false) if err != nil { return nil, "", err } diff --git a/internal/io/http/rest_sink.go b/internal/io/http/rest_sink.go index 580157ee96..f074e3bed5 100644 --- a/internal/io/http/rest_sink.go +++ b/internal/io/http/rest_sink.go @@ -44,20 +44,13 @@ func (r *RestSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) e return nil } -func (r *RestSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { - return r.collect(ctx, item, item.ToMap()) -} - -func (r *RestSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { - return r.collect(ctx, items, items.ToMaps()) -} - -func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error { +func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error { logger := ctx.GetLogger() headers := r.config.Headers bodyType := r.config.BodyType method := r.config.Method u := r.config.Url + if dp, ok := item.(api.HasDynamicProps); ok { for k := range headers { nv, ok := dp.DynamicProps(k) @@ -78,7 +71,21 @@ func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error { u = nu } } - resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, r.config.SendSingle, data) + + switch r.config.Compression { + case "zstd": + if headers == nil { + headers = make(map[string]string) + } + headers["Content-Encoding"] = "zstd" + case "gzip": + if headers == nil { + headers = make(map[string]string) + } + headers["Content-Encoding"] = "gzip" + } + + resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, item.Raw()) failpoint.Inject("recoverAbleErr", func() { err = errors.New("connection reset by peer") }) @@ -91,15 +98,15 @@ func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error { originErr.Error(), recoverAble, method, - u, data)) + u, string(item.Raw()))) } return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s" request_body="%s"`, originErr.Error(), recoverAble, - method, u, data) + method, u, string(item.Raw())) } else { logger.Debugf("rest sink got response %v", resp) - _, b, err := r.parseResponse(ctx, resp, "") + _, b, err := r.parseResponse(ctx, resp, "", r.config.DebugResp, false) // do not record response body error as it is not an error in the sink action. if err != nil && !strings.HasPrefix(err.Error(), BODY_ERR) { if strings.HasPrefix(err.Error(), BODY_ERR) { @@ -125,4 +132,4 @@ func GetSink() api.Sink { return &RestSink{} } -var _ api.TupleCollector = &RestSink{} +var _ api.BytesCollector = &RestSink{} diff --git a/internal/io/http/rest_sink_test.go b/internal/io/http/rest_sink_test.go index 21ad836867..9d927a46d0 100644 --- a/internal/io/http/rest_sink_test.go +++ b/internal/io/http/rest_sink_test.go @@ -15,6 +15,7 @@ package http import ( + "encoding/json" "fmt" "io" "net/http" @@ -31,9 +32,10 @@ import ( ) type request struct { - Method string - Body string - ContentType string + Method string + Body string + ContentType string + ContentEncoding string } func TestRestSink_Apply(t *testing.T) { @@ -48,7 +50,8 @@ func TestRestSink_Apply(t *testing.T) { config: map[string]interface{}{ "method": "post", //"url": "http://localhost/test", //set dynamically to the test server - "sendSingle": true, + "sendSingle": true, + "compression": "gzip", }, data: []map[string]interface{}{{ "ab": "hello1", @@ -56,54 +59,23 @@ func TestRestSink_Apply(t *testing.T) { "ab": "hello2", }}, result: []request{{ - Method: "POST", - Body: `{"ab":"hello1"}`, - ContentType: "application/json", + Method: "POST", + Body: `{"ab":"hello1"}`, + ContentType: "application/json", + ContentEncoding: "gzip", }, { - Method: "POST", - Body: `{"ab":"hello2"}`, - ContentType: "application/json", + Method: "POST", + Body: `{"ab":"hello2"}`, + ContentType: "application/json", + ContentEncoding: "gzip", }}, }, { name: "2", config: map[string]interface{}{ "method": "post", //"url": "http://localhost/test", //set dynamically to the test server - "headers": map[string]any{ - "Content-Type": "application/vnd.microsoft.servicebus.json", - }, - }, - data: []map[string]interface{}{{ - "ab": "hello1", - }, { - "ab": "hello2", - }}, - result: []request{{ - Method: "POST", - Body: `[{"ab":"hello1"},{"ab":"hello2"}]`, - ContentType: "application/vnd.microsoft.servicebus.json", - }}, - }, { - name: "3", - config: map[string]interface{}{ - "method": "get", - //"url": "http://localhost/test", //set dynamically to the test server - }, - data: []map[string]interface{}{{ - "ab": "hello1", - }, { - "ab": "hello2", - }}, - result: []request{{ - Method: "GET", - ContentType: "", - }}, - }, { - name: "4", - config: map[string]interface{}{ - "method": "put", - //"url": "http://localhost/test", //set dynamically to the test server - "bodyType": "text", + "sendSingle": true, + "compression": "zstd", }, data: []map[string]interface{}{{ "ab": "hello1", @@ -111,26 +83,15 @@ func TestRestSink_Apply(t *testing.T) { "ab": "hello2", }}, result: []request{{ - Method: "PUT", - ContentType: "text/plain", - Body: `[{"ab":"hello1"},{"ab":"hello2"}]`, - }}, - }, { - name: "5", - config: map[string]interface{}{ - "method": "post", - //"url": "http://localhost/test", //set dynamically to the test server - "bodyType": "form", - }, - data: []map[string]interface{}{{ - "ab": "hello1", + Method: "POST", + Body: `{"ab":"hello1"}`, + ContentType: "application/json", + ContentEncoding: "zstd", }, { - "ab": "hello2", - }}, - result: []request{{ - Method: "POST", - ContentType: "application/x-www-form-urlencoded;param=value", - Body: `result=%5B%7B%22ab%22%3A%22hello1%22%7D%2C%7B%22ab%22%3A%22hello2%22%7D%5D`, + Method: "POST", + Body: `{"ab":"hello2"}`, + ContentType: "application/json", + ContentEncoding: "zstd", }}, }, { name: "6", @@ -148,11 +109,11 @@ func TestRestSink_Apply(t *testing.T) { result: []request{{ Method: "POST", ContentType: "application/x-www-form-urlencoded;param=value", - Body: `ab=hello1`, + Body: "{\"ab\":\"hello1\"}", }, { Method: "POST", ContentType: "application/x-www-form-urlencoded;param=value", - Body: `ab=hello2`, + Body: "{\"ab\":\"hello2\"}", }}, }, { name: "7", @@ -191,9 +152,10 @@ func TestRestSink_Apply(t *testing.T) { } requests = append(requests, request{ - Method: r.Method, - Body: string(body), - ContentType: r.Header.Get("Content-Type"), + Method: r.Method, + Body: string(body), + ContentType: r.Header.Get("Content-Type"), + ContentEncoding: r.Header.Get("Content-Encoding"), }) ctx.GetLogger().Debugf(string(body)) fmt.Fprint(w, string(body)) @@ -202,10 +164,6 @@ func TestRestSink_Apply(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { requests = nil - ss, ok := tt.config["sendSingle"] - if !ok { - ss = false - } s := &RestSink{} tt.config["url"] = ts.URL e := s.Provision(ctx, tt.config) @@ -214,23 +172,12 @@ func TestRestSink_Apply(t *testing.T) { // do nothing }) assert.NoError(t, e) - if ss.(bool) { - for _, d := range tt.data { - e = s.Collect(ctx, &xsql.Tuple{ - Message: d, - }) - assert.NoError(t, e) - } - } else { - b := &xsql.WindowTuples{ - Content: make([]xsql.Row, 0, len(tt.data)), - } - for _, d := range tt.data { - b.Content = append(b.Content, &xsql.Tuple{ - Message: d, - }) - } - e = s.CollectList(ctx, b) + for _, d := range tt.data { + bb, err := json.Marshal(d) + require.NoError(t, err) + e = s.Collect(ctx, &xsql.RawTuple{ + Rawdata: bb, + }) assert.NoError(t, e) } @@ -253,15 +200,13 @@ func TestRestSinkCollect(t *testing.T) { "method": "get", "debugResp": true, })) - data := &xsql.Tuple{ - Message: map[string]interface{}{ - "a": 1, - }, + data := &xsql.RawTuple{ + Rawdata: []byte(`{"a":1}`), } require.NoError(t, s.Connect(ctx, func(status string, message string) { // do nothing })) - require.NoError(t, s.collect(ctx, data, data.ToMap())) + require.NoError(t, s.Collect(ctx, data)) require.NoError(t, s.Close(ctx)) } @@ -271,10 +216,8 @@ func TestRestSinkRecoverErr(t *testing.T) { server.Close() }() ctx := mockContext.NewMockContext("1", "2") - data := &xsql.Tuple{ - Message: map[string]interface{}{ - "a": 1, - }, + data := &xsql.RawTuple{ + Rawdata: []byte(`{"a":1}`), } sErr := &RestSink{} require.NoError(t, sErr.Provision(ctx, map[string]any{ @@ -284,7 +227,7 @@ func TestRestSinkRecoverErr(t *testing.T) { require.NoError(t, sErr.Connect(ctx, func(status string, message string) { // do nothing })) - err := sErr.collect(ctx, data, data.ToMap()) + err := sErr.Collect(ctx, data) require.Error(t, err) require.False(t, errorx.IsIOError(err)) s := &RestSink{} @@ -297,7 +240,7 @@ func TestRestSinkRecoverErr(t *testing.T) { require.NoError(t, s.Connect(ctx, func(status string, message string) { // do nothing })) - err = s.collect(ctx, data, data.ToMap()) + err = s.Collect(ctx, data) require.Error(t, err) require.True(t, errorx.IsIOError(err)) } diff --git a/internal/pkg/httpx/http.go b/internal/pkg/httpx/http.go index f2bbd16d34..861e5efc38 100644 --- a/internal/pkg/httpx/http.go +++ b/internal/pkg/httpx/http.go @@ -33,10 +33,10 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/conf" ) -var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""} +var BodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "application/x-www-form-urlencoded;param=value"} // Send v must be a []byte or map -func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, sendSingle bool, v interface{}) (*http.Response, error) { +func Send(logger api.Logger, client *http.Client, bodyType string, method string, u string, headers map[string]string, v any) (*http.Response, error) { var req *http.Request var err error switch bodyType { @@ -45,7 +45,7 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string if err != nil { return nil, fmt.Errorf("fail to create request: %v", err) } - case "json", "text", "javascript", "html", "xml": + case "json", "text", "javascript", "html", "xml", "form": var body io.Reader switch t := v.(type) { case []byte: @@ -53,11 +53,7 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string case string: body = strings.NewReader(t) default: - vj, err := json.Marshal(v) - if err != nil { - return nil, fmt.Errorf("invalid content: %v", v) - } - body = bytes.NewBuffer(vj) + return nil, fmt.Errorf("http send only supports bytes but receive invalid content: %v", v) } req, err = http.NewRequest(method, u, body) if err != nil { @@ -66,34 +62,6 @@ func Send(logger api.Logger, client *http.Client, bodyType string, method string if req.Header.Get("Content-Type") == "" { req.Header.Set("Content-Type", BodyTypeMap[bodyType]) } - case "form": - form := url.Values{} - im, err := convertToMap(v, sendSingle) - if err != nil { - return nil, err - } - for key, value := range im { - var vstr string - switch value.(type) { - case []interface{}, map[string]interface{}: - if temp, err := json.Marshal(value); err != nil { - return nil, fmt.Errorf("fail to parse from value: %v", err) - } else { - vstr = string(temp) - } - default: - vstr = fmt.Sprintf("%v", value) - } - form.Set(key, vstr) - } - body := io.NopCloser(strings.NewReader(form.Encode())) - req, err = http.NewRequest(method, u, body) - if err != nil { - return nil, fmt.Errorf("fail to create request: %v", err) - } - if req.Header.Get("Content-Type") == "" { - req.Header.Set("Content-Type", "application/x-www-form-urlencoded;param=value") - } default: return nil, fmt.Errorf("unsupported body type %s", bodyType) } diff --git a/internal/service/executors.go b/internal/service/executors.go index 86aeffa1f0..8ebebcf62e 100644 --- a/internal/service/executors.go +++ b/internal/service/executors.go @@ -258,7 +258,7 @@ func (h *httpExecutor) invokeFunction(ctx api.FunctionContext, name string, para if err != nil { return nil, err } - resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, false, hm.Body) + resp, err := httpx.Send(ctx.GetLogger(), h.conn, "json", hm.Method, u, h.restOpt.Headers, hm.Body) if err != nil { return nil, err }