Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Oct 17, 2024
1 parent 8af5e34 commit c43e2e2
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 7 deletions.
113 changes: 113 additions & 0 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"math"
"net"
"os"
"path"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -795,6 +798,116 @@ func (d *ScyllaShardAwareDialer) DialContext(ctx context.Context, network, addr
return dialerWithLocalAddr.DialContext(ctx, network, addr)
}

func NewRecordDialer(dir string) *RecordDialer {
return &RecordDialer{
dir: dir,
}
}

type RecordDialer struct {
dir string
net.Dialer
}

func (d *RecordDialer) DialContext(ctx context.Context, network, addr string) (conn net.Conn, err error) {
fmt.Println("Dial Context Record Dialer")
sourcePort := ScyllaGetSourcePort(ctx)
if sourcePort == 0 {
return d.Dialer.DialContext(ctx, network, addr)
}
dialerWithLocalAddr := d.Dialer
dialerWithLocalAddr.LocalAddr, err = net.ResolveTCPAddr(network, fmt.Sprintf(":%d", sourcePort))
if err != nil {
return nil, err
}

conn, err = dialerWithLocalAddr.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}

return NewConnectionRecorder(path.Join(d.dir, fmt.Sprintf("%s-%d", addr, sourcePort)), conn)
}

func NewConnectionRecorder(fname string, conn net.Conn) (net.Conn, error) {
fmt.Println("New recorder: ", fname)
fd_writes, err := os.OpenFile(fname+"Writes", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
return nil, err
}
fd_reads, err2 := os.OpenFile(fname+"Reads", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err2 != nil {
return nil, err2
}
return &ConnectionRecorder{fd_writes: fd_writes, fd_reads: fd_reads, orig: conn}, nil
}

type ConnectionRecorder struct {
fd_writes *os.File
fd_reads *os.File
orig net.Conn
}

func (c ConnectionRecorder) Read(b []byte) (n int, err error) {
n, err = c.orig.Read(b)
if err != nil && err != io.EOF {
return n, err
}

_, writeErr := c.fd_reads.Write(b[:n])
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}
_, writeErr = c.fd_reads.Write([]byte("\n"))
if writeErr != nil {
return n, fmt.Errorf("failed to record read: %w", writeErr)
}

return n, err
}

func (c ConnectionRecorder) Write(b []byte) (n int, err error) {
_, writeErr := c.fd_writes.Write(b)
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
_, writeErr = c.fd_writes.Write([]byte("\n"))
if writeErr != nil {
return n, fmt.Errorf("failed to record write: %w", writeErr)
}
return c.orig.Write(b)
}

func (c ConnectionRecorder) Close() error {
if err := c.fd_writes.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
if err := c.fd_reads.Close(); err != nil {
return fmt.Errorf("failed to close the file: %w", err)
}
return c.orig.Close()
}

func (c ConnectionRecorder) LocalAddr() net.Addr {
return c.orig.LocalAddr()
}

func (c ConnectionRecorder) RemoteAddr() net.Addr {
return c.orig.RemoteAddr()
}

func (c ConnectionRecorder) SetDeadline(t time.Time) error {
return c.orig.SetDeadline(t)
}

func (c ConnectionRecorder) SetReadDeadline(t time.Time) error {
return c.orig.SetReadDeadline(t)
}

func (c ConnectionRecorder) SetWriteDeadline(t time.Time) error {
return c.orig.SetWriteDeadline(t)
}

type scyllaPortIterator struct {
currentPort int
shardCount int
Expand Down
82 changes: 82 additions & 0 deletions tests/bench/bench_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package bench

import (
"context"
"fmt"
"testing"

"github.com/gocql/gocql"
)

func RecordTraffic(size int) {
cluster := gocql.NewCluster("127.0.0.2")
cluster.Consistency = gocql.Quorum

cluster.Dialer = gocql.NewRecordDialer("/home/sylwiaszunejko/gocql/recordings")

fallback := gocql.RoundRobinHostPolicy()
cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)

session, err := cluster.CreateSession()
if err != nil {
panic(err)
}
defer session.Close()

keyspace := "single_conn_bench"

err = session.Query(`DROP KEYSPACE IF EXISTS ` + keyspace).Exec()
if err != nil {
panic(fmt.Sprintf("unable to drop keyspace: %v", err))
}

err = session.Query(fmt.Sprintf(`CREATE KEYSPACE %s WITH replication = {'class' : 'NetworkTopologyStrategy','replication_factor' : 1}`, keyspace)).Exec()

if err != nil {
panic(fmt.Sprintf("unable to create keyspace: %v", err))
}

if err := session.Query(fmt.Sprintf(`CREATE TABLE %s.%s (pk int, ck int, v text, PRIMARY KEY (pk));
`, keyspace, "table1")).Exec(); err != nil {
panic(fmt.Sprintf("unable to create table: %v", err))
}

ctx := context.Background()

for i := 0; i < size; i++ {
err = session.Query(`INSERT INTO single_conn_bench.table1 (pk, ck, v) VALUES (?, ?, ?);`, i, i%5, fmt.Sprintf("Name_%d", i)).WithContext(ctx).Exec()
if err != nil {
panic(err)
}
}

for i := 0; i < size; i++ {
var pk int
var ck int
var v string

err = session.Query(`SELECT pk, ck, v FROM single_conn_bench.table1 WHERE pk = ?;`, i).WithContext(ctx).Consistency(gocql.One).Scan(&pk, &ck, &v)
if err != nil {
panic(err)
}
}
}

func BenchmarkSingleConnection(b *testing.B) {
RecordTraffic(100)
b.Run("SmallDataset", func(b *testing.B) {
b.Run("Writes", func(b *testing.B) {
// open file with writes
// read line by line
// for every line simulate Write
})
b.Run("Reads", func(b *testing.B) {
})
})
// b.Run("MediumDataset", func(b *testing.B) {
// RecordTraffic(10000)
// })
// b.Run("BigDataset", func(b *testing.B) {
// RecordTraffic(100000)
// })
}
4 changes: 2 additions & 2 deletions tests/bench/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ require (
)

require (
github.com/golang/snappy v0.0.3 // indirect
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
github.com/klauspost/compress v1.17.9 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)

replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4
replace github.com/gocql/gocql => ../..
16 changes: 11 additions & 5 deletions tests/bench/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,26 @@ github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo
github.com/brianvoe/gofakeit/v6 v6.28.0/go.mod h1:Xj58BMSnFqcn/fAQeSK+/PLtC5kSb7FJIq4JyGa8vEs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/scylladb/gocql v1.14.4 h1:MhevwCfyAraQ6RvZYFO3pF4Lt0YhvQlfg8Eo2HEqVQA=
github.com/scylladb/gocql v1.14.4/go.mod h1:ZLEJ0EVE5JhmtxIW2stgHq/v1P4fWap0qyyXSKyV8K0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.0.0-20220526153639-5463443f8c37 h1:lUkvobShwKsOesNfWWlCS5q7fnbG1MEliIzwu886fn8=
golang.org/x/net v0.0.0-20220526153639-5463443f8c37/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand All @@ -35,5 +39,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=

0 comments on commit c43e2e2

Please sign in to comment.