Skip to content

Commit

Permalink
integrate shwap into shrex
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jul 10, 2024
1 parent 993fc34 commit f266624
Show file tree
Hide file tree
Showing 53 changed files with 7,608 additions and 57 deletions.
13 changes: 13 additions & 0 deletions libs/utils/close.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package utils

import (
"io"

logging "github.com/ipfs/go-log/v2"
)

func CloseAndLog(log logging.StandardLogger, name string, closer io.Closer) {
if err := closer.Close(); err != nil {
log.Warnf("closing %s: %s", name, err)
}
}
4 changes: 2 additions & 2 deletions nodebuilder/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type Module interface {
// If all blobs were found without any errors, the user will receive a list of blobs.
// If the BlobService couldn't find any blobs under the requested namespaces,
// the user will receive an empty list of blobs along with an empty error.
// If some of the requested namespaces were not found, the user will receive all the found blobs and an empty error.
// If there were internal errors during some of the requests,
// If some of the requested namespaces were not found, the user will receive all the found blobs
// and an empty error. If there were internal errors during some of the requests,
// the user will receive all found blobs along with a combined error message.
//
// All blobs will preserve the order of the namespaces that were requested.
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/share/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (

"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
disc "github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
disc "github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

// WithPeerManagerMetrics is a utility function to turn on peer manager metrics and that is
Expand Down
2 changes: 1 addition & 1 deletion share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

// GetterWithRandSquare provides a share.Getter filled with 'n' NMT
Expand Down
123 changes: 123 additions & 0 deletions share/shwap/namespace_data_id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package shwap

import (
"encoding/binary"
"fmt"

"github.com/celestiaorg/celestia-node/share"
)

// NamespaceDataIDSize defines the total size of a RowNamespaceDataID in bytes, combining the
// size of a RowID and the size of a Namespace.
const NamespaceDataIDSize = EdsIDSize + 4 + share.NamespaceSize

// RowNamespaceDataID uniquely identifies a piece of namespaced data within a row of an Extended
// Data Square (EDS).
type NamespaceDataID struct {
// Embedding EdsID to include the block height in RowID.
EdsID
// FromRow and ToRow specify the range of rows within the data square.
FromRowIndex, ToRowIndex int
// DataNamespace is a string representation of the namespace to facilitate comparisons.
DataNamespace share.Namespace
}

// NewNamespaceDataID creates a new RowNamespaceDataID with the specified parameters. It
// validates the RowNamespaceDataID against the provided Root before returning.
func NewNamespaceDataID(
height uint64,
fromRowIndex, toRowIndex int,
namespace share.Namespace,
edsSize int,
) (NamespaceDataID, error) {
ndid := NamespaceDataID{
EdsID: EdsID{
Height: height,
},
FromRowIndex: fromRowIndex,
ToRowIndex: toRowIndex,
DataNamespace: namespace,
}

if err := ndid.Verify(edsSize); err != nil {
return NamespaceDataID{}, err
}
return ndid, nil
}

// NamespaceDataIDFromBinary deserializes a RowNamespaceDataID from its binary form. It returns
// an error if the binary data's length does not match the expected size.
func NamespaceDataIDFromBinary(data []byte) (NamespaceDataID, error) {
if len(data) != NamespaceDataIDSize {
return NamespaceDataID{},
fmt.Errorf("invalid RowNamespaceDataID length: expected %d, got %d", RowNamespaceDataIDSize, len(data))
}

edsID, err := EdsIDFromBinary(data[:EdsIDSize])
if err != nil {
return NamespaceDataID{}, fmt.Errorf("error unmarshaling RowID: %w", err)
}

fromRowIndex := int(binary.BigEndian.Uint16(data[EdsIDSize:]))
toRowIndex := int(binary.BigEndian.Uint16(data[EdsIDSize+2:]))
ns := share.Namespace(data[EdsIDSize+4:])
if err := ns.ValidateForData(); err != nil {
return NamespaceDataID{}, fmt.Errorf("error validating DataNamespace: %w", err)
}

return NamespaceDataID{
EdsID: edsID,
FromRowIndex: fromRowIndex,
ToRowIndex: toRowIndex,
DataNamespace: ns,
}, nil
}

// MarshalBinary encodes RowNamespaceDataID into binary form.
// NOTE: Proto is avoided because
// * Its size is not deterministic which is required for IPLD.
// * No support for uint16
func (ndid NamespaceDataID) MarshalBinary() ([]byte, error) {
data := make([]byte, 0, NamespaceDataIDSize)
return ndid.appendTo(data), nil
}

// Verify checks the validity of RowNamespaceDataID's fields, including the RowID and the
// namespace.
func (ndid NamespaceDataID) Verify(edsSize int) error {
if ndid.FromRowIndex >= edsSize {
return fmt.Errorf("FromRowIndex: %w: %d >= %d", ErrOutOfBounds, ndid.FromRowIndex, edsSize)
}
if ndid.ToRowIndex >= edsSize {
return fmt.Errorf("ToRowIndex: %w: %d >= %d", ErrOutOfBounds, ndid.ToRowIndex, edsSize)
}
return ndid.Validate()
}

func (ndid NamespaceDataID) Validate() error {
if err := ndid.EdsID.Validate(); err != nil {
return fmt.Errorf("error validating RowID: %w", err)
}
if ndid.FromRowIndex > ndid.ToRowIndex {
return fmt.Errorf("%w: FromRowIndex %d is greater than ToRowIndex %d",
ErrInvalidShwapID, ndid.FromRowIndex, ndid.ToRowIndex)
}
if ndid.FromRowIndex < 0 {
return fmt.Errorf("%w: FromRowIndex %d", ErrInvalidShwapID, ndid.FromRowIndex)
}
if ndid.ToRowIndex < 0 {
return fmt.Errorf("%w: ToRowIndex %d", ErrInvalidShwapID, ndid.ToRowIndex)
}
if err := ndid.DataNamespace.ValidateForData(); err != nil {
return fmt.Errorf("%w: error validating DataNamespace: %w", ErrInvalidShwapID, err)
}
return nil
}

// appendTo helps in appending the binary form of DataNamespace to the serialized RowID data.
func (ndid NamespaceDataID) appendTo(data []byte) []byte {
data = ndid.EdsID.appendTo(data)
data = binary.BigEndian.AppendUint16(data, uint16(ndid.FromRowIndex))
data = binary.BigEndian.AppendUint16(data, uint16(ndid.ToRowIndex))
return append(data, ndid.DataNamespace...)
}
28 changes: 28 additions & 0 deletions share/shwap/namespace_data_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package shwap

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/share/sharetest"
)

func TestNamespaceDataID(t *testing.T) {
odsSize := 4
ns := sharetest.RandV0Namespace()

id, err := NewNamespaceDataID(1, 1, 2, ns, odsSize*2)
require.NoError(t, err)

data, err := id.MarshalBinary()
require.NoError(t, err)

sidOut, err := NamespaceDataIDFromBinary(data)
require.NoError(t, err)
assert.EqualValues(t, id, sidOut)

err = sidOut.Verify(odsSize * 2)
require.NoError(t, err)
}
116 changes: 116 additions & 0 deletions share/shwap/p2p/discovery/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package discovery

import (
"context"
"errors"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

const (
// gcInterval is a default period after which disconnected peers will be removed from cache
gcInterval = time.Minute
// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
connectTimeout = time.Minute * 2
)

var (
defaultBackoffFactory = backoff.NewFixedBackoff(time.Minute * 10)
errBackoffNotEnded = errors.New("share/discovery: backoff period has not ended")
)

// backoffConnector wraps a libp2p.Host to establish a connection with peers
// with adding a delay for the next connection attempt.
type backoffConnector struct {
h host.Host
backoff backoff.BackoffFactory

cacheLk sync.Mutex
cacheData map[peer.ID]backoffData
}

// backoffData stores time when next connection attempt with the remote peer.
type backoffData struct {
nexttry time.Time
backoff backoff.BackoffStrategy
}

func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffConnector {
return &backoffConnector{
h: h,
backoff: factory,
cacheData: make(map[peer.ID]backoffData),
}
}

// Connect puts peer to the backoffCache and tries to establish a connection with it.
func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error {
if b.HasBackoff(p.ID) {
return errBackoffNotEnded
}

ctx, cancel := context.WithTimeout(ctx, connectTimeout)
defer cancel()

err := b.h.Connect(ctx, p)
// we don't want to add backoff when the context is canceled.
if !errors.Is(err, context.Canceled) {
b.Backoff(p.ID)
}
return err
}

// Backoff adds or extends backoff delay for the peer.
func (b *backoffConnector) Backoff(p peer.ID) {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()

data, ok := b.cacheData[p]
if !ok {
data = backoffData{}
data.backoff = b.backoff()
b.cacheData[p] = data
}

data.nexttry = time.Now().Add(data.backoff.Delay())
b.cacheData[p] = data
}

// HasBackoff checks if peer is in backoff.
func (b *backoffConnector) HasBackoff(p peer.ID) bool {
b.cacheLk.Lock()
cache, ok := b.cacheData[p]
b.cacheLk.Unlock()
return ok && time.Now().Before(cache.nexttry)
}

// GC is a perpetual GCing loop.
func (b *backoffConnector) GC(ctx context.Context) {
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
b.cacheLk.Lock()
for id, cache := range b.cacheData {
if cache.nexttry.Before(time.Now()) {
delete(b.cacheData, id)
}
}
b.cacheLk.Unlock()
}
}
}

func (b *backoffConnector) Size() int {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()
return len(b.cacheData)
}
47 changes: 47 additions & 0 deletions share/shwap/p2p/discovery/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package discovery

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)

func TestBackoff_ConnectPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
m, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute))
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))
}

func TestBackoff_ConnectPeerFails(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
m, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute))
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))

require.Error(t, b.Connect(ctx, *info))
}

func TestBackoff_ResetBackoffPeriod(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
m, err := mocknet.FullMeshLinked(2)
require.NoError(t, err)
b := newBackoffConnector(m.Hosts()[0], backoff.NewFixedBackoff(time.Minute))
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))
nexttry := b.cacheData[info.ID].nexttry
b.Backoff(info.ID)
require.True(t, b.cacheData[info.ID].nexttry.After(nexttry))
}
Loading

0 comments on commit f266624

Please sign in to comment.