Merge pull request #287 from dedis/from-c4dt

From c4dt

jbsv authored May 27, 2024
2 parents c47771c + d0124e4 commit 7288f63
Showing 35 changed files with 1,270 additions and 109 deletions.
9 changes: 9 additions & 0 deletions cli/node/daemon.go
@@ -12,6 +12,7 @@ import (
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
@@ -98,6 +99,14 @@ type socketDaemon struct {
// Listen implements node.Daemon. It starts the daemon by creating the unix
// socket file at the configured path.
func (d *socketDaemon) Listen() error {
_, err := os.Stat(d.socketpath)
if err == nil {
d.logger.Warn().Msg("Cleaning existing socket file")
err := os.Remove(d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't clear tangling socketpath: %v", err)
}
}
socket, err := d.listenFn("unix", d.socketpath)
if err != nil {
return xerrors.Errorf("couldn't bind socket: %v", err)
48 changes: 39 additions & 9 deletions cli/node/memcoin/mod_test.go
@@ -2,6 +2,7 @@ package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
"net"
@@ -14,6 +15,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.dedis.ch/kyber/v3/pairing/bn256"
)

// This test creates a chain with initially 3 nodes. It then adds nodes 4 and 5
@@ -74,28 +76,49 @@ func TestMemcoin_Scenario_SetupAndTransactions(t *testing.T) {
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node4)...,
)

err = run(args)
require.NoError(t, err)

// Add the certificate and push two new blocks to make sure node4 is
// fully participating
shareCert(t, node4, node1, "//127.0.0.1:2111")
publicKey, err := bn256.NewSuiteG2().Point().MarshalBinary()
require.NoError(t, err)
publicKeyHex := base64.StdEncoding.EncodeToString(publicKey)
argsAccess := []string{
os.Args[0],
"--config", node1, "access", "add",
"--identity", publicKeyHex,
}
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Add node 5 which should be participating.
// This makes sure that node 4 is actually participating and caught up.
// If node 4 is not participating, there would be too many faulty nodes
// after adding node 5.
args = append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node5)...,
)

err = run(args)
require.NoError(t, err)

// Run a few transactions.
for i := 0; i < 5; i++ {
err = runWithCfg(args, config{})
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2115")
// Run 2 new transactions
for i := 0; i < 2; i++ {
err = runWithCfg(argsAccess, config{})
require.NoError(t, err)
}

// Test a timeout waiting for a transaction.
@@ -146,12 +169,14 @@ func TestMemcoin_Scenario_RestartNode(t *testing.T) {
args := append([]string{
os.Args[0],
"--config", node1, "ordering", "roster", "add",
"--wait", "60s"},
"--wait", "60s",
},
getExport(t, node1)...,
)

err = run(args)
require.EqualError(t, err, "command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
require.EqualError(t, err,
"command error: transaction refused: duplicate in roster: grpcs://127.0.0.1:2210")
}

// -----------------------------------------------------------------------------
@@ -230,7 +255,12 @@ func waitDaemon(t *testing.T, daemons []string) bool {

func makeNodeArg(path string, port uint16) []string {
return []string{
os.Args[0], "--config", path, "start", "--listen", "tcp://127.0.0.1:" + strconv.Itoa(int(port)),
os.Args[0],
"--config",
path,
"start",
"--listen",
"tcp://127.0.0.1:" + strconv.Itoa(int(port)),
}
}
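
The --identity value passed to "access add" above is just the base64 encoding of a marshalled bn256 G2 point. A minimal sketch of that derivation on its own, using the same kyber suite as the test (a fresh point from the suite stands in for a real signer's public key):

package main

import (
	"encoding/base64"
	"fmt"

	"go.dedis.ch/kyber/v3/pairing/bn256"
)

func main() {
	// Marshal a point from the G2 suite; a real deployment would marshal
	// an actual signer's public key instead.
	pub, err := bn256.NewSuiteG2().Point().MarshalBinary()
	if err != nil {
		panic(err)
	}
	// This string is what the test passes as "--identity".
	fmt.Println(base64.StdEncoding.EncodeToString(pub))
}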

4 changes: 4 additions & 0 deletions core/ordering/cosipbft/blockstore/disk.go
@@ -11,6 +11,7 @@ import (
"encoding/binary"
"sync"

"go.dedis.ch/dela"
"go.dedis.ch/dela/core"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
@@ -87,6 +88,9 @@ func (s *InDisk) Load() error {
s.last = link
s.indices[link.GetBlock().GetHash()] = link.GetBlock().GetIndex()

if s.length%100 == 0 {
dela.Logger.Info().Msgf("Loaded %d blocks", s.length)
}
return nil
})
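
The new log line in Load throttles progress output to every 100th block. A small stand-alone sketch of the same pattern with zerolog, the logging library behind dela.Logger, using a made-up block count:

package main

import (
	"os"

	"github.com/rs/zerolog"
)

func main() {
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	const total = 350 // stand-in for the number of stored blocks
	for n := uint64(1); n <= total; n++ {
		// ... load block n from disk ...
		if n%100 == 0 {
			logger.Info().Msgf("Loaded %d blocks", n)
		}
	}
}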

@@ -22,7 +22,7 @@ func TestRegisterContract(t *testing.T) {
}

func TestNewTransaction(t *testing.T) {
mgr := NewManager(signed.NewManager(fake.NewSigner(), nil))
mgr := NewManager(signed.NewManager(fake.NewSigner(), fake.NewClient()))

tx, err := mgr.Make(authority.New(nil, nil))
require.NoError(t, err)
74 changes: 51 additions & 23 deletions core/ordering/cosipbft/cosipbft.go
@@ -6,7 +6,7 @@
// protocol and the followers wait for incoming messages to update their own
// state machines and reply with signatures when the leader candidate is valid.
// If the leader fails to send a candidate, or finalize it, the followers will
// timeout after some time and move to a view change state.
// time out after some time and move to a view change state.
//
// The view change procedure is always waiting on the leader+1 confirmation
// before moving to leader+2, leader+3, etc. It means that if not enough nodes
@@ -43,6 +43,7 @@ import (
"go.dedis.ch/dela/core/ordering/cosipbft/blockstore"
"go.dedis.ch/dela/core/ordering/cosipbft/blocksync"
"go.dedis.ch/dela/core/ordering/cosipbft/contracts/viewchange"
"go.dedis.ch/dela/core/ordering/cosipbft/fastsync"
"go.dedis.ch/dela/core/ordering/cosipbft/pbft"
"go.dedis.ch/dela/core/ordering/cosipbft/types"
"go.dedis.ch/dela/core/store"
@@ -61,7 +62,7 @@ import (
const (
// DefaultRoundTimeout is the maximum round time the service waits
// for an event to happen.
DefaultRoundTimeout = 10 * time.Second
DefaultRoundTimeout = 10 * time.Minute

// DefaultFailedRoundTimeout is the maximum round time the service waits
// for an event to happen, after a round has failed, thus letting time
@@ -71,14 +72,17 @@ const (

// DefaultTransactionTimeout is the maximum allowed age of transactions
// before a view change is executed.
DefaultTransactionTimeout = 30 * time.Second
DefaultTransactionTimeout = 5 * time.Minute

// RoundWait is the constant value of the exponential backoff used between
// round failures.
RoundWait = 5 * time.Millisecond
RoundWait = 50 * time.Millisecond

// RoundMaxWait is the maximum amount for the backoff.
RoundMaxWait = 5 * time.Minute
RoundMaxWait = 10 * time.Minute

// DefaultFastSyncMessageSize defines when a fast sync message will be split.
DefaultFastSyncMessageSize = 1e6

rpcName = "cosipbft"
)
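
The new values lengthen the round and transaction timeouts and raise both the base and the cap of the backoff between failed rounds. A sketch of one way such a capped exponential backoff can be derived from RoundWait and RoundMaxWait; the service's actual backoff logic lives elsewhere in the module:

package main

import (
	"fmt"
	"time"
)

const (
	roundWait    = 50 * time.Millisecond // base delay after a failed round
	roundMaxWait = 10 * time.Minute      // upper bound for the backoff
)

// backoff doubles the wait for every consecutive failure, capped at the max.
func backoff(failures int) time.Duration {
	d := roundWait
	for i := 0; i < failures && d < roundMaxWait; i++ {
		d *= 2
	}
	if d > roundMaxWait {
		d = roundMaxWait
	}
	return d
}

func main() {
	for i := 0; i <= 4; i++ {
		fmt.Printf("failure %d -> wait %v\n", i, backoff(i))
	}
}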
@@ -115,9 +119,10 @@ type Service struct {
}

type serviceTemplate struct {
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
hashFac crypto.HashFactory
blocks blockstore.BlockStore
genesis blockstore.GenesisStore
syncMethod syncMethodType
}

// ServiceOption is the type of option to set some fields of the service.
@@ -144,8 +149,15 @@ func WithHashFactory(fac crypto.HashFactory) ServiceOption {
}
}

// WithBlockSync enables the old, slow syncing algorithm in the cosipbft module.
func WithBlockSync() ServiceOption {
return func(tmpl *serviceTemplate) {
tmpl.syncMethod = syncMethodBlock
}
}
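
WithBlockSync follows the usual functional-option pattern: callers that still want the old block-by-block synchronizer pass it to NewServiceStruct, while fastsync is the default. A stripped-down sketch of the pattern with stand-in definitions, not the service's real types:

package main

import "fmt"

type syncMethodType int

const (
	syncMethodFast syncMethodType = iota
	syncMethodBlock
)

type serviceTemplate struct {
	syncMethod syncMethodType
}

type ServiceOption func(*serviceTemplate)

// WithBlockSync switches the template back to the old block sync.
func WithBlockSync() ServiceOption {
	return func(tmpl *serviceTemplate) {
		tmpl.syncMethod = syncMethodBlock
	}
}

func newTemplate(opts ...ServiceOption) serviceTemplate {
	tmpl := serviceTemplate{syncMethod: syncMethodFast} // fastsync is the default
	for _, opt := range opts {
		opt(&tmpl)
	}
	return tmpl
}

func main() {
	fmt.Println(newTemplate().syncMethod)                // 0: fastsync
	fmt.Println(newTemplate(WithBlockSync()).syncMethod) // 1: blocksync
}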

// ServiceParam is the different components to provide to the service. All the
// fields are mandatory and it will panic if any is nil.
// fields are mandatory, and it will panic if any is nil.
type ServiceParam struct {
Mino mino.Mino
Cosi cosi.CollectiveSigning
@@ -220,10 +232,11 @@ func NewServiceStruct(param ServiceParam, opts ...ServiceOption) (*Service, error) {
ChainFactory: chainFac,
VerifierFactory: param.Cosi.GetVerifierFactory(),
}

bs := blocksync.NewSynchronizer(syncparam)

proc.sync = bs
if tmpl.syncMethod == syncMethodBlock {
proc.bsync = blocksync.NewSynchronizer(syncparam)
} else {
proc.fsync = fastsync.NewSynchronizer(syncparam)
}

fac := types.NewMessageFactory(
types.NewGenesisFactory(proc.rosterFac),
@@ -275,9 +288,20 @@ func NewServiceStart(s *Service) {
go s.watchBlocks()

if s.genesis.Exists() {
// If the genesis already exists, the service can start right away to
// participate in the chain.
// If the genesis already exists, and all blocks are loaded,
// the service can start right away to participate in the chain.
close(s.started)
if s.syncMethod() == syncMethodFast {
go func() {
roster, err := s.getCurrentRoster()
if err != nil {
s.logger.Err(err).Msg("Couldn't get roster")
} else {
s.logger.Info().Msg("Triggering catchup")
s.catchup <- roster
}
}()
}
}
}

@@ -541,17 +565,21 @@ func (s *Service) doLeaderRound(

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("round has started")

// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.sync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
// When using blocksync, the updates are sent before every new block, which
// uses a lot of bandwidth if there are more than just a few blocks.
if s.syncMethod() == syncMethodBlock {
// Send a synchronization to the roster so that they can learn about the
// latest block of the chain.
err := s.bsync.Sync(ctx, roster,
blocksync.Config{MinHard: threshold.ByzantineThreshold(roster.Len())})
if err != nil {
return xerrors.Errorf("sync failed: %v", err)
}
}

s.logger.Debug().Uint64("index", s.blocks.Len()).Msg("pbft has started")

err = s.doPBFT(ctx)
err := s.doPBFT(ctx)
if err != nil {
return xerrors.Errorf("pbft failed: %v", err)
}
@@ -677,7 +705,7 @@ func (s *Service) doPBFT(ctx context.Context) error {
block, err = types.NewBlock(
data,
types.WithTreeRoot(root),
types.WithIndex(uint64(s.blocks.Len())),
types.WithIndex(s.blocks.Len()),
types.WithHashFactory(s.hashFactory))

if err != nil {
25 changes: 18 additions & 7 deletions core/ordering/cosipbft/cosipbft_test.go
@@ -43,14 +43,25 @@ import (
"go.dedis.ch/dela/testing/fake"
)

func TestService_Scenario_Basic_Blocksync(t *testing.T) {
testserviceScenarioBasic(t, syncMethodBlock)
}
func TestService_Scenario_Basic_Fastsync(t *testing.T) {
testserviceScenarioBasic(t, syncMethodFast)
}

// This test is known to be VERY flaky on Windows.
// Further investigation is needed.
func TestService_Scenario_Basic(t *testing.T) {
func testserviceScenarioBasic(t *testing.T, sm syncMethodType) {
if testing.Short() {
t.Skip("Skipping flaky test")
}

nodes, ro, clean := makeAuthority(t, 5)
var opts []ServiceOption
if sm == syncMethodBlock {
opts = append(opts, WithBlockSync())
}
nodes, ro, clean := makeAuthority(t, 5, opts...)
defer clean()

signer := nodes[0].signer
@@ -164,7 +175,7 @@ func TestService_Scenario_ViewChange_Request(t *testing.T) {
require.Equal(t, leader, nodes[0].onet.GetAddress())

// let enough time for a round to run
time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
time.Sleep(time.Second)

require.Equal(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
@@ -203,7 +214,7 @@ func TestService_Scenario_ViewChange_NoRequest(t *testing.T) {
require.NoError(t, err)

// let enough time for a round to run
time.Sleep(DefaultRoundTimeout + 100*time.Millisecond)
time.Sleep(time.Second)

require.NotEqual(t, nodes[3].service.pbftsm.GetState(), pbft.ViewChangeState)
require.NotEqual(t, nodes[2].service.pbftsm.GetState(), pbft.ViewChangeState)
@@ -450,7 +461,7 @@ func TestService_DoRound(t *testing.T) {
closing: make(chan struct{}),
}
srvc.blocks = blockstore.NewInMemory()
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}
srvc.pool = mem.NewPool()
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
@@ -618,7 +629,7 @@ func TestService_FailSync_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{err: fake.GetError()}
srvc.bsync = fakeSync{err: fake.GetError()}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -641,7 +652,7 @@ func TestService_FailPBFT_DoRound(t *testing.T) {
srvc.tree = blockstore.NewTreeCache(fakeTree{})
srvc.rosterFac = authority.NewFactory(fake.AddressFactory{}, fake.PublicKeyFactory{})
srvc.pbftsm = fakeSM{}
srvc.sync = fakeSync{}
srvc.bsync = fakeSync{}

require.NoError(t, srvc.pool.Add(makeTx(t, 0, fake.NewSigner())))
