Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

csbufio: Respect contexts in NewReader/NewWriter (lytics/lio#28503) #100

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion awss3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ func (f *FS) NewWriterWithContext(ctx context.Context, objectName string, metada
uploader := s3manager.NewUploader(f.sess)

pr, pw := io.Pipe()
bw := csbufio.NewWriter(pw)
bw := csbufio.NewWriter(ctx, pw)

go func() {
// TODO: this needs to be managed, ie shutdown signals, close, handler err etc.
Expand Down
28 changes: 21 additions & 7 deletions csbufio/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,39 @@ package csbufio

import (
"bufio"
"context"
"io"
"os"
)

func OpenReader(name string) (io.ReadCloser, error) {
func OpenReader(ctx context.Context, name string) (io.ReadCloser, error) {
f, err := os.Open(name)
if err != nil {
return nil, err
}
return NewReader(f), nil
return NewReader(ctx, f), nil
}

func NewReader(rc io.ReadCloser) io.ReadCloser {
return bufReadCloser{bufio.NewReader(rc), rc}
func NewReader(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
return &bufReadCloser{ctx, bufio.NewReader(rc), rc}
}

type bufReadCloser struct {
io.Reader
c io.Closer
ctx context.Context
r io.Reader
c io.Closer
}

func (bc bufReadCloser) Close() error { return bc.c.Close() }
func (b *bufReadCloser) Read(p []byte) (int, error) {
if err := b.ctx.Err(); err != nil {
return 0, err
}
return b.r.Read(p)
}

func (b *bufReadCloser) Close() error {
if err := b.ctx.Err(); err != nil {
return err
}
return b.c.Close()
}
47 changes: 47 additions & 0 deletions csbufio/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package csbufio

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestReaderContextDone(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
cancel()

m := memRWC([]byte("some-data"))
rc := NewReader(ctx, &m)

var p []byte
n, err := rc.Read(p)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, 0, n)
require.Len(t, p, 0)

err = rc.Close()
require.ErrorIs(t, err, context.Canceled)
}

type memRWC []byte

func (m memRWC) Read(p []byte) (int, error) {
n := len(p)
if n > len(m) {
n = len(m)
}
copy(p, m)
return n, nil
}

func (m *memRWC) Write(p []byte) (int, error) {
*m = append(*m, p...)
return len(p), nil
}

func (m memRWC) Close() error {
return nil
}
36 changes: 23 additions & 13 deletions csbufio/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,43 @@ package csbufio

import (
"bufio"
"context"
"io"
"os"
)

type (
bufWriteCloser struct {
*bufio.Writer
c io.Closer
}
)
type bufWriteCloser struct {
ctx context.Context
w *bufio.Writer
c io.Closer
}

func OpenWriter(name string) (io.WriteCloser, error) {
func OpenWriter(ctx context.Context, name string) (io.WriteCloser, error) {
f, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE, 0665)
if err != nil {
return nil, err
}
return NewWriter(f), nil
return NewWriter(ctx, f), nil
}

// NewWriter is a io.WriteCloser.
func NewWriter(rc io.WriteCloser) io.WriteCloser {
return bufWriteCloser{bufio.NewWriter(rc), rc}
func NewWriter(ctx context.Context, rc io.WriteCloser) io.WriteCloser {
return &bufWriteCloser{ctx, bufio.NewWriter(rc), rc}
}

func (b *bufWriteCloser) Write(p []byte) (int, error) {
if err := b.ctx.Err(); err != nil {
return 0, err
}
return b.w.Write(p)
}

func (bc bufWriteCloser) Close() error {
if err := bc.Flush(); err != nil {
func (b *bufWriteCloser) Close() error {
if err := b.ctx.Err(); err != nil {
return err
}
if err := b.w.Flush(); err != nil {
return err
}
return bc.c.Close()
return b.c.Close()
}
26 changes: 26 additions & 0 deletions csbufio/writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package csbufio

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestWriterContextDone(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
cancel()

var m memRWC
wc := NewWriter(ctx, &m)

n, err := wc.Write([]byte("some-data"))
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, 0, n)
require.Len(t, m, 0)

err = wc.Close()
require.ErrorIs(t, err, context.Canceled)
}
44 changes: 38 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,22 +1,54 @@
module github.com/lytics/cloudstorage

go 1.15
go 1.18

require (
cloud.google.com/go/storage v1.15.0
github.com/Azure/azure-sdk-for-go v40.5.0+incompatible
github.com/Azure/go-autorest/autorest v0.11.10 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/araddon/gou v0.0.0-20190110011759-c797efecbb61
github.com/aws/aws-sdk-go v1.29.34
github.com/dnaeon/go-vcr v1.1.0 // indirect
github.com/pborman/uuid v1.2.1
github.com/pkg/sftp v1.11.0
github.com/satori/go.uuid v1.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.8.0
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4
golang.org/x/oauth2 v0.0.0-20210413134643-5e61552d6c78
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/api v0.45.0
)

require (
cloud.google.com/go v0.81.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.10 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.5 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/logger v0.2.0 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dnaeon/go-vcr v1.1.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/satori/go.uuid v1.2.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/sys v0.0.0-20210412220455-f1c623a9e750 // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/tools v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210420162539-3c870d7478d2 // indirect
google.golang.org/grpc v1.37.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -188,10 +189,13 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -521,8 +525,9 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
6 changes: 3 additions & 3 deletions localfs/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,13 @@ func (l *LocalStore) NewReaderWithContext(ctx context.Context, o string) (io.Rea
if err != nil {
return nil, err
}
return csbufio.OpenReader(fo)
return csbufio.OpenReader(ctx, fo)
}

func (l *LocalStore) NewWriter(o string, metadata map[string]string) (io.WriteCloser, error) {
return l.NewWriterWithContext(context.Background(), o, metadata)
}
func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadata map[string]string, opts ...cloudstorage.Opts) (io.WriteCloser, error) {

fo := path.Join(l.storepath, o)

err := cloudstorage.EnsureDir(fo)
Expand All @@ -269,7 +268,8 @@ func (l *LocalStore) NewWriterWithContext(ctx context.Context, o string, metadat
if err != nil {
return nil, err
}
return csbufio.NewWriter(f), nil

return csbufio.NewWriter(ctx, f), nil
}

func (l *LocalStore) Get(ctx context.Context, o string) (cloudstorage.Object, error) {
Expand Down