Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement set reconciliation based sync (syncv2) #5769

Closed
wants to merge 80 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
9bbf1da
[wip] Range-based set reconciliation
ivan4th Dec 29, 2023
5844d83
hashsync: refactor rangesync to make it more stream-friendly
ivan4th Jan 4, 2024
dfa569d
hashsync: get rid of parent links in the tree
ivan4th Jan 5, 2024
34f3b73
hashsync: implement persistent tree
ivan4th Jan 5, 2024
6bac0de
hashsync: add tree values and lookup
ivan4th Jan 5, 2024
513b136
hashsync: fix sending same items multiple times
ivan4th Jan 6, 2024
4cc5358
hashsync: test XOR fingerprint based sync for hashes
ivan4th Jan 6, 2024
077c1ca
p2p: streamed / interactive protocol support
ivan4th Jan 8, 2024
5be5a1e
hashsync: implement wire protocol
ivan4th Jan 10, 2024
a043c87
hashsync: add items to the store even in case of errors
ivan4th Jan 10, 2024
b8cc6b0
hashsync: rename MonoidTree to SyncTree
ivan4th Jan 10, 2024
a8fa3fd
hashsync: send the actual objects along with the hashes
ivan4th Jan 12, 2024
57767f9
hashsync: support bounded sync
ivan4th Jan 13, 2024
e3d3cd6
hashsync: implement sync probes
ivan4th Jan 13, 2024
b60079a
Merge branch 'develop' into feature/syncv2
ivan4th Mar 23, 2024
f8a021f
hashsync: convert from chunked streams to normal streams
ivan4th Mar 24, 2024
06f89dc
hashsync: implement multi-peer split-sync
ivan4th Mar 28, 2024
34a1943
hashsync: add minhash probing
ivan4th May 3, 2024
f05fb38
hashsync: implemented part of multipeer sync
ivan4th May 15, 2024
4a4bc35
hashsync: don't send values
ivan4th May 15, 2024
fbe9ceb
hashsync: implement working setSyncBase / setSyncer
ivan4th May 17, 2024
ed7ca2d
hashsync: fix propagating keys to setSyncBase
ivan4th May 18, 2024
031620b
hashsync: fix multipeer and add test
ivan4th May 18, 2024
1b731f3
p2p: server: use zap for logging
ivan4th May 23, 2024
fb22b9d
p2p: server: store peer ID in the context
ivan4th May 23, 2024
0ec78d7
p2p: server: include 'read/write' in deadlineAdjuster error messages
ivan4th May 23, 2024
3de284b
p2p: server: close streams upon context cancellation
ivan4th May 23, 2024
88a67e6
sync2: implement P2PHashSync
ivan4th May 23, 2024
c69e860
Merge branch 'develop' into feature/syncv2
ivan4th May 23, 2024
ef02024
sync2: fixup
ivan4th May 23, 2024
fcabb5f
Merge branch 'develop' into feature/syncv2
ivan4th May 31, 2024
5327c57
Merge branch 'develop' into feature/syncv2
ivan4th Aug 23, 2024
ad76882
wip
ivan4th Jun 18, 2024
4450fce
wip2
ivan4th Jun 18, 2024
34224ce
fptree works
ivan4th Jun 18, 2024
a970d21
pool based tree
ivan4th Jun 19, 2024
df3e554
test inverse intervals
ivan4th Jun 19, 2024
949f207
cleanup
ivan4th Jun 19, 2024
3a8b819
separate test for rcpool
ivan4th Jun 20, 2024
2cccc05
sql database
ivan4th Jun 20, 2024
4452894
hashsync: fix tests
ivan4th Jun 20, 2024
bf450f4
hashsync: make ItemStore and Iterator methods return errors
ivan4th Jun 20, 2024
2950cab
wip: fptree w/o end node works
ivan4th Jun 25, 2024
fa24d21
fix test
ivan4th Jun 25, 2024
8a54bae
re-enable in-mem store test, skip slow TestATXFP
ivan4th Jul 13, 2024
3df8c38
make dbiter wrap around
ivan4th Jul 15, 2024
b5b00ea
use dynamically extended chunkSize in dbiter
ivan4th Jul 17, 2024
bbccaf0
fptree: use startTail/endTail instead of tailRefs list
ivan4th Jul 17, 2024
825ff03
fptree: use iterators in fptree & fix limits
ivan4th Jul 23, 2024
cdb3f7a
fptree: return iterators from aggregation
ivan4th Jul 24, 2024
a77e698
fptree: test and fix empty range handling
ivan4th Jul 24, 2024
53a241c
fptree: add idStore.start() method
ivan4th Jul 24, 2024
07e791d
fptree: dump tree stats
ivan4th Jul 24, 2024
2c1554a
fptree: don't pass maxDepth to tree stores
ivan4th Jul 24, 2024
64667b9
dbsync: implement working DBItemStore with test
ivan4th Jul 24, 2024
fd09229
dbsync: integrate with hashsync pairwise sync
ivan4th Jul 25, 2024
dc5b1f5
wip
ivan4th Jul 30, 2024
6ec95f6
wip2
ivan4th Jul 30, 2024
1839081
wip3
ivan4th Aug 1, 2024
2513a20
wip4
ivan4th Aug 1, 2024
0456107
wip5
ivan4th Aug 1, 2024
f93a02c
wip6
ivan4th Aug 13, 2024
72018dc
wip7
ivan4th Aug 17, 2024
251cd3e
wip8 -- fast enough
ivan4th Aug 19, 2024
0526bcf
p2p: server fixup
ivan4th Aug 24, 2024
31e28d0
sync2: build fixup
ivan4th Aug 24, 2024
1b94a79
sync2: optimize wire data size
ivan4th Aug 24, 2024
b4e9c0d
sync2: add table snapshotting based on rowid
ivan4th Aug 24, 2024
25ca1d1
sync2: implement advancing DBItemStore
ivan4th Aug 25, 2024
c805ec9
sync2: add WithMaxDiff for RangeSetReconciler
ivan4th Aug 26, 2024
7363d40
sync2: optimize sending whole range
ivan4th Aug 26, 2024
5684022
sync2: fix fptree test
ivan4th Aug 27, 2024
263369f
sync2: add sync priming via sending recent items
ivan4th Aug 31, 2024
1d7f9a7
sync2: fix logging
ivan4th Aug 31, 2024
c97f023
sync2: clean up
ivan4th Sep 4, 2024
f9ed0a0
sync2: fix combined sequence tests
ivan4th Sep 4, 2024
4708f07
sync2: fix multipeer test
ivan4th Sep 5, 2024
9597db2
sync2: add syncedness check
ivan4th Sep 5, 2024
7968770
sync2: don't pre-read round messages in RangeSetReconciler
ivan4th Sep 5, 2024
238bbb3
sync2: initial syncer integration
ivan4th Sep 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions common/types/hashes.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package types

