Skip to content

Commit

Permalink
refactor(rest): convert to byte collector
Browse files Browse the repository at this point in the history
Support multiple formats like csv

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Oct 8, 2024
1 parent dc7a770 commit d4d3312
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 177 deletions.
48 changes: 25 additions & 23 deletions internal/io/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
type ClientConf struct {
config *RawConf
client *http.Client
compressor message.Compressor // compressor used to payload compression when specifies compressAlgorithm
decompressor message.Decompressor // decompressor used to payload decompression when specifies compressAlgorithm

// auth related
Expand Down Expand Up @@ -91,7 +90,7 @@ type bodyResp struct {
Code int `json:"code"`
}

var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": ""}
var bodyTypeMap = map[string]string{"none": "", "text": "text/plain", "json": "application/json", "html": "text/html", "xml": "application/xml", "javascript": "application/javascript", "form": "", "binary": "application/octet-stream"}

// newTransport allows EdgeX Foundry, protected by OpenZiti to override and obtain a transport
// protected by OpenZiti's zero trust connectivity. See client_edgex.go where this function is
Expand Down Expand Up @@ -198,11 +197,6 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro
cc.config = c
// that means payload need compression and decompression, so we need initialize compressor and decompressor
if c.Compression != "" {
cc.compressor, err = compressor.GetCompressor(c.Compression)
if err != nil {
return fmt.Errorf("init payload compressor failed, %w", err)
}

cc.decompressor, err = compressor.GetDecompressor(c.Compression)
if err != nil {
return fmt.Errorf("init payload decompressor failed, %w", err)
Expand All @@ -222,11 +216,11 @@ func (cc *ClientConf) InitConf(device string, props map[string]interface{}) erro

// initialize the oAuth access token
func (cc *ClientConf) auth(ctx api.StreamContext) error {
resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, true, cc.accessConf.Body)
resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.accessConf.Url, nil, cc.accessConf.Body)
if err != nil {
return err
}
tokens, _, err := cc.parseResponse(ctx, resp, "")
tokens, _, err := cc.parseResponse(ctx, resp, "", true, true)
if err != nil {
return err
}
Expand Down Expand Up @@ -274,11 +268,11 @@ func (cc *ClientConf) refresh(ctx api.StreamContext) error {
}
}

resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, true, cc.refreshConf.Body)
resp, err := httpx.Send(conf.Log, cc.client, "json", http.MethodPost, cc.refreshConf.Url, headers, cc.refreshConf.Body)
if err != nil {
return fmt.Errorf("fail to get refresh token: %v", err)
}
nt, _, err := cc.parseResponse(ctx, resp, "")
nt, _, err := cc.parseResponse(ctx, resp, "", true, true)
if err != nil {
return fmt.Errorf("Cannot parse refresh token response to json: %v", err)
}
Expand Down Expand Up @@ -317,9 +311,11 @@ func (cc *ClientConf) responseBodyDecompress(ctx api.StreamContext, resp *http.R
}

// parse the response status. For rest sink, it will not return the body by default if not need to debug
func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, lastMD5 string) ([]map[string]interface{}, string, error) {
func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response, lastMD5 string, returnBody bool, skipDecompression bool) ([]map[string]interface{}, string, error) {
if resp.StatusCode < 200 || resp.StatusCode > 299 {
return nil, "", fmt.Errorf("%s: %d", CODE_ERR, resp.StatusCode)
} else if !returnBody { // For rest sink who only need to know if the request is successful
return nil, "", nil
}

c, err := io.ReadAll(resp.Body)
Expand All @@ -332,7 +328,7 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response,
}()

