Skip to content

Commit

Permalink
chore(op): csv header encode before batch
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Dec 24, 2024
1 parent 53fc736 commit 825cdff
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 130 deletions.
113 changes: 0 additions & 113 deletions fvt/csv_test.go

This file was deleted.

17 changes: 9 additions & 8 deletions internal/converter/delimited/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package delimited

import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"strconv"
Expand All @@ -30,9 +29,10 @@ import (
)

type Converter struct {
Delimiter string `json:"delimiter"`
Cols []string `json:"fields"`
HasHeader bool `json:"hasHeader"`
Delimiter string `json:"delimiter"`
Cols []string `json:"fields"`
HasHeader bool `json:"hasHeader"`
sendHeader bool
}

func NewConverter(props map[string]any) (message.Converter, error) {
Expand Down Expand Up @@ -64,12 +64,12 @@ func (c *Converter) Encode(ctx api.StreamContext, d any) (b []byte, err error) {
}
sort.Strings(keys)
c.Cols = keys
if len(c.Cols) > 0 && c.HasHeader {
if len(c.Cols) > 0 && c.HasHeader && !c.sendHeader {
c.sendHeader = true
hb := []byte(strings.Join(c.Cols, c.Delimiter))
sb.WriteString(c.Delimiter)
_ = binary.Write(sb, binary.BigEndian, uint32(len(hb)))
sb.Write(hb)
ctx.GetLogger().Infof("delimiter header %s", hb)
sb.WriteString("\n")
ctx.GetLogger().Debugf("header %s", hb)
}
}
for i, v := range c.Cols {
Expand All @@ -79,6 +79,7 @@ func (c *Converter) Encode(ctx api.StreamContext, d any) (b []byte, err error) {
p, _ := cast.ToString(m[v], cast.CONVERT_ALL)
sb.WriteString(p)
}
sb.Write([]byte("\n"))
return sb.Bytes(), nil
case []map[string]any:
sb := &bytes.Buffer{}
Expand Down
8 changes: 4 additions & 4 deletions internal/converter/delimited/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestEncode(t *testing.T) {
"id": 1670170500.0,
"name": "test",
},
r: []byte(`1670170500:test`),
r: []byte("1670170500:test\n"),
},
{
name: "embedded",
Expand All @@ -57,7 +57,7 @@ func TestEncode(t *testing.T) {
},
},
},
r: []byte(`22:map[indoor:[Chess] outdoor:[Basketball]]:7:John Doe`),
r: []byte("22:map[indoor:[Chess] outdoor:[Basketball]]:7:John Doe\n"),
},
{
name: "list",
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestEncodeWithHeader(t *testing.T) {
"id": 12,
"name": "test",
},
r: []byte{0x3a, 0x0, 0x0, 0x0, 0x7, 0x69, 0x64, 0x3a, 0x6e, 0x61, 0x6d, 0x65, 0x31, 0x32, 0x3a, 0x74, 0x65, 0x73, 0x74},
r: []byte{0x69, 0x64, 0x3a, 0x6e, 0x61, 0x6d, 0x65, 0xa, 0x31, 0x32, 0x3a, 0x74, 0x65, 0x73, 0x74, 0xa},
},
{
name: "embedded",
Expand All @@ -120,7 +120,7 @@ func TestEncodeWithHeader(t *testing.T) {
},
},
},
r: []byte{0x3a, 0x0, 0x0, 0x0, 0x13, 0x61, 0x67, 0x65, 0x3a, 0x68, 0x6f, 0x62, 0x62, 0x69, 0x65, 0x73, 0x3a, 0x69, 0x64, 0x3a, 0x6e, 0x61, 0x6d, 0x65, 0x32, 0x32, 0x3a, 0x6d, 0x61, 0x70, 0x5b, 0x69, 0x6e, 0x64, 0x6f, 0x6f, 0x72, 0x3a, 0x5b, 0x43, 0x68, 0x65, 0x73, 0x73, 0x5d, 0x20, 0x6f, 0x75, 0x74, 0x64, 0x6f, 0x6f, 0x72, 0x3a, 0x5b, 0x42, 0x61, 0x73, 0x6b, 0x65, 0x74, 0x62, 0x61, 0x6c, 0x6c, 0x5d, 0x5d, 0x3a, 0x37, 0x3a, 0x4a, 0x6f, 0x68, 0x6e, 0x20, 0x44, 0x6f, 0x65},
r: []byte{0x61, 0x67, 0x65, 0x3a, 0x68, 0x6f, 0x62, 0x62, 0x69, 0x65, 0x73, 0x3a, 0x69, 0x64, 0x3a, 0x6e, 0x61, 0x6d, 0x65, 0xa, 0x32, 0x32, 0x3a, 0x6d, 0x61, 0x70, 0x5b, 0x69, 0x6e, 0x64, 0x6f, 0x6f, 0x72, 0x3a, 0x5b, 0x43, 0x68, 0x65, 0x73, 0x73, 0x5d, 0x20, 0x6f, 0x75, 0x74, 0x64, 0x6f, 0x6f, 0x72, 0x3a, 0x5b, 0x42, 0x61, 0x73, 0x6b, 0x65, 0x74, 0x62, 0x61, 0x6c, 0x6c, 0x5d, 0x5d, 0x3a, 0x37, 0x3a, 0x4a, 0x6f, 0x68, 0x6e, 0x20, 0x44, 0x6f, 0x65, 0xa},
},
{
name: "list",
Expand Down
17 changes: 16 additions & 1 deletion internal/topo/node/batch_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ type BatchOp struct {
// configs
batchSize int
lingerInterval time.Duration
hasHeader bool
// state
buffer *xsql.WindowTuples
rawBuffer bytes.Buffer
rawTuple *xsql.RawTuple
rawHeader []byte

nextLink trace.Link
nextSpanCtx context.Context
Expand All @@ -49,13 +51,14 @@ type BatchOp struct {
currIndex int
}

func NewBatchOp(name string, rOpt *def.RuleOption, batchSize int, lingerInterval time.Duration) (*BatchOp, error) {
func NewBatchOp(name string, rOpt *def.RuleOption, batchSize int, lingerInterval time.Duration, hasHeader bool) (*BatchOp, error) {
if batchSize < 1 && lingerInterval < 1 {
return nil, fmt.Errorf("either batchSize or lingerInterval should be larger than 0")
}
o := &BatchOp{
defaultSinkNode: newDefaultSinkNode(name, rOpt),
batchSize: batchSize,
hasHeader: hasHeader,
lingerInterval: lingerInterval,
currIndex: 0,
rowHandle: make(map[xsql.Row]trace.Span),
Expand Down Expand Up @@ -124,6 +127,15 @@ func (b *BatchOp) ingest(ctx api.StreamContext, item any, checkSize bool) {
// b.handleTraceIngest(ctx, input)
b.rawTuple = input
b.rawBuffer.Write(input.Raw())
if b.hasHeader && b.rawHeader == nil {
newlineIndex := bytes.IndexByte(input.Raw(), '\n')
if newlineIndex != -1 {
b.rawHeader = input.Raw()[:newlineIndex]
ctx.GetLogger().Infof("Get new header")
} else {
ctx.GetLogger().Infof("No header found")
}
}
case xsql.Row:
b.handleTraceIngest(ctx, input)
b.buffer.AddTuple(input)
Expand Down Expand Up @@ -168,6 +180,9 @@ func (b *BatchOp) send(ctx api.StreamContext) {
b.rawTuple = nil
b.rawBuffer.Reset()
b.currIndex = 0
if b.hasHeader {
b.rawBuffer.Write(b.rawHeader)
}
} else {
return
}
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/node/batch_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestRun(t *testing.T) {
mc := mockclock.GetMockClock()
for i, tc := range testcases {
t.Run(fmt.Sprintf("testcase %d", i), func(t *testing.T) {
op, err := NewBatchOp("test", &def.RuleOption{BufferLength: 10, SendError: true}, tc.batchSize, tc.lingerInterval)
op, err := NewBatchOp("test", &def.RuleOption{BufferLength: 10, SendError: true}, tc.batchSize, tc.lingerInterval, false)
if len(tc.err) > 0 {
assert.Error(t, err)
assert.Equal(t, tc.err, err.Error())
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestRun(t *testing.T) {
}

func TestBatchOpSendEmpty(t *testing.T) {
op, err := NewBatchOp("test", &def.RuleOption{BufferLength: 10, SendError: true}, 0, time.Second)
op, err := NewBatchOp("test", &def.RuleOption{BufferLength: 10, SendError: true}, 0, time.Second, false)
require.NoError(t, err)
failpoint.Enable("github.com/lf-edge/ekuiper/v2/internal/topo/node/injectPanic", "return(true)")
op.send(mockContext.NewMockContext("1", "2"))
Expand Down
4 changes: 2 additions & 2 deletions internal/topo/planner/planner_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti
result = append(result, encodeOp)
// Batch enabled
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval), sc.HasHeader)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func splitSink(tp *topo.Topo, s api.Sink, sinkName string, options *def.RuleOpti
} else {
// Batch enabled
if sc.BatchSize > 0 || sc.LingerInterval > 0 {
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval))
batchOp, err := node.NewBatchOp(fmt.Sprintf("%s_%d_batch", sinkName, index), options, sc.BatchSize, time.Duration(sc.LingerInterval), sc.HasHeader)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 825cdff

Please sign in to comment.