import (
"bytes"
"encoding/hex"
"fmt"
"reflect"
Expand All @@ -9,11 +10,13 @@ import (

"github.com/spacemeshos/go-spacemesh/common/util"
"github.com/spacemeshos/go-spacemesh/hash"
"github.com/spacemeshos/go-spacemesh/log"
)

const (
Hash32Length = 32
Hash20Length = 20
Hash12Length = 12
)

var (
Expand All @@ -30,6 +33,9 @@ type Hash32 [Hash32Length]byte
// Hash20 represents the 20-byte blake3 hash of arbitrary data.
type Hash20 [Hash20Length]byte

// Hash12 represents the 12-byte hash used for sync
type Hash12 [Hash12Length]byte

// Bytes gets the byte representation of the underlying hash.
func (h Hash20) Bytes() []byte { return h[:] }

Expand Down Expand Up @@ -88,6 +94,15 @@ func (h Hash20) ToHash32() (h32 Hash32) {
return
}

// String implements the stringer interface and is used also by the logger when
// doing full logging into a file.
func (h Hash12) String() string {
return util.Encode(h[:5])
}

// Field returns a log field. Implements the LoggableField interface.
func (h Hash12) Field() log.Field { return log.String("hash", hex.EncodeToString(h[:])) }

// CalcProposalsHash32 returns the 32-byte blake3 sum of the IDs, sorted in lexicographic order. The pre-image is
// prefixed with additionalBytes.
func CalcProposalsHash32(view []ProposalID, additionalBytes []byte) Hash32 {
Expand Down Expand Up @@ -160,6 +175,16 @@ func (h Hash32) ShortString() string {
return hex.EncodeToString(h[:5])
}

// Compare compares a Hash32 to another hash and returns
//
// -1 if this hash is less than the other
// 0 if the hashes are equal
// 1 if this hash is greater than the other
func (h Hash32) Compare(other any) int {
oh := other.(Hash32)
return bytes.Compare(h[:], oh[:])
}

// Format implements fmt.Formatter, forcing the byte slice to be formatted as is,
// without going through the stringer interface used for logging.
func (h Hash32) Format(s fmt.State, c rune) {
Expand Down
9 changes: 9 additions & 0 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"time"

corehost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -1006,3 +1007,11 @@ func (f *Fetch) SelectBestShuffled(n int) []p2p.Peer {
})
return peers
}

func (f *Fetch) Host() corehost.Host {
return f.host.(corehost.Host)
}

func (f *Fetch) Peers() *peers.Peers {
return f.peers
}
7 changes: 7 additions & 0 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ type Peers struct {
globalLatency float64
}

func (p *Peers) Contains(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, exist := p.peers[id]
return exist
}

func (p *Peers) Add(id peer.ID) bool {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/spacemeshos/go-spacemesh

go 1.22.4
go 1.23.0

require (
cloud.google.com/go/storage v1.43.0
Expand Down Expand Up @@ -35,6 +35,7 @@ require (
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.55.0
github.com/quic-go/quic-go v0.46.0
github.com/rqlite/sql v0.0.0-20240312185922-ffac88a740bd
github.com/rs/cors v1.11.0
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM=
github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -551,6 +553,8 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rqlite/sql v0.0.0-20240312185922-ffac88a740bd h1:wW6BtayFoKaaDeIvXRE3SZVPOscSKlYD+X3bB749+zk=
github.com/rqlite/sql v0.0.0-20240312185922-ffac88a740bd/go.mod h1:ib9zVtNgRKiGuoMyUqqL5aNpk+r+++YlyiVIkclVqPg=
github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po=
github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
Expand Down
14 changes: 10 additions & 4 deletions p2p/server/deadline_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"time"

"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -52,6 +53,8 @@ type deadlineAdjuster struct {
nextAdjustRead int
nextAdjustWrite int
hardDeadline time.Time
closeErr error
close sync.Once
}

var _ io.ReadWriteCloser = &deadlineAdjuster{}
Expand Down Expand Up @@ -85,11 +88,14 @@ func (dadj *deadlineAdjuster) augmentError(what string, err error) error {
}
}

// Close closes the stream. This method is safe to call multiple times.
// Close closes the stream.
func (dadj *deadlineAdjuster) Close() error {
// FIXME: unsure if this is really needed (inherited from the older Server code)
_ = dadj.peerStream.SetDeadline(time.Time{})
return dadj.peerStream.Close()
dadj.close.Do(func() {
// FIXME: unsure if this is really needed (inherited from the older Server code)
_ = dadj.peerStream.SetDeadline(time.Time{})
dadj.closeErr = dadj.peerStream.Close()
})
return dadj.closeErr
}

func (dadj *deadlineAdjuster) adjust() error {
Expand Down
6 changes: 5 additions & 1 deletion p2p/server/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,18 @@ import (
"github.com/spacemeshos/go-spacemesh/p2p/peerinfo"
)

//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go -exclude_interfaces Host
//go:generate mockgen -typed -package=mocks -destination=./mocks/mocks.go -source=./interface.go -exclude_interfaces Host,PeerInfoHost

// Host is a subset of libp2p Host interface that needs to be implemented to be usable with server.
type Host interface {
SetStreamHandler(protocol.ID, network.StreamHandler)
NewStream(context.Context, peer.ID, ...protocol.ID) (network.Stream, error)
Network() network.Network
ConnManager() connmgr.ConnManager
}

type PeerInfoHost interface {
Host
PeerInfo() peerinfo.PeerInfo
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/server/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 32 additions & 6 deletions p2p/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,30 @@ func WithRequestsPerInterval(n int, interval time.Duration) Opt {
}
}

// WithDecayingTag specifies P2P decaying tag that is applied to the peer when a request
// is being served
func WithDecayingTag(tag DecayingTagSpec) Opt {
return func(s *Server) {
s.decayingTagSpec = &tag
}
}

type peerIDKey = struct{}

func withPeerID(ctx context.Context, peerID peer.ID) context.Context {
return context.WithValue(ctx, peerIDKey{}, peerID)
}

// ContextPeerID retrieves the ID of the peer being served from the context and a boolean
// value indicating that the context contains peer ID. If there's no peer ID associated
// with the context, the function returns an empty peer ID and false.
func ContextPeerID(ctx context.Context) (peer.ID, bool) {
if v := ctx.Value(peerIDKey{}); v != nil {
return v.(peer.ID), true
}
return peer.ID(""), false
}

// Handler is a handler to be defined by the application.
type Handler func(context.Context, []byte) ([]byte, error)

Expand Down Expand Up @@ -237,6 +255,13 @@ type request struct {
received time.Time
}

func (s *Server) peerInfo() peerinfo.PeerInfo {
if h, ok := s.h.(PeerInfoHost); ok {
return h.PeerInfo()
}
return nil
}

func (s *Server) Run(ctx context.Context) error {
var eg errgroup.Group
for {
Expand All @@ -257,7 +282,8 @@ func (s *Server) Run(ctx context.Context) error {
eg.Wait()
return nil
}
ctx, cancel := context.WithCancel(ctx)
peer := req.stream.Conn().RemotePeer()
ctx, cancel := context.WithCancel(withPeerID(ctx, peer))
eg.Go(func() error {
<-ctx.Done()
s.sem.Release(1)
Expand All @@ -268,12 +294,12 @@ func (s *Server) Run(ctx context.Context) error {
defer cancel()
conn := req.stream.Conn()
if s.decayingTag != nil {
s.decayingTag.Bump(conn.RemotePeer(), s.decayingTagSpec.Inc)
s.decayingTag.Bump(peer, s.decayingTagSpec.Inc)
}
ok := s.queueHandler(ctx, req.stream)
duration := time.Since(req.received)
if s.h.PeerInfo() != nil {
info := s.h.PeerInfo().EnsurePeerInfo(conn.RemotePeer())
if s.peerInfo() != nil {
info := s.peerInfo().EnsurePeerInfo(conn.RemotePeer())
info.ServerStats.RequestDone(duration, ok)
}
if s.metrics != nil {
Expand Down Expand Up @@ -448,8 +474,8 @@ func (s *Server) streamRequest(
if err != nil {
return nil, nil, err
}
if s.h.PeerInfo() != nil {
info = s.h.PeerInfo().EnsurePeerInfo(stream.Conn().RemotePeer())
if s.peerInfo() != nil {
info = s.peerInfo().EnsurePeerInfo(stream.Conn().RemotePeer())
}
dadj := newDeadlineAdjuster(stream, s.timeout, s.hardTimeout)
defer func() {
Expand Down
24 changes: 16 additions & 8 deletions p2p/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/spacemeshos/go-scale/tester"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -45,8 +46,10 @@ func TestServer(t *testing.T) {
request := []byte("test request")
testErr := errors.New("test error")

handler := func(_ context.Context, msg []byte) ([]byte, error) {
return msg, nil
handler := func(ctx context.Context, msg []byte) ([]byte, error) {
peerID, found := ContextPeerID(ctx)
require.True(t, found)
return append(msg, []byte(peerID)...), nil
}
errhandler := func(_ context.Context, _ []byte) ([]byte, error) {
return nil, testErr
Expand Down Expand Up @@ -81,6 +84,9 @@ func TestServer(t *testing.T) {
append(opts, WithRequestSizeLimit(limit))...,
)
ctx, cancel := context.WithCancel(context.Background())
noPeerID, found := ContextPeerID(ctx)
require.Equal(t, peer.ID(""), noPeerID)
require.False(t, found)
var eg errgroup.Group
eg.Go(func() error {
return srv1.Run(ctx)
Expand Down Expand Up @@ -109,16 +115,17 @@ func TestServer(t *testing.T) {
srvID := mesh.Hosts()[1].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
require.Equal(t, request, response)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[1].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())

clientInfo := client.h.PeerInfo().EnsurePeerInfo(srvID)
clientInfo := client.peerInfo().EnsurePeerInfo(srvID)
require.Equal(t, 1, clientInfo.ClientStats.SuccessCount())
require.Zero(t, clientInfo.ClientStats.FailureCount())

serverInfo := srv1.h.PeerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
serverInfo := srv1.peerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
require.Eventually(t, func() bool {
return serverInfo.ServerStats.SuccessCount() == 1
}, 10*time.Second, 10*time.Millisecond)
Expand All @@ -129,7 +136,8 @@ func TestServer(t *testing.T) {
srvID := mesh.Hosts()[3].ID()
response, err := client.Request(ctx, srvID, request)
require.NoError(t, err)
require.Equal(t, request, response)
expResponse := append(request, []byte(mesh.Hosts()[0].ID())...)
require.Equal(t, expResponse, response)
srvConns := mesh.Hosts()[3].Network().ConnsToPeer(mesh.Hosts()[0].ID())
require.NotEmpty(t, srvConns)
require.Equal(t, n+1, srv1.NumAcceptedRequests())
Expand All @@ -144,11 +152,11 @@ func TestServer(t *testing.T) {
require.ErrorContains(t, err, testErr.Error())
require.Equal(t, n+1, srv1.NumAcceptedRequests())

clientInfo := client.h.PeerInfo().EnsurePeerInfo(srvID)
clientInfo := client.peerInfo().EnsurePeerInfo(srvID)
require.Zero(t, clientInfo.ClientStats.SuccessCount())
require.Equal(t, 1, clientInfo.ClientStats.FailureCount())

serverInfo := srv2.h.PeerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
serverInfo := srv2.peerInfo().EnsurePeerInfo(mesh.Hosts()[0].ID())
require.Eventually(t, func() bool {
return serverInfo.ServerStats.FailureCount() == 1
}, 10*time.Second, 10*time.Millisecond)
Expand Down
10 changes: 10 additions & 0 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"sync"
"sync/atomic"
"testing"
"time"

sqlite "github.com/go-llsqlite/crawshaw"
Expand Down Expand Up @@ -233,6 +234,15 @@ func InMemory(opts ...Opt) *sqliteDatabase {
return db
}

// InMemoryTest returns an in-mem database for testing and ensures database is closed during `tb.Cleanup`.
func InMemoryTest(tb testing.TB, opts ...Opt) *sqliteDatabase {
// When using empty DB schema, we don't want to check for schema drift due to
// "PRAGMA user_version = 0;" in the initial schema retrieved from the DB.
db := InMemory(append(opts, WithNoCheckSchemaDrift())...)
tb.Cleanup(func() { db.Close() })
return db
}

// Open database with options.
//
// Database is opened in WAL mode and pragma synchronous=normal.
Expand Down
Loading