Skip to content

Commit

Permalink
feat: add parquet basic support
Browse files Browse the repository at this point in the history
  • Loading branch information
GalvinGao committed Dec 1, 2023
1 parent 7f21565 commit 96bc84d
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions internal/pkg/archiver/archiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

// File-naming and buffering parameters used by the archiver.
const (
// FileExt is the extension of gzip-compressed JSON Lines archive files.
// It has the same value as FileExtJsonlGzip.
FileExt = ".jsonl.gz"
// FileExtJsonlGzip is the extension of gzip-compressed JSON Lines archive files.
FileExtJsonlGzip = ".jsonl.gz"
// FileExtParquet is the extension of Parquet archive files.
FileExtParquet = ".parquet"
// LocalTempDirPattern is the name pattern of the local temporary directory.
// NOTE(review): presumably handed to os.MkdirTemp, where "*" is replaced by a
// random string — confirm against createLocalTempDir.
LocalTempDirPattern = "penguin_stats-archiver-*"
// ArchiverChanBufferSize is the buffer capacity of the channels that carry
// items to the local file writers.
ArchiverChanBufferSize = 1000
)
Expand Down Expand Up @@ -53,10 +54,10 @@ func (a *Archiver) initLogger() {
}
}

// canonicalFilePath returns the relative path of the archive file for the
// archiver's current date, in the form
// "<RealmName>/<RealmName>_<YYYY-MM-DD><fileExt>".
//
// The date component is a.date converted to the CN server's location, so the
// same instant always maps to the same file name regardless of which server
// is being archived. The result is used both as the S3 key suffix and as the
// path below the local temp directory.
func (a *Archiver) canonicalFilePath() string {
func (a *Archiver) canonicalFilePath(fileExt string) string {
loc := constant.LocMap["CN"] // we use CN server's day start time as the day start time for all servers for archive
localT := a.date.In(loc)
return a.RealmName + "/" + a.RealmName + "_" + localT.Format("2006-01-02") + FileExt
return a.RealmName + "/" + a.RealmName + "_" + localT.Format("2006-01-02") + fileExt
}

func (a *Archiver) Prepare(ctx context.Context, date time.Time) error {
Expand All @@ -75,7 +76,7 @@ func (a *Archiver) Prepare(ctx context.Context, date time.Time) error {
}
a.logger.Debug().
Str("evt.name", "archiver.prepare.assertFileNonExistence").
Str("canonicalFilePath", a.canonicalFilePath()).
Str("canonicalFilePath", a.canonicalFilePath(FileExtJsonlGzip)).
Msg("asserted S3 file non-existence")

if err := a.createLocalTempDir(); err != nil {
Expand All @@ -90,7 +91,7 @@ func (a *Archiver) Prepare(ctx context.Context, date time.Time) error {
}

func (a *Archiver) assertS3FileNonExistence(ctx context.Context) error {
key := a.S3Prefix + a.canonicalFilePath()
key := a.S3Prefix + a.canonicalFilePath(FileExtJsonlGzip)
input := &s3.HeadObjectInput{
Bucket: aws.String(a.S3Bucket),
Key: aws.String(key),
Expand Down Expand Up @@ -160,37 +161,54 @@ func (a *Archiver) Collect(ctx context.Context) error {
}

// archiveToLocalFile drains a.writerCh and forwards every item to the local
// file writers. Only the jsonl.gz writer is active; the parquet writer is
// still commented out.
//
// It returns only after the writer goroutine has finished, so the gzip stream
// and the underlying file are flushed and closed before the caller moves on
// (e.g. to upload the file). The writer's error is propagated instead of being
// silently dropped.
func (a *Archiver) archiveToLocalFile(ctx context.Context) error {
	jsonlGzipCh := make(chan any, ArchiverChanBufferSize)
	// Buffered with capacity 1 so the writer goroutine can always deliver its
	// result and exit, even when nobody is receiving at that moment.
	jsonlGzipErrCh := make(chan error, 1)
	go func() {
		jsonlGzipErrCh <- a.archiveToLocalJsonlGzipFile(ctx, jsonlGzipCh)
	}()

	// parquetCh := make(chan any, ArchiverChanBufferSize)
	// go a.archiveToLocalParquetFile(ctx, parquetCh)

	for item := range a.writerCh {
		select {
		case jsonlGzipCh <- item:
			// parquetCh <- item
		case err := <-jsonlGzipErrCh:
			// The writer stopped before the input was exhausted (context
			// cancelled or a write failed). Stop forwarding: once the buffer
			// fills up, a plain send would block forever because nothing
			// reads from jsonlGzipCh any more.
			return err
		}
	}
	close(jsonlGzipCh)
	// close(parquetCh)

	// Wait for the writer to consume the remaining buffered items and close
	// the gzip writer and the file; report whatever it returned.
	return <-jsonlGzipErrCh
}

func (a *Archiver) archiveToLocalJsonlGzipFile(ctx context.Context, itemCh <-chan any) error {
localTempFilePath := path.Join(a.localTempDir, a.canonicalFilePath(FileExtJsonlGzip))
if err := a.ensureFileBaseDir(localTempFilePath); err != nil {
return errors.Wrap(err, "failed to ensureFileBaseDir")
}
a.logger.Debug().
Str("evt.name", "archiver.collect.archiveToLocalFile.ensureFileBaseDir").
Str("localTempFilePath", localTempFilePath).Msg("ensured file base dir")

file, err := os.OpenFile(localTempFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
jsonFile, err := os.OpenFile(localTempFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
if err != nil {
return errors.Wrap(err, "failed to open file")
}
defer file.Close()
defer jsonFile.Close()
a.logger.Debug().
Str("evt.name", "archiver.collect.archiveToLocalFile.openFile").
Str("localTempFilePath", localTempFilePath).Msg("opened file, ready to write gzip stream")

gzipWriter := gzip.NewWriter(file)
defer gzipWriter.Close()
jsonGzipWriter := gzip.NewWriter(jsonFile)
defer jsonGzipWriter.Close()

jsonEncoder := json.NewEncoder(gzipWriter)
jsonEncoder := json.NewEncoder(jsonGzipWriter)

for {
select {
case <-ctx.Done():
return nil
case item, ok := <-a.writerCh:
case item, ok := <-itemCh:
if !ok {
a.logger.Debug().
Str("evt.name", "archiver.collect.archiveToLocalFile.writerChClosed").
Msg("writerCh closed, exiting archiveToLocalFile (closing gzipWriter and file)")
Str("evt.name", "archiver.collect.archiveToLocalFile.itemChClosed").
Msg("itemCh closed, exiting archiveToLocalFile (closing gzipWriter and file)")
return nil
}
if err := jsonEncoder.Encode(item); err != nil {
Expand All @@ -201,14 +219,14 @@ func (a *Archiver) archiveToLocalFile(ctx context.Context) error {
}

func (a *Archiver) uploadToS3(ctx context.Context) error {
localTempFilePath := path.Join(a.localTempDir, a.canonicalFilePath())
localTempFilePath := path.Join(a.localTempDir, a.canonicalFilePath(FileExtJsonlGzip))
file, err := os.Open(localTempFilePath)
if err != nil {
return errors.Wrap(err, "failed to open file")
}
defer file.Close()

key := a.S3Prefix + a.canonicalFilePath()
key := a.S3Prefix + a.canonicalFilePath(FileExtJsonlGzip)
if _, err := a.S3Client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(a.S3Bucket),
Key: aws.String(key),
Expand Down

0 comments on commit 96bc84d

Please sign in to comment.