Skip to content

Commit

Permalink
Qm sync (#7250)
Browse files Browse the repository at this point in the history
  • Loading branch information
stereosteve authored Jan 19, 2024
1 parent cc892ea commit 09b5dc7
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 0 deletions.
8 changes: 8 additions & 0 deletions mediorum/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ var mediorumMigrationTable = `
);
`

var qmSyncTable = `
create table if not exists qm_sync (
"host" text primary key
);
`

func Migrate(db *sql.DB, myHost string) {
mustExec(db, mediorumMigrationTable)

Expand All @@ -37,6 +43,8 @@ func Migrate(db *sql.DB, myHost string) {
runMigration(db, `create index if not exists uploads_ts_idx on uploads(created_at, transcoded_at)`)

runMigration(db, `drop table if exists "Files", "ClockRecords", "Tracks", "AudiusUsers", "CNodeUsers", "SessionTokens", "ContentBlacklists", "Playlists", "SequelizeMeta", blobs, cid_lookup, cid_log cascade`)

runMigration(db, qmSyncTable)
}

func runMigration(db *sql.DB, ddl string) {
Expand Down
113 changes: 113 additions & 0 deletions mediorum/server/qm_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package server

import (
"bufio"
"context"
"net/http"
"time"

"github.com/labstack/echo/v4"
)

const _qmFileKey = "_data/qm_cids.csv"

func (ss *MediorumServer) writeQmFile() error {
ctx := context.Background()

bail := func(err error) error {
if err != nil {
ss.bucket.Delete(ctx, _qmFileKey)
}
return err
}

// if exists do nothing
if exists, _ := ss.bucket.Exists(ctx, _qmFileKey); exists {
return nil
}

// blob writer
blobWriter, err := ss.bucket.NewWriter(ctx, _qmFileKey, nil)
if err != nil {
return bail(err)
}

// db conn
conn, err := ss.pgPool.Acquire(ctx)
if err != nil {
return bail(err)
}
defer conn.Release()

// doit
_, err = conn.Conn().PgConn().CopyTo(ctx, blobWriter, "COPY qm_cids TO STDOUT")
if err != nil {
return bail(err)
}

return bail(blobWriter.Close())
}

func (ss *MediorumServer) serveInternalQmCsv(c echo.Context) error {
r, err := ss.bucket.NewReader(c.Request().Context(), _qmFileKey, nil)
if err != nil {
return err
}
return c.Stream(200, "text/plain", r)
}

func (ss *MediorumServer) pullQmFromPeer(host string) error {
ctx := context.Background()

done := false
ss.pgPool.QueryRow(ctx, "select count(*) = 1 from qm_sync where host = $1", host).Scan(&done)
if done {
return nil
}

req, err := http.Get(apiPath(host, "internal/qm.csv"))
if err != nil {
return err
}
defer req.Body.Close()

tx, err := ss.pgPool.Begin(context.Background())
if err != nil {
return err
}
defer tx.Rollback(context.Background())

scanner := bufio.NewScanner(req.Body)
for scanner.Scan() {
_, err = tx.Exec(ctx, "insert into qm_cids values ($1) on conflict do nothing", scanner.Text())
if err != nil {
return err
}
}

err = tx.Commit(ctx)
if err != nil {
return err
}

_, err = ss.pgPool.Exec(ctx, "insert into qm_sync values($1)", host)
return err
}

func (ss *MediorumServer) startQmSyncer() {
time.Sleep(time.Minute * 1)

err := ss.writeQmFile()
if err != nil {
ss.logger.Error("qmSync: failed to write qm.csv file", "err", err)
}

time.Sleep(time.Minute * 1)
for _, peer := range ss.findHealthyPeers(time.Hour) {
if err = ss.pullQmFromPeer(peer); err != nil {
ss.logger.Error("qmSync: failed to pull qm.csv from peer", "peer", peer, "err", err)
} else {
ss.logger.Info("qmSync: pulled qm.csv from peer", "peer", peer)
}
}
}
51 changes: 51 additions & 0 deletions mediorum/server/qm_sync_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package server

import (
"context"
"io"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestQmSync(t *testing.T) {
ctx := context.Background()

ss := testNetwork[0]

_, err := ss.pgPool.Exec(ctx, `insert into qm_cids values ('Qm1'), ('Qm2'), ('Qm3') on conflict do nothing`)
assert.NoError(t, err)
err = ss.writeQmFile()
assert.NoError(t, err)

// read it back out
blobReader, err := ss.bucket.NewReader(ctx, _qmFileKey, nil)
assert.NoError(t, err)

cool, err := io.ReadAll(blobReader)
assert.NoError(t, err)
assert.Equal(t, "Qm1 Qm2 Qm3 ", strings.ReplaceAll(string(cool), "\n", " "))

s2 := testNetwork[1]

_, err = s2.pgPool.Exec(ctx, "truncate qm_cids, qm_sync")
assert.NoError(t, err)

s2count := -1
s2.pgPool.QueryRow(ctx, "select count(*) from qm_cids").Scan(&s2count)
assert.Equal(t, 0, s2count)

s2done := false
s2.pgPool.QueryRow(ctx, "select count(*) = 1 from qm_sync where host = $1", ss.Config.Self.Host).Scan(&s2done)
assert.False(t, s2done)

err = s2.pullQmFromPeer(ss.Config.Self.Host)
assert.NoError(t, err)

s2.pgPool.QueryRow(ctx, "select count(*) from qm_cids").Scan(&s2count)
assert.Equal(t, 3, s2count)

s2.pgPool.QueryRow(ctx, "select count(*) = 1 from qm_sync where host = $1", ss.Config.Self.Host).Scan(&s2done)
assert.True(t, s2done)
}
3 changes: 3 additions & 0 deletions mediorum/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func New(config MediorumConfig) (*MediorumServer, error) {
// internal: blobs between peers
internalApi.GET("/blobs/:cid", ss.serveInternalBlobGET, cidutil.UnescapeCidParam, middleware.BasicAuth(ss.checkBasicAuth))
internalApi.POST("/blobs", ss.serveInternalBlobPOST, middleware.BasicAuth(ss.checkBasicAuth))
internalApi.GET("/qm.csv", ss.serveInternalQmCsv)

// WIP internal: metrics
internalApi.GET("/metrics", ss.getMetrics)
Expand Down Expand Up @@ -432,6 +433,8 @@ func (ss *MediorumServer) MustStart() {

go ss.startRepairer()

go ss.startQmSyncer()

ss.crud.StartClients()

go ss.startPollingDelistStatuses()
Expand Down

0 comments on commit 09b5dc7

Please sign in to comment.