newMD5 := ""
if cc.config.Incremental {
if returnBody && cc.config.Incremental {
newMD5 = getMD5Hash(c)
if newMD5 == lastMD5 {
return nil, newMD5, nil
Expand All @@ -341,18 +337,21 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response,

switch cc.config.ResponseType {
case "code":
if cc.config.Compression != "" {
if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil {
return nil, "", fmt.Errorf("try to decompress payload failed, %w", err)
if returnBody {
if cc.config.Compression != "" && !skipDecompression {
if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil {
return nil, "", fmt.Errorf("try to decompress payload failed, %w", err)
}

Check warning on line 344 in internal/io/http/client.go

View check run for this annotation

Codecov / codecov/patch

internal/io/http/client.go#L342-L344

Added lines #L342 - L344 were not covered by tests
}
m, e := decode(c)
if e != nil {
return nil, "", fmt.Errorf("%s: decode fail for %v", BODY_ERR, e)
}

Check warning on line 349 in internal/io/http/client.go

View check run for this annotation

Codecov / codecov/patch

internal/io/http/client.go#L348-L349

Added lines #L348 - L349 were not covered by tests
return m, newMD5, e
}
m, e := decode(c)
if e != nil {
return nil, "", fmt.Errorf("%s: decode fail for %v", BODY_ERR, e)
}
return m, newMD5, e
return nil, "", nil

Check warning on line 352 in internal/io/http/client.go

View check run for this annotation

Codecov / codecov/patch

internal/io/http/client.go#L352

Added line #L352 was not covered by tests
case "body":
if cc.config.Compression != "" {
if cc.config.Compression != "" && !skipDecompression {
if c, err = cc.responseBodyDecompress(ctx, resp, c); err != nil {
return nil, "", fmt.Errorf("try to decompress payload failed, %w", err)
}
Expand All @@ -371,7 +370,10 @@ func (cc *ClientConf) parseResponse(ctx api.StreamContext, resp *http.Response,
return nil, "", fmt.Errorf("%s: %d", CODE_ERR, ro.Code)
}
}
return payloads, newMD5, nil
if returnBody {
return payloads, newMD5, nil
}
return nil, "", nil

Check warning on line 376 in internal/io/http/client.go

View check run for this annotation

Codecov / codecov/patch

internal/io/http/client.go#L376

Added line #L376 was not covered by tests
default:
return nil, "", fmt.Errorf("%s: unsupported response type %s", BODY_ERR, cc.config.ResponseType)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/io/http/httppull_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func doPull(ctx api.StreamContext, c *ClientConf, lastMD5 string) ([]map[string]
if err != nil {
return nil, "", err
}
resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, true, []byte(c.config.Body))
resp, err := httpx.Send(ctx.GetLogger(), c.client, c.config.BodyType, c.config.Method, c.config.Url, headers, []byte(c.config.Body))
if err != nil {
return nil, "", err
}
results, newMD5, err := c.parseResponse(ctx, resp, lastMD5)
results, newMD5, err := c.parseResponse(ctx, resp, lastMD5, true, false)
if err != nil {
return nil, "", err
}
Expand Down
35 changes: 21 additions & 14 deletions internal/io/http/rest_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,13 @@ func (r *RestSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) e
return nil
}

func (r *RestSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
return r.collect(ctx, item, item.ToMap())
}

func (r *RestSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {
return r.collect(ctx, items, items.ToMaps())
}

func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error {
func (r *RestSink) Collect(ctx api.StreamContext, item api.RawTuple) error {
logger := ctx.GetLogger()
headers := r.config.Headers
bodyType := r.config.BodyType
method := r.config.Method
u := r.config.Url

if dp, ok := item.(api.HasDynamicProps); ok {
for k := range headers {
nv, ok := dp.DynamicProps(k)
Expand All @@ -78,7 +71,21 @@ func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error {
u = nu
}
}
resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, r.config.SendSingle, data)

switch r.config.Compression {
case "zstd":
if headers == nil {
headers = make(map[string]string)
}
headers["Content-Encoding"] = "zstd"
case "gzip":
if headers == nil {
headers = make(map[string]string)
}
headers["Content-Encoding"] = "gzip"
}

resp, err := httpx.Send(ctx.GetLogger(), r.client, bodyType, method, u, headers, item.Raw())
failpoint.Inject("recoverAbleErr", func() {
err = errors.New("connection reset by peer")
})
Expand All @@ -91,15 +98,15 @@ func (r *RestSink) collect(ctx api.StreamContext, item any, data any) error {
originErr.Error(),
recoverAble,
method,
u, data))
u, string(item.Raw())))
}
return fmt.Errorf(`rest sink fails to send out the data:err=%s recoverAble=%v method=%s path="%s" request_body="%s"`,
originErr.Error(),
recoverAble,
method, u, data)
method, u, string(item.Raw()))

Check warning on line 106 in internal/io/http/rest_sink.go

View check run for this annotation

Codecov / codecov/patch

internal/io/http/rest_sink.go#L106

Added line #L106 was not covered by tests
} else {
logger.Debugf("rest sink got response %v", resp)
_, b, err := r.parseResponse(ctx, resp, "")
_, b, err := r.parseResponse(ctx, resp, "", r.config.DebugResp, false)
// do not record response body error as it is not an error in the sink action.
if err != nil && !strings.HasPrefix(err.Error(), BODY_ERR) {
if strings.HasPrefix(err.Error(), BODY_ERR) {
Expand All @@ -125,4 +132,4 @@ func GetSink() api.Sink {
return &RestSink{}
}

var _ api.TupleCollector = &RestSink{}
var _ api.BytesCollector = &RestSink{}
Loading

0 comments on commit d4d3312

Please sign in to comment.