Merge pull request #3 from helinwang/master

able to get chunk Index from Index, fix writer

wangkuiyi authored May 24, 2017
2 parents 19cc0e7 + a5f5c59 commit 1065839
Showing 6 changed files with 189 additions and 76 deletions.
13 changes: 11 additions & 2 deletions chunk.go
@@ -76,13 +76,21 @@ func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
 	return nil
 }
 
+type noopCompressor struct {
+	*bytes.Buffer
+}
+
+func (c *noopCompressor) Close() error {
+	return nil
+}
+
 func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
 	compressed := new(bytes.Buffer)
-	var compressor io.Writer
+	var compressor io.WriteCloser
 
 	switch compressorIndex {
 	case NoCompression:
-		compressor = compressed
+		compressor = &noopCompressor{compressed}
 	case Snappy:
 		compressor = snappy.NewBufferedWriter(compressed)
 	case Gzip:
@@ -94,6 +102,7 @@ func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
 	if _, e := io.Copy(compressor, src); e != nil {
 		return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
 	}
+	compressor.Close()
 
 	return compressed, nil
}
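The io.Writer to io.WriteCloser change is what the new compressor.Close() call requires: compressors such as snappy.NewBufferedWriter (and gzip writers) buffer data internally and only flush it on Close, while a raw *bytes.Buffer has no Close method, hence the no-op wrapper. A minimal standalone sketch of the same pattern (illustrative, not part of this commit; nopWriteCloser is a hypothetical name):

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/golang/snappy"
)

// nopWriteCloser plays the role of the commit's noopCompressor: it adapts a
// plain *bytes.Buffer to io.WriteCloser with a do-nothing Close.
type nopWriteCloser struct{ *bytes.Buffer }

func (nopWriteCloser) Close() error { return nil }

func main() {
	var out bytes.Buffer

	for _, w := range []io.WriteCloser{
		nopWriteCloser{&out},           // NoCompression path: Close is a no-op.
		snappy.NewBufferedWriter(&out), // Snappy path: Close flushes the buffered block.
	} {
		if _, err := io.Copy(w, bytes.NewReader(make([]byte, 1000))); err != nil {
			panic(err)
		}
		before := out.Len() // for the snappy writer this is typically 0: data still sits in its buffer.
		w.Close()
		fmt.Printf("before Close: %d bytes; after Close: %d bytes\n", before, out.Len())
		out.Reset()
	}
}

One caveat in the diff above: the error returned by compressor.Close() is discarded, which is harmless for the no-op case but would hide a failed flush from a real compressor.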
5 changes: 3 additions & 2 deletions header.go
@@ -7,8 +7,6 @@ import (
 )
 
 const (
-	magicNumber uint32 = 0x01020304
-
 	// NoCompression means writing raw chunk data into files.
 	// With other choices, chunks are compressed before written.
 	NoCompression = iota
@@ -19,6 +17,9 @@ const (
 	// Gzip is a well-known compression algorithm. It is
 	// recommended only if you are looking for compression ratio.
 	Gzip
+
+	magicNumber       uint32 = 0x01020304
+	defaultCompressor        = Snappy
 )
 
 // Header is the metadata of Chunk.
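One side effect worth noting: every spec in a Go const block advances iota, so moving magicNumber from the head of the block to the tail shifts the compression constants down by one. NoCompression was previously 1 and is now 0, and likewise for Snappy and Gzip. A self-contained illustration of the rule (hypothetical names, not from this repo):

package main

import "fmt"

const (
	magic uint32 = 0x01020304 // first ConstSpec: consumes iota == 0.

	A = iota // 1, because the magic line above already advanced iota.
	B        // 2
)

const (
	C = iota // 0: first spec of its own block.
	D        // 1

	magic2 uint32 = 0x01020304 // placed last; no longer shifts C and D.
)

func main() { fmt.Println(A, B, C, D) } // prints: 1 2 0 1

If any RecordIO files were written before this commit, the compressor codes stored in their headers would therefore decode differently under the new constant values.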
19 changes: 18 additions & 1 deletion reader.go
@@ -6,7 +6,8 @@ import "io"
 type Index struct {
 	chunkOffsets []int64
 	chunkLens    []uint32
-	numRecords   int // the number of all records in a file.
+	numRecords   int   // the number of all records in a file.
+	chunkRecords []int // the number of records in each chunk.
 }
 
 // LoadIndex scans the file and parses chunkOffsets, chunkLens, and numRecords.
@@ -24,6 +25,7 @@ func LoadIndex(r io.ReadSeeker) (*Index, error) {
 
 		f.chunkOffsets = append(f.chunkOffsets, offset)
 		f.chunkLens = append(f.chunkLens, hdr.numRecords)
+		f.chunkRecords = append(f.chunkRecords, int(hdr.numRecords))
 		f.numRecords += int(hdr.numRecords)
 
 		offset, e = r.Seek(int64(hdr.compressedSize), io.SeekCurrent)
@@ -43,6 +45,21 @@ func (r *Index) NumRecords() int {
 	return r.numRecords
 }
 
+// NumChunks returns the total number of chunks in a RecordIO file.
+func (r *Index) NumChunks() int {
+	return len(r.chunkLens)
+}
+
+// ChunkIndex returns the Index of the i-th Chunk.
+func (r *Index) ChunkIndex(i int) *Index {
+	idx := &Index{}
+	idx.chunkOffsets = []int64{r.chunkOffsets[i]}
+	idx.chunkLens = []uint32{r.chunkLens[i]}
+	idx.chunkRecords = []int{r.chunkRecords[i]}
+	idx.numRecords = idx.chunkRecords[0]
+	return idx
+}
+
 // Locate returns the index of chunk that contains the given record,
 // and the record index within the chunk. It returns (-1, -1) if the
 // record is out of range.
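NumChunks and ChunkIndex turn an Index over a whole file into single-chunk sub-indices, so each chunk can be scanned independently, for instance fanned out to concurrent workers. A usage sketch (mine, not from the commit; it relies only on the exported API exercised by the tests below):

package main

import (
	"bytes"
	"fmt"
	"log"
	"sync"

	"github.com/wangkuiyi/recordio"
)

func main() {
	// maxChunkSize 0 makes the writer dump a chunk per record (see writer.go below).
	var buf bytes.Buffer
	w := recordio.NewWriter(&buf, 0, recordio.NoCompression)
	for _, r := range []string{"a", "bb", "ccc"} {
		if _, err := w.Write([]byte(r)); err != nil {
			log.Fatal(err)
		}
	}
	w.Close()

	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
	if err != nil {
		log.Fatal(err)
	}

	// One scanner per chunk, each driven by its own single-chunk Index.
	var wg sync.WaitGroup
	for i := 0; i < idx.NumChunks(); i++ {
		wg.Add(1)
		go func(sub *recordio.Index) {
			defer wg.Done()
			s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), sub, -1, -1)
			for s.Scan() {
				fmt.Println(string(s.Record()))
			}
		}(idx.ChunkIndex(i))
	}
	wg.Wait()
}

Because ChunkIndex copies one offset, one length, and one record count into a fresh Index, the scanner sees a "file" whose only chunk is the one requested; Scanner itself needed no changes.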
90 changes: 90 additions & 0 deletions recordio_internal_test.go
@@ -0,0 +1,90 @@
+package recordio
+
+import (
+	"bytes"
+	"testing"
+	"unsafe"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestChunkHead(t *testing.T) {
+	assert := assert.New(t)
+
+	c := &Header{
+		checkSum:       123,
+		compressor:     456,
+		compressedSize: 789,
+	}
+
+	var buf bytes.Buffer
+	_, e := c.write(&buf)
+	assert.Nil(e)
+
+	cc, e := parseHeader(&buf)
+	assert.Nil(e)
+	assert.Equal(c, cc)
+}
+
+func TestWriteAndRead(t *testing.T) {
+	assert := assert.New(t)
+
+	data := []string{
+		"12345",
+		"1234",
+		"12"}
+
+	var buf bytes.Buffer
+	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
+
+	n, e := w.Write([]byte(data[0])) // not exceed chunk size.
+	assert.Nil(e)
+	assert.Equal(5, n)
+
+	n, e = w.Write([]byte(data[1])) // not exceed chunk size.
+	assert.Nil(e)
+	assert.Equal(4, n)
+
+	n, e = w.Write([]byte(data[2])) // exceeds chunk size, dump and create a new chunk.
+	assert.Nil(e)
+	assert.Equal(n, 2)
+
+	assert.Nil(w.Close()) // flush the second chunk.
+	assert.Nil(w.Writer)
+
+	n, e = w.Write([]byte("anything")) // not effective after close.
+	assert.NotNil(e)
+	assert.Equal(n, 0)
+
+	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
+	assert.Nil(e)
+	assert.Equal([]uint32{2, 1}, idx.chunkLens)
+	assert.Equal(
+		[]int64{0,
+			int64(4 + // magic number
+				unsafe.Sizeof(Header{}) +
+				5 + // first record
+				4 + // second record
+				2*4)}, // two record lengths
+		idx.chunkOffsets)
+
+	s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
+	i := 0
+	for s.Scan() {
+		assert.Equal(data[i], string(s.Record()))
+		i++
+	}
+}
+
+func TestWriteEmptyFile(t *testing.T) {
+	assert := assert.New(t)
+
+	var buf bytes.Buffer
+	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
+	assert.Nil(w.Close())
+	assert.Equal(0, buf.Len())
+
+	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
+	assert.Nil(e)
+	assert.Equal(0, idx.NumRecords())
+}
131 changes: 61 additions & 70 deletions recordio_test.go
@@ -1,90 +1,81 @@
-package recordio
+package recordio_test
 
 import (
 	"bytes"
+	"reflect"
 	"testing"
-	"unsafe"
 
-	"github.com/stretchr/testify/assert"
+	"github.com/wangkuiyi/recordio"
 )
 
-func TestChunkHead(t *testing.T) {
-	assert := assert.New(t)
-
-	c := &Header{
-		checkSum:       123,
-		compressor:     456,
-		compressedSize: 789,
-	}
-
+func TestWriteRead(t *testing.T) {
+	const total = 1000
 	var buf bytes.Buffer
-	_, e := c.write(&buf)
-	assert.Nil(e)
-
-	cc, e := parseHeader(&buf)
-	assert.Nil(e)
-	assert.Equal(c, cc)
-}
-
-func TestWriteAndRead(t *testing.T) {
-	assert := assert.New(t)
-
-	data := []string{
-		"12345",
-		"1234",
-		"12"}
-
-	var buf bytes.Buffer
-	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
-
-	n, e := w.Write([]byte(data[0])) // not exceed chunk size.
-	assert.Nil(e)
-	assert.Equal(5, n)
-
-	n, e = w.Write([]byte(data[1])) // not exceed chunk size.
-	assert.Nil(e)
-	assert.Equal(4, n)
-
-	n, e = w.Write([]byte(data[2])) // exceeds chunk size, dump and create a new chunk.
-	assert.Nil(e)
-	assert.Equal(n, 2)
-
-	assert.Nil(w.Close()) // flush the second chunk.
-	assert.Nil(w.Writer)
+	w := recordio.NewWriter(&buf, 0, -1)
+	for i := 0; i < total; i++ {
+		_, err := w.Write(make([]byte, i))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
 
-	n, e = w.Write([]byte("anything")) // not effective after close.
-	assert.NotNil(e)
-	assert.Equal(n, 0)
+	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
+	if err != nil {
+		t.Fatal(err)
+	}
 
-	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
-	assert.Nil(e)
-	assert.Equal([]uint32{2, 1}, idx.chunkLens)
-	assert.Equal(
-		[]int64{0,
-			int64(4 + // magic number
-				unsafe.Sizeof(Header{}) +
-				5 + // first record
-				4 + // second record
-				2*4)}, // two record lengths
-		idx.chunkOffsets)
+	if idx.NumRecords() != total {
+		t.Fatal("num record does not match:", idx.NumRecords(), total)
+	}
 
-	s := NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
+	s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), idx, -1, -1)
 	i := 0
 	for s.Scan() {
-		assert.Equal(data[i], string(s.Record()))
+		if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
+			t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
+		}
 		i++
 	}
-}
 
-func TestWriteEmptyFile(t *testing.T) {
-	assert := assert.New(t)
+	if i != total {
+		t.Fatal("total count not match:", i, total)
+	}
+}
 
+func TestChunkIndex(t *testing.T) {
+	const total = 1000
 	var buf bytes.Buffer
-	w := NewWriter(&buf, 10, NoCompression) // use a small maxChunkSize.
-	assert.Nil(w.Close())
-	assert.Equal(0, buf.Len())
+	w := recordio.NewWriter(&buf, 0, -1)
+	for i := 0; i < total; i++ {
+		_, err := w.Write(make([]byte, i))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
 
+	idx, err := recordio.LoadIndex(bytes.NewReader(buf.Bytes()))
+	if err != nil {
+		t.Fatal(err)
+	}
+
-	idx, e := LoadIndex(bytes.NewReader(buf.Bytes()))
-	assert.Nil(e)
-	assert.Equal(0, idx.NumRecords())
+	if idx.NumChunks() != total {
+		t.Fatal("unexpected chunk num:", idx.NumChunks(), total)
+	}
+
+	for i := 0; i < total; i++ {
+		newIdx := idx.ChunkIndex(i)
+		s := recordio.NewScanner(bytes.NewReader(buf.Bytes()), newIdx, -1, -1)
+		j := 0
+		for s.Scan() {
+			if !reflect.DeepEqual(s.Record(), make([]byte, i)) {
+				t.Fatal("not equal:", len(s.Record()), len(make([]byte, i)))
+			}
+			j++
+		}
+		if j != 1 {
+			t.Fatal("unexpected record per chunk:", j)
+		}
+	}
 }
7 changes: 6 additions & 1 deletion writer.go
@@ -21,9 +21,14 @@ type Writer struct {
 // using the deflate algorithm given compression level. Note that
 // level 0 means no compression and -1 means default compression.
 func NewWriter(w io.Writer, maxChunkSize, compressor int) *Writer {
-	if maxChunkSize <= 0 {
+	if maxChunkSize < 0 {
 		maxChunkSize = defaultMaxChunkSize
 	}
+
+	if compressor < 0 {
+		compressor = defaultCompressor
+	}
+
 	return &Writer{
 		Writer: w,
 		chunk:  &Chunk{},
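Two behavioral changes land here: maxChunkSize now falls back to defaultMaxChunkSize only when negative, making 0 a legal value that forces every record into its own chunk (exactly what the new tests exploit via NewWriter(&buf, 0, -1)); and a negative compressor now selects defaultCompressor (Snappy) rather than flowing through unchecked. A short sketch of the resulting call patterns (assumed behavior, inferred from the diff above):

package main

import (
	"bytes"
	"fmt"

	"github.com/wangkuiyi/recordio"
)

func main() {
	var a, b bytes.Buffer

	// Negative arguments pick the defaults: defaultMaxChunkSize and Snappy.
	w1 := recordio.NewWriter(&a, -1, -1)
	w1.Write([]byte("hello"))
	w1.Close() // one chunk holding the single record.

	// maxChunkSize == 0 is no longer coerced to the default: every record
	// exceeds the limit, so each Write dumps its own chunk.
	w2 := recordio.NewWriter(&b, 0, recordio.NoCompression)
	w2.Write([]byte("hello"))
	w2.Write([]byte("world"))
	w2.Close()

	fmt.Println(a.Len(), b.Len()) // b carries two chunks' worth of framing.
}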
