Skip to content

Commit

Permalink
optimize dumpling's effiency hotfix (pingcap#53)
Browse files Browse the repository at this point in the history
* initial commit

* fix

* fix2

* fix effiency bug

* make greater chan

* avoid malloc

* remove batch

* use new method

* try sync.Pool

* fix bug

* refine escape

* fix

* switch to bytes.buffer

* add pipe

* fix

* change usage

* test

* add variable to struct

* use global variable

* add UT

* update go.mod go.sum

* refine code

* refine code again

* add escape integration tests

* fix bug

* make pipe bigger

* fix

* fix ut

* fix primary

* fix bug

* fix

* fix

* use byte stream

* tmp

* async scan and next

* remove async scan

* remove bytes stream

* refine code

* refine log

* remove debug log

* remove DAO

* add wait group

* use sql.RawBytes

* use bytes

* Revert "use bytes"

This reverts commit fb79bf5bb000989e7347bba024fce7f2346234be.

* test

* refine code

* use rawbytes for SQLTypeBytes

* use byte stream

* refine code

* use bytes.buffer as buffer and write to os.file direcly

* reuse bytes.buffer

* fix bug

* check cap and then grow

* address comment
  • Loading branch information
lichunzhu authored Mar 19, 2020
1 parent 9988a18 commit 255ce0d
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions v4/export/writer_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ var pool = sync.Pool{New: func() interface{} {
}}

type writerPipe struct {
input chan []byte
input chan *bytes.Buffer
closed chan struct{}
errCh chan error

Expand All @@ -30,7 +30,7 @@ type writerPipe struct {

func newWriterPipe(w io.Writer) *writerPipe {
return &writerPipe{
input: make(chan []byte, 8),
input: make(chan *bytes.Buffer, 8),
closed: make(chan struct{}),
errCh: make(chan error, 1),
w: w,
Expand All @@ -49,7 +49,9 @@ func (b *writerPipe) Run(ctx context.Context) {
if errOccurs {
continue
}
err := writeBytes(b.w, s)
err := writeBytes(b.w, s.Bytes())
s.Reset()
pool.Put(s)
if err != nil {
errOccurs = true
b.errCh <- err
Expand Down Expand Up @@ -94,7 +96,9 @@ func WriteInsert(tblIR TableDataIR, w io.Writer) error {
}

bf := pool.Get().(*bytes.Buffer)
bf.Grow(lengthLimit)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}

wp := newWriterPipe(w)

Expand Down Expand Up @@ -148,8 +152,11 @@ func WriteInsert(tblIR TableDataIR, w io.Writer) error {
counter += 1

if bf.Len() >= lengthLimit {
wp.input <- bf.Bytes()
bf.Reset()
wp.input <- bf
bf = pool.Get().(*bytes.Buffer)
if bfCap := bf.Cap(); bfCap < lengthLimit {
bf.Grow(lengthLimit - bfCap)
}
}

fileRowIter.Next()
Expand All @@ -168,12 +175,10 @@ func WriteInsert(tblIR TableDataIR, w io.Writer) error {
zap.String("table", tblIR.TableName()),
zap.Int("record counts", counter))
if bf.Len() > 0 {
wp.input <- bf.Bytes()
bf.Reset()
wp.input <- bf
}
close(wp.input)
<-wp.closed
pool.Put(bf)
return wp.Error()
}

Expand Down

0 comments on commit 255ce0d

Please sign in to comment.