Skip to content

Commit

Permalink
Code refactor and tuning (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu authored Mar 2, 2022
1 parent c6c507a commit 81252c0
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 148 deletions.
28 changes: 6 additions & 22 deletions entry/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,26 @@ type Writer interface {
WriteBatch(Batch) (common.ErrCode, error)
}

// WriteFlusher interface.
type WriteFlusher interface {
io.Writer
Flush() error
}

// Entry represents queue entry.
type Entry []byte

// Marshal writes entry to writer.
func (e Entry) Marshal(w WriteFlusher, format common.EntryFormat, flushable bool) (code common.ErrCode, err error) {
func (e Entry) Marshal(w io.Writer, format common.EntryFormat) (code common.ErrCode, err error) {
switch format {
case common.EntryV1:
return e.marshalV1(w, flushable)
return e.marshalV1(w)

default:
return common.EntryUnsupportedFormat, common.ErrEntryUnsupportedFormat
}
}

// [Length - uint32][Checksum - uint32][Payload - bytes]
func (e Entry) marshalV1(w WriteFlusher, flushable bool) (code common.ErrCode, err error) {
func (e Entry) marshalV1(w io.Writer) (code common.ErrCode, err error) {
var buf [8]byte
common.Endianese.PutUint64(buf[:], uint64(len(e))<<32|uint64(crc32.ChecksumIEEE(e)))
if _, err = w.Write(buf[:]); err == nil {
if _, err = w.Write(e); err == nil {
if flushable {
err = w.Flush()
}
}
_, err = w.Write(e)
}

if err != nil {
Expand Down Expand Up @@ -176,19 +166,13 @@ func (b *Batch) Append(e Entry) {
}

// Marshal into writer.
func (b *Batch) Marshal(w WriteFlusher, format common.EntryFormat) (code common.ErrCode, err error) {
func (b *Batch) Marshal(w io.Writer, format common.EntryFormat) (code common.ErrCode, err error) {
if b.Len() > 0 {
for _, e := range b.entries {
if code, err = e.Marshal(w, format, false); err != nil {
if code, err = e.Marshal(w, format); err != nil {
return code, err
}
}

if err = w.Flush(); err != nil {
code = common.EntryWriteErr
} else {
code = common.NoError
}
}
return
}
Expand Down
18 changes: 4 additions & 14 deletions entry/entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ func (e *errorWriter) Write([]byte) (int, error) { return 0, fmt.Errorf("fake er

func (e *errorWriter) Flush() error { return nil }

type errorFlusher struct{}

func (e *errorFlusher) Write([]byte) (int, error) { return 0, nil }

func (e *errorFlusher) Flush() error { return fmt.Errorf("fake error") }

type noopFlusher struct{ io.Writer }

func (f *noopFlusher) Flush() error { return nil }
Expand All @@ -139,25 +133,21 @@ func TestEntryMarshal(t *testing.T) {
t.Run("UnsupportedFormat", func(t *testing.T) {
var e Entry = []byte{1, 2, 3, 4}

code, err := e.Marshal(nil, 123, true)
code, err := e.Marshal(nil, 123)
require.Equal(t, common.ErrEntryUnsupportedFormat, err)
require.Equal(t, common.EntryUnsupportedFormat, code)
})

t.Run("ErrorHandling", func(t *testing.T) {
var e Entry = []byte{1, 2, 3, 4}

code, err := e.Marshal(&errorWriter{}, common.EntryV1, true)
code, err := e.Marshal(&errorWriter{}, common.EntryV1)
require.Equal(t, common.EntryWriteErr, code)
require.Error(t, err)

batch := NewBatch(2)
batch.Append(e)

code, err = batch.Marshal(&errorFlusher{}, common.EntryV1)
require.Equal(t, common.EntryWriteErr, code)
require.Error(t, err)

code, err = batch.Marshal(&errorWriter{}, common.EntryV1)
require.Equal(t, common.EntryWriteErr, code)
require.Error(t, err)
Expand All @@ -167,7 +157,7 @@ func TestEntryMarshal(t *testing.T) {
var e Entry = []byte{1, 2, 3, 4}
var buf bytes.Buffer

code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1, true)
code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1)
require.NoError(t, err)
require.Equal(t, code, common.NoError)
require.EqualValues(t, []byte{0, 0, 0, 4, 0xb6, 0x3c, 0xfb, 0xcd, 1, 2, 3, 4}, buf.Bytes())
Expand Down Expand Up @@ -201,7 +191,7 @@ func TestEntry(t *testing.T) {

var buf bytes.Buffer

code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1, true)
code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1)
require.NoError(t, err)
require.Equal(t, common.NoError, code)

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ require (
github.com/grandecola/mmap v0.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/kr/pty v1.1.1 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
3 changes: 2 additions & 1 deletion queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestQueueRace(t *testing.T) {
_ = q.Close()
}()

// start reader
// start readers
var wg sync.WaitGroup

collectValue := make([]int, size)
Expand Down Expand Up @@ -195,6 +195,7 @@ func TestQueueRace(t *testing.T) {
}()
}

// start writers
ch := make(chan uint32, 1)
for i := 0; i < 4; i++ {
wg.Add(1)
Expand Down
17 changes: 2 additions & 15 deletions segment/v1/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package segv1

import (
"io"
"sync"
"sync/atomic"

"github.com/linxGnu/pqueue/common"
Expand All @@ -16,15 +15,12 @@ type Segment struct {
readOnly bool

entryFormat common.EntryFormat
w entry.Writer

r entry.Reader
rLock sync.Mutex
offset uint32
numEntries uint32
maxEntries uint32

w entry.Writer
wLock sync.Mutex
r entry.Reader
}

// NewReadOnlySegment creates new Segment for readonly.
Expand Down Expand Up @@ -122,9 +118,6 @@ func (s *Segment) WriteEntry(e entry.Entry) (common.ErrCode, error) {
}

func (s *Segment) writeEntry(e entry.Entry) (common.ErrCode, error) {
s.wLock.Lock()
defer s.wLock.Unlock()

if s.numEntries >= s.maxEntries {
return common.SegmentNoMoreWrite, nil
}
Expand All @@ -148,9 +141,6 @@ func (s *Segment) WriteBatch(b entry.Batch) (common.ErrCode, error) {
}

func (s *Segment) writeBatch(b entry.Batch) (common.ErrCode, error) {
s.wLock.Lock()
defer s.wLock.Unlock()

if s.numEntries >= s.maxEntries {
return common.SegmentNoMoreWrite, nil
}
Expand All @@ -166,9 +156,6 @@ func (s *Segment) writeBatch(b entry.Batch) (common.ErrCode, error) {

// ReadEntry from segment.
func (s *Segment) ReadEntry(e *entry.Entry) (common.ErrCode, int, error) {
s.rLock.Lock()
defer s.rLock.Unlock()

if !s.readOnly {
// readable?
if s.offset == atomic.LoadUint32(&s.numEntries) {
Expand Down
Loading

0 comments on commit 81252c0

Please sign in to comment.