diff --git a/entry/entry.go b/entry/entry.go index b083858..0bb7a8b 100644 --- a/entry/entry.go +++ b/entry/entry.go @@ -22,20 +22,14 @@ type Writer interface { WriteBatch(Batch) (common.ErrCode, error) } -// WriteFlusher interface. -type WriteFlusher interface { - io.Writer - Flush() error -} - // Entry represents queue entry. type Entry []byte // Marshal writes entry to writer. -func (e Entry) Marshal(w WriteFlusher, format common.EntryFormat, flushable bool) (code common.ErrCode, err error) { +func (e Entry) Marshal(w io.Writer, format common.EntryFormat) (code common.ErrCode, err error) { switch format { case common.EntryV1: - return e.marshalV1(w, flushable) + return e.marshalV1(w) default: return common.EntryUnsupportedFormat, common.ErrEntryUnsupportedFormat @@ -43,15 +37,11 @@ func (e Entry) Marshal(w WriteFlusher, format common.EntryFormat, flushable bool } // [Length - uint32][Checksum - uint32][Payload - bytes] -func (e Entry) marshalV1(w WriteFlusher, flushable bool) (code common.ErrCode, err error) { +func (e Entry) marshalV1(w io.Writer) (code common.ErrCode, err error) { var buf [8]byte common.Endianese.PutUint64(buf[:], uint64(len(e))<<32|uint64(crc32.ChecksumIEEE(e))) if _, err = w.Write(buf[:]); err == nil { - if _, err = w.Write(e); err == nil { - if flushable { - err = w.Flush() - } - } + _, err = w.Write(e) } if err != nil { @@ -176,19 +166,13 @@ func (b *Batch) Append(e Entry) { } // Marshal into writer. -func (b *Batch) Marshal(w WriteFlusher, format common.EntryFormat) (code common.ErrCode, err error) { +func (b *Batch) Marshal(w io.Writer, format common.EntryFormat) (code common.ErrCode, err error) { if b.Len() > 0 { for _, e := range b.entries { - if code, err = e.Marshal(w, format, false); err != nil { + if code, err = e.Marshal(w, format); err != nil { return code, err } } - - if err = w.Flush(); err != nil { - code = common.EntryWriteErr - } else { - code = common.NoError - } } return } diff --git a/entry/entry_test.go b/entry/entry_test.go index 5c829b4..3d01887 100644 --- a/entry/entry_test.go +++ b/entry/entry_test.go @@ -125,12 +125,6 @@ func (e *errorWriter) Write([]byte) (int, error) { return 0, fmt.Errorf("fake er func (e *errorWriter) Flush() error { return nil } -type errorFlusher struct{} - -func (e *errorFlusher) Write([]byte) (int, error) { return 0, nil } - -func (e *errorFlusher) Flush() error { return fmt.Errorf("fake error") } - type noopFlusher struct{ io.Writer } func (f *noopFlusher) Flush() error { return nil } @@ -139,7 +133,7 @@ func TestEntryMarshal(t *testing.T) { t.Run("UnsupportedFormat", func(t *testing.T) { var e Entry = []byte{1, 2, 3, 4} - code, err := e.Marshal(nil, 123, true) + code, err := e.Marshal(nil, 123) require.Equal(t, common.ErrEntryUnsupportedFormat, err) require.Equal(t, common.EntryUnsupportedFormat, code) }) @@ -147,17 +141,13 @@ func TestEntryMarshal(t *testing.T) { t.Run("ErrorHandling", func(t *testing.T) { var e Entry = []byte{1, 2, 3, 4} - code, err := e.Marshal(&errorWriter{}, common.EntryV1, true) + code, err := e.Marshal(&errorWriter{}, common.EntryV1) require.Equal(t, common.EntryWriteErr, code) require.Error(t, err) batch := NewBatch(2) batch.Append(e) - code, err = batch.Marshal(&errorFlusher{}, common.EntryV1) - require.Equal(t, common.EntryWriteErr, code) - require.Error(t, err) - code, err = batch.Marshal(&errorWriter{}, common.EntryV1) require.Equal(t, common.EntryWriteErr, code) require.Error(t, err) @@ -167,7 +157,7 @@ func TestEntryMarshal(t *testing.T) { var e Entry = []byte{1, 2, 3, 4} var buf bytes.Buffer - code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1, true) + code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1) require.NoError(t, err) require.Equal(t, code, common.NoError) require.EqualValues(t, []byte{0, 0, 0, 4, 0xb6, 0x3c, 0xfb, 0xcd, 1, 2, 3, 4}, buf.Bytes()) @@ -201,7 +191,7 @@ func TestEntry(t *testing.T) { var buf bytes.Buffer - code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1, true) + code, err := e.Marshal(&noopFlusher{Writer: &buf}, common.EntryV1) require.NoError(t, err) require.Equal(t, common.NoError, code) diff --git a/go.mod b/go.mod index e6e87fb..30ab1a8 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,10 @@ require ( github.com/grandecola/mmap v0.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/kr/pretty v0.2.0 // indirect + github.com/kr/pty v1.1.1 // indirect + github.com/kr/text v0.1.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/objx v0.1.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/queue_test.go b/queue_test.go index 0efdfa9..f1a714c 100644 --- a/queue_test.go +++ b/queue_test.go @@ -148,7 +148,7 @@ func TestQueueRace(t *testing.T) { _ = q.Close() }() - // start reader + // start readers var wg sync.WaitGroup collectValue := make([]int, size) @@ -195,6 +195,7 @@ func TestQueueRace(t *testing.T) { }() } + // start writers ch := make(chan uint32, 1) for i := 0; i < 4; i++ { wg.Add(1) diff --git a/segment/v1/segment.go b/segment/v1/segment.go index e1e5f24..8ad00d6 100644 --- a/segment/v1/segment.go +++ b/segment/v1/segment.go @@ -2,7 +2,6 @@ package segv1 import ( "io" - "sync" "sync/atomic" "github.com/linxGnu/pqueue/common" @@ -16,15 +15,12 @@ type Segment struct { readOnly bool entryFormat common.EntryFormat + w entry.Writer - r entry.Reader - rLock sync.Mutex offset uint32 numEntries uint32 maxEntries uint32 - - w entry.Writer - wLock sync.Mutex + r entry.Reader } // NewReadOnlySegment creates new Segment for readonly. @@ -122,9 +118,6 @@ func (s *Segment) WriteEntry(e entry.Entry) (common.ErrCode, error) { } func (s *Segment) writeEntry(e entry.Entry) (common.ErrCode, error) { - s.wLock.Lock() - defer s.wLock.Unlock() - if s.numEntries >= s.maxEntries { return common.SegmentNoMoreWrite, nil } @@ -148,9 +141,6 @@ func (s *Segment) WriteBatch(b entry.Batch) (common.ErrCode, error) { } func (s *Segment) writeBatch(b entry.Batch) (common.ErrCode, error) { - s.wLock.Lock() - defer s.wLock.Unlock() - if s.numEntries >= s.maxEntries { return common.SegmentNoMoreWrite, nil } @@ -166,9 +156,6 @@ func (s *Segment) writeBatch(b entry.Batch) (common.ErrCode, error) { // ReadEntry from segment. func (s *Segment) ReadEntry(e *entry.Entry) (common.ErrCode, int, error) { - s.rLock.Lock() - defer s.rLock.Unlock() - if !s.readOnly { // readable? if s.offset == atomic.LoadUint32(&s.numEntries) { diff --git a/segment/v1/segment_test.go b/segment/v1/segment_test.go index da1d6ba..c72fdf1 100644 --- a/segment/v1/segment_test.go +++ b/segment/v1/segment_test.go @@ -2,11 +2,7 @@ package segv1 import ( "bytes" - "os" - "path/filepath" - "sync" "testing" - "time" "github.com/linxGnu/pqueue/common" "github.com/linxGnu/pqueue/entry" @@ -14,8 +10,6 @@ import ( "github.com/stretchr/testify/require" ) -var tmpDir = os.TempDir() - func TestSegment(t *testing.T) { t.Run("NewSegmentFailure", func(t *testing.T) { { @@ -240,88 +234,88 @@ func TestNewSegmentReadWrite(t *testing.T) { }) } -func TestSegmentRace(t *testing.T) { - size := 20000 - - // prepare temp file - tmpFile := filepath.Join(tmpDir, "segment.tmp") - - // create/trunc it - f, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) - require.NoError(t, err) - - // remove when done - defer func() { _ = os.Remove(tmpFile) }() - - // open temp file for reading - fr, err := os.Open(tmpFile) - require.NoError(t, err) - - // create new segment - s, err := NewSegment(f, common.EntryV1, uint32(size)) - require.NoError(t, err) - n, err := s.Reading(fr) - require.NoError(t, err) - require.Equal(t, 4, n) - defer func() { - _ = s.Close() - }() - - // start reader - var wg sync.WaitGroup - - collectValue := make([]int, size) - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - var e entry.Entry - for { - code, _, err := s.ReadEntry(&e) - if code == common.SegmentNoMoreReadStrong { - return - } - if code == common.SegmentNoMoreReadWeak { - time.Sleep(500 * time.Microsecond) - } else { - require.Equal(t, common.NoError, code) - require.NoError(t, err) - - value := common.Endianese.Uint32(e) - require.Less(t, value, uint32(size)) - collectValue[value]++ - } - } - }() - } - - ch := make(chan uint32, 1) - for i := 0; i < 8; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - var buf [4]byte - for data := range ch { - time.Sleep(time.Millisecond) - - common.Endianese.PutUint32(buf[:], data) - code, err := s.WriteEntry(buf[:]) - require.NoError(t, err) - require.Equal(t, common.NoError, code) - } - }() - } - - for i := 0; i < size; i++ { - ch <- uint32(i) - } - close(ch) - - wg.Wait() - - for i := range collectValue { - require.Equal(t, 1, collectValue[i]) - } -} +// func TestSegmentRace(t *testing.T) { +// size := 20000 +// +// // prepare temp file +// tmpFile := filepath.Join(tmpDir, "segment.tmp") +// +// // create/trunc it +// f, err := os.OpenFile(tmpFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) +// require.NoError(t, err) +// +// // remove when done +// defer func() { _ = os.Remove(tmpFile) }() +// +// // open temp file for reading +// fr, err := os.Open(tmpFile) +// require.NoError(t, err) +// +// // create new segment +// s, err := NewSegment(f, common.EntryV1, uint32(size)) +// require.NoError(t, err) +// n, err := s.Reading(fr) +// require.NoError(t, err) +// require.Equal(t, 4, n) +// defer func() { +// _ = s.Close() +// }() +// +// // start reader +// var wg sync.WaitGroup +// +// collectValue := make([]int, size) +// for i := 0; i < 8; i++ { +// wg.Add(1) +// go func() { +// defer wg.Done() +// +// var e entry.Entry +// for { +// code, _, err := s.ReadEntry(&e) +// if code == common.SegmentNoMoreReadStrong { +// return +// } +// if code == common.SegmentNoMoreReadWeak { +// time.Sleep(500 * time.Microsecond) +// } else { +// require.Equal(t, common.NoError, code) +// require.NoError(t, err) +// +// value := common.Endianese.Uint32(e) +// require.Less(t, value, uint32(size)) +// collectValue[value]++ +// } +// } +// }() +// } +// +// ch := make(chan uint32, 1) +// for i := 0; i < 8; i++ { +// wg.Add(1) +// go func() { +// defer wg.Done() +// +// var buf [4]byte +// for data := range ch { +// time.Sleep(time.Millisecond) +// +// common.Endianese.PutUint32(buf[:], data) +// code, err := s.WriteEntry(buf[:]) +// require.NoError(t, err) +// require.Equal(t, common.NoError, code) +// } +// }() +// } +// +// for i := 0; i < size; i++ { +// ch <- uint32(i) +// } +// close(ch) +// +// wg.Wait() +// +// for i := range collectValue { +// require.Equal(t, 1, collectValue[i]) +// } +// } diff --git a/segment/v1/segment_writer.go b/segment/v1/segment_writer.go index d59ae01..031d189 100644 --- a/segment/v1/segment_writer.go +++ b/segment/v1/segment_writer.go @@ -10,9 +10,7 @@ import ( "github.com/hashicorp/go-multierror" ) -var ( - segmentEnding = []byte{0, 0, 0, 0, 0, 0, 0, 0} -) +var segmentEnding = []byte{0, 0, 0, 0, 0, 0, 0, 0} type segmentWriter struct { w *bufio.Writer @@ -36,8 +34,10 @@ func (s *segmentWriter) Close() (err error) { // WriteEntry to underlying writer. func (s *segmentWriter) WriteEntry(e entry.Entry) (common.ErrCode, error) { - // check size - _, err := e.Marshal(s.w, s.entryFormat, true) + _, err := e.Marshal(s.w, s.entryFormat) + if err == nil { + err = s.w.Flush() + } if err == nil { return common.NoError, nil } @@ -47,6 +47,9 @@ func (s *segmentWriter) WriteEntry(e entry.Entry) (common.ErrCode, error) { // WriteEntry to underlying writer. func (s *segmentWriter) WriteBatch(b entry.Batch) (common.ErrCode, error) { _, err := b.Marshal(s.w, s.entryFormat) + if err == nil { + err = s.w.Flush() + } if err == nil { return common.NoError, nil }