Skip to content

Commit

Permalink
Improve handling of a trickle of data in tyger buffer write (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
johnstairs authored Dec 2, 2024
1 parent cb56a32 commit 684a95c
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .notice-metadata.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
1e2a80a6c0bbc4591f902da90005b5953c0a3cf84787ed9540732a033ee0f054 cli/go.sum
c51d84d251b17e9f89ff1a4b791a3f0e11598c9f31fa1e2f4117cb75674c070e cli/go.sum
33d7431ce175376b59e4634781746a75f1599aef8251b1153dc4d7484e67b745 server/ControlPlane/packages.lock.json
a25dbcbbf4137b6c2a0e2140b940cbb68a8f9624af487a38934918088795b7bd server/DataPlane/packages.lock.json
e518c00c2e96699b9cf588fe7504950aa2ca33a0ed1d7e53e0bb2b5b0dbce2c4 scripts/generate-notice.sh
Expand Down
1 change: 1 addition & 0 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ require (
github.com/psanford/memfs v0.0.0-20230130182539-4dbf7e3e865e
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
github.com/sunshineplan/limiter v1.0.0
go.opentelemetry.io/otel v1.28.0
go.uber.org/ratelimit v0.3.1
golang.org/x/sync v0.8.0
Expand Down
2 changes: 2 additions & 0 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/sunshineplan/limiter v1.0.0 h1:wx3q5eS5J+ggXlAxzg9k6UbDyJYrysNmHyxt5cNmCP8=
github.com/sunshineplan/limiter v1.0.0/go.mod h1:+Pjd5Pu7i5YclrnFz+MBFxGB9+MZ2cytQeV+S9kXOxY=
github.com/thediveo/enumflag v0.10.1 h1:DB3Ag69VZ7BCv6jzKECrZ0ebZrHLzFRMIFYt96s4OxM=
github.com/thediveo/enumflag v0.10.1/go.mod h1:KyVhQUPzreSw85oJi2uSjFM0ODLKXBH0rPod7zc2pmI=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down
64 changes: 64 additions & 0 deletions cli/integrationtest/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package integrationtest

import (
"bufio"
"bytes"
"context"
"crypto/md5"
Expand Down Expand Up @@ -75,6 +76,69 @@ func TestReadingWhileWriting(t *testing.T) {
assert.Equal(t, inputHasher.Sum(nil), outputHasher.Sum(nil), "hashes do not match")
}

// TestTickleLatencyWithFlushInterval pipes a slow trickle of timestamped
// lines into `tyger buffer write --flush-interval 1s` while a concurrent
// `tyger buffer read` streams the same buffer back. Every line read must
// carry a timestamp no more than 5 seconds old, and the number of lines read
// must equal the number of lines written.
func TestTickleLatencyWithFlushInterval(t *testing.T) {
	t.Parallel()

	bufferName := runTygerSucceeds(t, "buffer", "create")
	writeSasUri := runTygerSucceeds(t, "buffer", "access", bufferName, "--write")
	readSasUri := runTygerSucceeds(t, "buffer", "access", bufferName)

	// start the read process
	readCommand := exec.Command("tyger", "buffer", "read", readSasUri)
	outputReader, err := readCommand.StdoutPipe()
	require.NoError(t, err)
	readStdErr := &bytes.Buffer{}
	readCommand.Stderr = readStdErr

	// Nothing below is meaningful if the reader never started, so stop here.
	require.NoError(t, readCommand.Start(), "read command failed to start")

	// start the write process
	writeCommand := exec.Command("tyger", "buffer", "write", writeSasUri, "--flush-interval", "1s")
	inputWriter, err := writeCommand.StdinPipe()
	require.NoError(t, err)

	writeStdErr := &bytes.Buffer{}
	writeCommand.Stderr = writeStdErr

	// The producer goroutine reports its outcome over a channel instead of
	// calling require (which ends in t.FailNow and must only run on the test
	// goroutine) or mutating a variable shared with the test goroutine.
	type writeResult struct {
		lines int
		err   error
	}
	// Buffered so the goroutine can always finish, even if the test aborts
	// before receiving the result.
	writerDone := make(chan writeResult, 1)

	go func() {
		// Closing stdin is what lets the write command finish.
		defer inputWriter.Close()

		var res writeResult
		start := time.Now()
		end := start.Add(15 * time.Second)
		for now := start; now.Before(end); now = time.Now() {
			if _, err := fmt.Fprintf(inputWriter, "%s\n", now.Format(time.RFC3339Nano)); err != nil {
				res.err = err
				break
			}
			res.lines++
			time.Sleep(10 * time.Millisecond)
		}
		writerDone <- res
	}()

	// Buffered for the same reason as writerDone.
	writeCommandErrChan := make(chan error, 1)
	go func() {
		writeCommandErrChan <- writeCommand.Run()
	}()

	linesRead := 0
	// read the output line by line
	scanner := bufio.NewScanner(outputReader)
	for scanner.Scan() {
		parsedTime, err := time.Parse(time.RFC3339Nano, scanner.Text())
		require.NoError(t, err)
		require.WithinDuration(t, time.Now(), parsedTime, 5*time.Second)
		linesRead++
	}
	// Scan returns false on both EOF and failure; only Err tells them apart.
	scanErr := scanner.Err()

	// Wait for the write command to exit before touching its stderr buffer:
	// exec copies into the buffer until Run returns.
	writeErr := <-writeCommandErrChan
	t.Log(writeStdErr.String())
	assert.NoError(t, writeErr, "write command failed")

	// Stdout has been fully drained above, so it is now safe to Wait.
	readErr := readCommand.Wait()
	t.Log(readStdErr.String())
	assert.NoError(t, scanErr, "reading output of read command failed")
	assert.NoError(t, readErr, "read command failed")

	written := <-writerDone
	require.NoError(t, written.err, "writing to stdin of write command failed")
	require.Equal(t, written.lines, linesRead, "number of lines written and read do not match")
}

func TestAccessStringIsFile(t *testing.T) {
t.Parallel()

Expand Down
19 changes: 19 additions & 0 deletions cli/internal/cmd/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os/signal"
"strconv"
"syscall"
"time"

"github.com/alecthomas/units"
"github.com/microsoft/tyger/cli/internal/client"
Expand Down Expand Up @@ -329,6 +330,7 @@ func NewBufferWriteCommand(openFileFunc func(name string, flag int, perm fs.File
inputFilePath := ""
dop := dataplane.DefaultWriteDop
blockSizeString := ""
flushIntervalString := dataplane.DefaultFlushInterval.String()

cmd := &cobra.Command{
Use: "write { BUFFER_ID | BUFFER_SAS_URI | FILE_WITH_SAS_URI } [flags]",
Expand Down Expand Up @@ -367,6 +369,11 @@ func NewBufferWriteCommand(openFileFunc func(name string, flag int, perm fs.File
}
} else {
defer inputFile.Close()
if fileInfo, err := inputFile.Stat(); err == nil && fileInfo.Mode().IsRegular() {
// the input file is a regular file, so disable periodic flushing
flushIntervalString = ""
}

inputReader = inputFile
}
} else {
Expand All @@ -393,6 +400,17 @@ func NewBufferWriteCommand(openFileFunc func(name string, flag int, perm fs.File
writeOptions = append(writeOptions, dataplane.WithWriteBlockSize(int(parsedBlockSize)))
}

var parsedFlushInterval time.Duration
if flushIntervalString != "" {
var err error
parsedFlushInterval, err = time.ParseDuration(flushIntervalString)
if err != nil {
log.Fatal().Err(err).Msg("Invalid flush interval")
}
}

writeOptions = append(writeOptions, dataplane.WithWriteFlushInterval(parsedFlushInterval))

err = dataplane.Write(ctx, uri, inputReader, writeOptions...)
if err != nil {
if errors.Is(err, ctx.Err()) {
Expand All @@ -406,6 +424,7 @@ func NewBufferWriteCommand(openFileFunc func(name string, flag int, perm fs.File
cmd.Flags().StringVarP(&inputFilePath, "input", "i", inputFilePath, "The file to read from. If not specified, data is read from standard in.")
cmd.Flags().IntVarP(&dop, "dop", "p", dop, "The degree of parallelism")
cmd.Flags().StringVarP(&blockSizeString, "block-size", "b", blockSizeString, "Split the stream into blocks of this size.")
cmd.Flags().StringVarP(&flushIntervalString, "flush-interval", "f", flushIntervalString, "The longest time to wait before accumulated data is written to the remote service. Data will be flushed either when --block-size of data has been accumulated or when the specified interval has elapsed, whichever comes first. This is ignored if the input is a regular file. Set to 0 to disable.")
return cmd
}

Expand Down
Loading

0 comments on commit 684a95c

Please sign in to comment.