Skip to content
This repository has been archived by the owner on Dec 20, 2023. It is now read-only.

Integrate Mir0.4.1 #197

Merged
merged 18 commits into from
Jul 11, 2023
106 changes: 55 additions & 51 deletions chain/consensus/mir/conf_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"google.golang.org/protobuf/proto"

"github.com/filecoin-project/mir/pkg/client"
mirproto "github.com/filecoin-project/mir/pkg/pb/requestpb"
"github.com/filecoin-project/mir/pkg/pb/trantorpb"
mirproto "github.com/filecoin-project/mir/pkg/pb/trantorpb/types"
"github.com/filecoin-project/mir/pkg/trantor/types"
t "github.com/filecoin-project/mir/pkg/types"
"github.com/filecoin-project/mir/pkg/util/maputil"

Expand All @@ -21,15 +23,15 @@ import (
)

const (
ConfigurationRequestsDBPrefix = "mir/configuration/"
ConfigurationTxDBPrefix = "mir/configuration/"
)

var (
// NextConfigurationNumberKey is used to store SentConfigurationNumber
// that is the maximum configuration request number (nonce) that has been sent.
// that is the maximum configuration transaction number (nonce) that has been sent.
NextConfigurationNumberKey = datastore.NewKey("mir/next-config-number")
// NextAppliedConfigurationNumberKey is used to store AppliedConfigurationNumber
// that is the maximum configuration request number that has been applied.
// that is the maximum configuration transaction number that has been applied.
NextAppliedConfigurationNumberKey = datastore.NewKey("mir/next-applied-config-number")
// ConfigurationVotesKey is used to store configuration votes.
ConfigurationVotesKey = datastore.NewKey("mir/reconfiguration-votes")
Expand All @@ -41,8 +43,8 @@ type ConfigurationManager struct {
ctx context.Context // Parent context
ds db.DB // Persistent storage.
id string // Manager ID.
nextReqNo uint64 // The number that will be used in the next configuration Mir request.
nextAppliedNo uint64 // The number of the next configuration Mir request that will be applied.
nextTxNo uint64 // The number that will be used in the next Mir configuration transaction.
nextAppliedNo uint64 // The number of the next configuration Mir transaction that will be applied.
initialConfiguration membership.Info // Initial membership information.
}

Expand All @@ -51,7 +53,7 @@ func NewConfigurationManager(ctx context.Context, ds db.DB, id string) (*Configu
ctx: ctx,
ds: ds,
id: id,
nextReqNo: 0,
nextTxNo: 0,
nextAppliedNo: 0,
initialConfiguration: membership.Info{},
}
Expand All @@ -67,7 +69,7 @@ func NewConfigurationManagerWithMembershipInfo(ctx context.Context, ds db.DB, id
ctx: ctx,
ds: ds,
id: id,
nextReqNo: 0,
nextTxNo: 0,
nextAppliedNo: 0,
initialConfiguration: *info,
}
Expand All @@ -78,30 +80,30 @@ func NewConfigurationManagerWithMembershipInfo(ctx context.Context, ds db.DB, id
return cm, nil
}

// NewTX creates and returns a new configuration request with the next request number,
// NewTX creates and returns a new configuration transaction with the next nextTxNo number,
// corresponding to the number of transactions previously created by this client.
// Until Done is called with the returned request's number,
// the request will be pending, i.e., among the requests returned by Pending.
func (cm *ConfigurationManager) NewTX(_ uint64, data []byte) (*mirproto.Request, error) {
r := mirproto.Request{
ClientId: cm.id,
ReqNo: cm.nextReqNo,
Type: ConfigurationRequest,
// Until Done is called with the returned transaction number,
// the transaction will be pending, i.e., among the transactions returned by Pending.
func (cm *ConfigurationManager) NewTX(_ uint64, data []byte) (*mirproto.Transaction, error) {
r := mirproto.Transaction{
ClientId: types.ClientID(cm.id),
TxNo: types.TxNo(cm.nextTxNo),
Type: ConfigurationTransaction,
Data: data,
}

if err := cm.storeRequest(&r, cm.nextReqNo); err != nil {
log.With("validator", cm.id).Errorf("unable to store configuration request: %v", err)
if err := cm.storeTx(&r, cm.nextTxNo); err != nil {
log.With("validator", cm.id).Errorf("unable to store configuration tx: %v", err)
return nil, err
}

{
// If a request with number n has been persisted and the node had crashed here
// If a transaction with number n was persisted and the node had crashed here
// then when recovering the next configuration nonce can be n+1.
}

cm.nextReqNo++
cm.storeNextConfigurationNumber(cm.nextReqNo)
cm.nextTxNo++
cm.storeNextConfigurationNumber(cm.nextTxNo)

return &r, nil
}
Expand All @@ -110,24 +112,25 @@ func (cm *ConfigurationManager) GetInitialMembershipInfo() membership.Info {
return cm.initialConfiguration
}

// Done marks a configuration request as done. It will no longer be among the request returned by Pending.
func (cm *ConfigurationManager) Done(txNo t.ReqNo) error {
cm.nextAppliedNo = uint64(txNo) + 1
// Done marks a configuration transaction as done. It will no longer be among the transactions returned by Pending.
func (cm *ConfigurationManager) Done(txNo types.TxNo) error {
cm.nextAppliedNo = txNo.Pb() + 1
cm.storeNextAppliedConfigurationNumber(cm.nextAppliedNo)
cm.removeRequest(uint64(txNo))
cm.removeTx(txNo.Pb())
return nil
}

// Pending returns from the persistent storage all requests previously returned by NewTX that have not been applied yet.
func (cm *ConfigurationManager) Pending() (reqs []*mirproto.Request, err error) {
for i := cm.nextAppliedNo; i < cm.nextReqNo; i++ {
r, err := cm.getRequest(i)
// Pending returns from the persistent storage all transactions previously returned by NewTX
// that have not been applied yet.
func (cm *ConfigurationManager) Pending() (txs []*mirproto.Transaction, err error) {
for i := cm.nextAppliedNo; i < cm.nextTxNo; i++ {
tx, err := cm.getTx(i)
if err != nil {
return nil, err
}
reqs = append(reqs, r)
txs = append(txs, tx)
}
return
return txs, nil
}

// Sync ensures that the effects of all previous calls to NewTX and Done have been written to persistent storage.
Expand All @@ -136,61 +139,62 @@ func (cm *ConfigurationManager) Sync() error {
return nil
}

// recover function recovers configuration number, and configuration requests that may not be applied.
// recover function recovers configuration number, and configuration transactions that may not be applied.
func (cm *ConfigurationManager) recover() error {
nextReqNo := cm.getNextConfigurationNumber()
nextTxNo := cm.getNextConfigurationNumber()
appliedNumber := cm.getAppliedConfigurationNumber()

if nextReqNo == appliedNumber && appliedNumber == 0 {
if nextTxNo == appliedNumber && appliedNumber == 0 {
return nil
}
if appliedNumber > nextReqNo {
return fmt.Errorf("validator %v has incorrect configuration numbers: %d, %d", cm.id, appliedNumber, nextReqNo)
if appliedNumber > nextTxNo {
return fmt.Errorf("validator %v has incorrect configuration numbers: %d, %d", cm.id, appliedNumber, nextTxNo)
}

cm.nextAppliedNo = appliedNumber
cm.nextReqNo = nextReqNo
cm.nextTxNo = nextTxNo

// If the node crashes immediately after the request with number n was persisted then the next configuration nonce can be
// n+1. To distinguish that scenario we have to check the existence of n+1 request.
_, err := cm.getRequest(nextReqNo + 1)
// If the node crashes immediately after the transaction with number n was persisted then the next configuration nonce can be
// n+1. To distinguish that scenario we have to check the existence of n+1 transaction.
_, err := cm.getTx(nextTxNo + 1)
switch {
case errors.Is(err, datastore.ErrNotFound):
return nil
case err == nil:
cm.nextReqNo++
cm.nextTxNo++
return nil
case err != nil:
return err
}
return nil
}

// storeRequest stores a configuration request and the corresponding configuration number in the persistent database.
func (cm *ConfigurationManager) storeRequest(r *mirproto.Request, n uint64) error {
v, err := proto.Marshal(r)
// storeTx stores a configuration transaction and the corresponding configuration number in the persistent database.
func (cm *ConfigurationManager) storeTx(r *mirproto.Transaction, n uint64) error {
v, err := proto.Marshal(r.Pb())
if err != nil {
return err
}
return cm.ds.Put(cm.ctx, configurationIndexKey(n), v)
}

// getRequest gets a configuration request from the persistent database.
func (cm *ConfigurationManager) getRequest(n uint64) (*mirproto.Request, error) {
// getTx gets a configuration transaction from the persistent database.
func (cm *ConfigurationManager) getTx(n uint64) (*mirproto.Transaction, error) {
b, err := cm.ds.Get(cm.ctx, configurationIndexKey(n))
if err != nil {
return nil, err
}
var r mirproto.Request
var r trantorpb.Transaction
if err := proto.Unmarshal(b, &r); err != nil {
return nil, err
}
return &r, nil

return mirproto.TransactionFromPb(&r), nil
}

func (cm *ConfigurationManager) removeRequest(n uint64) {
func (cm *ConfigurationManager) removeTx(n uint64) {
if err := cm.ds.Delete(cm.ctx, configurationIndexKey(n)); err != nil {
log.With("validator", cm.id).Warnf("failed to remove applied configuration request %d: %v", n, err)
log.With("validator", cm.id).Warnf("failed to remove applied configuration tx %d: %v", n, err)
}
}

Expand Down Expand Up @@ -274,7 +278,7 @@ func (cm *ConfigurationManager) storeNumber(key datastore.Key, n uint64) {
}

func configurationIndexKey(n uint64) datastore.Key {
return datastore.NewKey(ConfigurationRequestsDBPrefix + strconv.FormatUint(n, 10))
return datastore.NewKey(ConfigurationTxDBPrefix + strconv.FormatUint(n, 10))
}

func GetConfigurationVotes(vr []VoteRecord) map[uint64]map[string]map[t.NodeID]struct{} {
Expand Down
Loading