Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assigner waits to get existing assignments from indexers #1116

Merged
merged 3 commits into from
Dec 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 92 additions & 24 deletions assigner/core/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -28,6 +29,8 @@ type Assigner struct {
assigned map[peer.ID]*assignment
// indexerPool is the set of indexers to assign publishers to.
indexerPool []indexerInfo
// initDone is true when assignments have been read from all indexers.
initDone bool
// mutex protects assigned.
mutex sync.Mutex
p2pHost host.Host
Expand Down Expand Up @@ -78,6 +81,7 @@ type indexerInfo struct {
adminURL string
ingestURL string
assigned int32
initDone bool
}

// assignedCount returns the number of publishers assigned to this indexer.
Expand Down Expand Up @@ -106,9 +110,6 @@ func NewAssigner(ctx context.Context, cfg config.Assignment, p2pHost host.Host)
return nil, err
}

// Get the publishers currently assigned to each indexer in the pool.
assigned := initAssignments(ctx, indexerPool, presets)

policy, err := peerutil.NewPolicyStrings(cfg.Policy.Allow, cfg.Policy.Except)
if err != nil {
return nil, fmt.Errorf("bad allow policy: %s", err)
Expand All @@ -125,17 +126,36 @@ func NewAssigner(ctx context.Context, cfg config.Assignment, p2pHost host.Host)

log.Infof("Assigner operating with %d indexers", len(indexerPool))

replication := cfg.Replication
if replication <= 0 {
replication = 1
} else if replication > len(indexerPool) {
replication = len(indexerPool)
}

a := &Assigner{
assigned: assigned,
assigned: make(map[peer.ID]*assignment),
indexerPool: indexerPool,
p2pHost: p2pHost,
policy: policy,
presets: presets,
receiver: rcvr,
replication: cfg.Replication,
replication: replication,
watchDone: make(chan struct{}),
}

// Get the publishers currently assigned to each indexer in the pool. If
// assignments cannot be read from all, then retry later when an unassigned
// publisher is seen. Reduce the needed assignments for the publisher by
// the number of offline indexers to prevent ofer-assigning indexers in the
// pool.
downCount := a.initAssignments(ctx)
if downCount != 0 {
log.Warnw("Could not get existing assignments for all indexers in pool, will retry later")
} else {
log.Infow("Read assignments from all indexers")
}

go a.watch()

return a, nil
Expand All @@ -153,13 +173,17 @@ func (a *Assigner) IndexerAssignedCounts() []int {
return counts
}

func initAssignments(ctx context.Context, indexerPool []indexerInfo, presets map[peer.ID][]int) map[peer.ID]*assignment {
assigned := make(map[peer.ID]*assignment)
func (a *Assigner) InitDone() bool {
a.mutex.Lock()
defer a.mutex.Unlock()
return a.initDone
}

func (a *Assigner) initAssignments(ctx context.Context) int {
// If a publisher is pre-assigned to specific indexers, then ignore
// any indexer that is not one of those pre-assigned.
wrongPreset := func(pubID peer.ID, indexerNum int) bool {
preset, usesPreset := presets[pubID]
preset, usesPreset := a.presets[pubID]
if usesPreset {
for _, p := range preset {
if p == indexerNum {
Expand All @@ -174,10 +198,16 @@ func initAssignments(ctx context.Context, indexerPool []indexerInfo, presets map
return false
}

for i := range indexerPool {
pubs, prefPubs, err := getAssignments(ctx, indexerPool[i].adminURL)
var needInit int
for i := range a.indexerPool {
if a.indexerPool[i].initDone {
continue
}

pubs, prefPubs, err := getAssignments(ctx, a.indexerPool[i].adminURL)
if err != nil {
log.Errorw("Could not get assignments from indexer", "err", err, "indexer", i, "adminURL", indexerPool[i].adminURL)
needInit++
log.Errorw("Could not get assignments from indexer", "err", err, "indexer", i, "adminURL", a.indexerPool[i].adminURL)
continue
}

Expand All @@ -186,37 +216,44 @@ func initAssignments(ctx context.Context, indexerPool []indexerInfo, presets map
if wrongPreset(pubID, i) {
continue
}
asmt, found := assigned[pubID]
asmt, found := a.assigned[pubID]
if !found {
asmt = &assignment{
indexers: []int{},
}
assigned[pubID] = asmt
a.assigned[pubID] = asmt
}
asmt.addIndexer(i)
indexerPool[i].assigned++
a.indexerPool[i].assigned++
}
log.Infof("Indexer %d has %d assignments", i, indexerPool[i].assigned)
log.Infof("Indexer %d has %d assignments", i, a.indexerPool[i].assigned)

// Add this indexer to each publisher's preferred assignments.
for _, pubID := range prefPubs {
if wrongPreset(pubID, i) {
continue
}
asmt, found := assigned[pubID]
asmt, found := a.assigned[pubID]
if !found {
asmt = &assignment{
indexers: []int{},
}
assigned[pubID] = asmt
a.assigned[pubID] = asmt
} else if asmt.hasIndexer(i) {
log.Errorw("Publisher assigned to indexer cannot be listed as preferred", "indexer", i, "publisher", pubID)
continue
}
asmt.preferred = append(asmt.preferred, i)
}

a.indexerPool[i].initDone = true
}

if needInit == 0 {
a.initDone = true
}
return assigned

return needInit
}

// indexersFromConfig reads the indexer pool config to create the indexer pool.
Expand Down Expand Up @@ -383,6 +420,7 @@ func (a *Assigner) closeNotifyAssignment(pubID peer.ID) {
// watch fetches announce messages from the Receiver.
func (a *Assigner) watch() {
defer close(a.watchDone)
var pending sync.WaitGroup

// Cancel any pending messages if this function exits.
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -402,8 +440,14 @@ func (a *Assigner) watch() {
continue
}

go a.makeAssignments(ctx, amsg, asmt, need)
pending.Add(1)
go func() {
a.makeAssignments(ctx, amsg, asmt, need)
pending.Done()
}()
}
// Wait for pending assignments in to complete.
pending.Wait()
}

// checkAssignment checks if a publisher is assigned to sufficient indexers.
Expand All @@ -414,9 +458,6 @@ func (a *Assigner) checkAssignment(pubID peer.ID) (*assignment, int) {
required = len(preset)
} else {
required = a.replication
if required == 0 {
required = len(a.indexerPool)
}
}

a.mutex.Lock()
Expand All @@ -433,8 +474,34 @@ func (a *Assigner) checkAssignment(pubID peer.ID) (*assignment, int) {
log.Debug("Publisher already assigned to all required indexers")
return nil, 0
}

need := required - len(asmt.indexers)
if !a.initDone {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
downCount := a.initAssignments(ctx)
cancel()
// If the number of unavailable indexers is as many or more than needed, then do not do
// assignment since peer could already be assigned to offline indexers.
need -= downCount
if need <= 0 {
return nil, 0
}
}

asmt.processing = true
return asmt, required - len(asmt.indexers)
return asmt, need
}

if !a.initDone {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
downCount := a.initAssignments(ctx)
cancel()
// If the number of unavailable indexers is as many or more than needed, then do not do
// assignment since peer could already be assigned to offline indexers.
required -= downCount
if required <= 0 {
return nil, 0
}
}

// Publisher not yet assigned to an indexer, so make assignment.
Expand Down Expand Up @@ -488,7 +555,8 @@ func (a *Assigner) makeAssignments(ctx context.Context, amsg announce.Announce,
for _, indexerNum := range candidates {
err := assignIndexer(ctx, a.indexerPool[indexerNum], amsg)
if err != nil {
log.Errorw("Could not assign publisher to indexer", "indexer", indexerNum, "adminURL", a.indexerPool[indexerNum].adminURL)
log.Errorw("Could not assign publisher to indexer", "err", err,
"indexer", indexerNum, "adminURL", a.indexerPool[indexerNum].adminURL)
continue
}
asmt.addIndexer(indexerNum)
Expand Down
104 changes: 104 additions & 0 deletions assigner/core/assigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestAssignerAll(t *testing.T) {

assigner, err := core.NewAssigner(ctx, cfgAssignment, nil)
require.NoError(t, err)
require.True(t, assigner.InitDone())

t.Log("Presets for", peer1IDStr, "=", assigner.Presets(peer1ID))
require.Equal(t, []int{0, 1}, assigner.Presets(peer1ID))
Expand Down Expand Up @@ -451,6 +452,109 @@ func TestAssignerPreferred(t *testing.T) {
require.NoError(t, assigner.Close())
}

func TestPoolIndexerOffline(t *testing.T) {
fakeIndexer1 := newTestIndexer(nil)
defer fakeIndexer1.close()

ready := make(chan struct{})
fakeIndexer2 := newTestIndexer(func(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()

select {
case <-ready:
default:
http.Error(w, "not ready", http.StatusServiceUnavailable)
return
}

if req.Method == "GET" {
switch req.URL.String() {
case "/ingest/assigned":
peers := []string{peer1IDStr}
data, err := json.Marshal(peers)
if err != nil {
panic(err.Error())
}
writeJsonResponse(w, http.StatusOK, data)
case "/ingest/preferred":
writeJsonResponse(w, http.StatusNoContent, nil)
default:
http.Error(w, "", http.StatusNotFound)
}
} else {
writeJsonResponse(w, http.StatusOK, nil)
}
})
defer fakeIndexer2.close()

cfgAssignment := config.Assignment{
// IndexerPool is the set of indexers the pool.
IndexerPool: []config.Indexer{
{
AdminURL: fakeIndexer1.adminServer.URL,
IngestURL: fakeIndexer1.ingestServer.URL,
},
{
AdminURL: fakeIndexer2.adminServer.URL,
IngestURL: fakeIndexer2.ingestServer.URL,
},
},
Policy: config.Policy{
Allow: true,
},
PubSubTopic: "testtopic",
}

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Should not get all initial assignments.
assigner, err := core.NewAssigner(ctx, cfgAssignment, nil)
require.NoError(t, err)
require.False(t, assigner.InitDone())

// ----- Announce an unassigned publisher and check that it does not get assigned. -----

asmtChan, cancel := assigner.OnAssignment(peer2ID)
defer cancel()

adCid, _ := cid.Decode("bafybeigvgzoolc3drupxhlevdp2ugqcrbcsqfmcek2zxiw5wctk3xjpjwy")
a, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/9999")
addrInfo := peer.AddrInfo{
ID: peer2ID,
Addrs: []multiaddr.Multiaddr{a},
}
err = assigner.Announce(ctx, adCid, addrInfo)
require.NoError(t, err)

timeout := time.NewTimer(2 * time.Second)
select {
case <-asmtChan:
t.Fatal("shouold not see assignment with offline indexer")
case <-timeout.C:
}
require.False(t, assigner.InitDone())

// ----- Allow second indexer to work so that initialization can complete. ---
close(ready)

adCid, _ = cid.Decode("QmNiV8rwXeC92hufGNu5qJ6L9AygrvDyi63gEpCQaqsE9B")
err = assigner.Announce(ctx, adCid, addrInfo)
require.NoError(t, err)

timeout.Reset(2 * time.Second)
select {
case <-asmtChan:
case <-timeout.C:
t.Fatal("timed out waiting for assignment")
}
timeout.Stop()

require.True(t, assigner.InitDone())

require.NoError(t, assigner.Close())
}

type testIndexer struct {
adminServer *httptest.Server
ingestServer *httptest.Server
Expand Down