Skip to content

Commit

Permalink
Add MongoDB shard support for oplog push and replay
Browse files Browse the repository at this point in the history
Signed-off-by: sayedppqq <sayed@appscode.com>
  • Loading branch information
sayedppqq committed Jun 4, 2024
1 parent 0fccf8f commit 471e695
Show file tree
Hide file tree
Showing 15 changed files with 310 additions and 74 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ MONGO_VERSION ?= "4.2.8"
GOLANGCI_LINT_VERSION ?= "v1.52.2"
REDIS_VERSION ?= "5.0.8"
TOOLS_MOD_DIR := ./internal/tools
REGISTRY := sayedppqq

BUILD_TAGS:=

Expand Down Expand Up @@ -308,3 +309,8 @@ unlink_libsodium:
build_client:
cd cmd/daemonclient && \
go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"

# update: rebuild the mongo wal-g binary (via mongo_build), package it into a
# local walg:1.0 Docker image, then tag and push it to ${REGISTRY}.
update: mongo_build
	docker build --tag walg:1.0 ./main/mongo
	docker tag walg:1.0 ${REGISTRY}/walg:1.0
	docker push ${REGISTRY}/walg:1.0
2 changes: 1 addition & 1 deletion cmd/mongo/oplog_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ var oplogFetchCmd = &cobra.Command{
tracelog.ErrorLogger.FatalOnError(err)

// setup storage fetcher
oplogFetcher := stages.NewStorageFetcher(downloader, path)
oplogFetcher := stages.NewStorageFetcher(downloader, path, "")

// run worker cycle
err = mongo.HandleOplogReplay(ctx, since, until, oplogFetcher, oplogApplier)
Expand Down
25 changes: 23 additions & 2 deletions cmd/mongo/oplog_push.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package mongo

import (
"context"
storageapi "kubestash.dev/apimachinery/apis/storage/v1alpha1"
"os"
"path"
"syscall"
"time"

Expand Down Expand Up @@ -74,10 +76,15 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
if err != nil {
return err
}
uplProvider.ChangeDirectory(models.OplogArchBasePath)
subDir := models.OplogArchBasePath
if pushArgs.dbProvider == string(storageapi.ProviderLocal) {
subDir = path.Join(pushArgs.dbPath, subDir)
}
uplProvider.ChangeDirectory(subDir)
uploader := archive.NewStorageUploader(uplProvider)
uploader.SetKubeClient(pushArgs.kubeClient)
uploader.SetSnapshot(snapshotName, snapshotNamespace)
uploader.SetDBNode(pushArgs.dbNode)

// set up mongodb client and oplog fetcher
mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
Expand Down Expand Up @@ -106,6 +113,8 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
if err != nil {
return err
}
downloader.SetNodeSpecificDownloader(uploader.GetDBNode())

since, err := discovery.ResolveStartingTS(ctx, downloader, mongoClient)
if err != nil {
return err
Expand Down Expand Up @@ -143,6 +152,9 @@ type oplogPushRunArgs struct {
archiveAfterSize int
archiveTimeout time.Duration
mongodbURL string
dbNode string
dbProvider string
dbPath string
primaryWait bool
primaryWaitTimeout time.Duration
lwUpdate time.Duration
Expand All @@ -165,6 +177,15 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
return
}

args.dbNode, err = conf.GetRequiredSetting(conf.MongoDBNode)
if err != nil {
return
}

args.dbProvider = conf.GetNonRequiredSetting(conf.MongoDBProvider)

args.dbPath = conf.GetNonRequiredSetting(conf.MongoDBPath)

args.primaryWait, err = conf.GetBoolSettingDefault(conf.OplogPushWaitForBecomePrimary, false)
if err != nil {
return
Expand Down Expand Up @@ -192,7 +213,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
return
}

return
return args, err
}

type oplogPushStatsArgs struct {
Expand Down
52 changes: 31 additions & 21 deletions cmd/mongo/oplog_replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package mongo
import (
"context"
"encoding/json"
"github.com/wal-g/wal-g/internal/databases/mongo/shake"
"os"
"syscall"

"github.com/spf13/cobra"
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/internal"
conf "github.com/wal-g/wal-g/internal/config"
"github.com/wal-g/wal-g/internal/databases/mongo"
"github.com/wal-g/wal-g/internal/databases/mongo/archive"
Expand Down Expand Up @@ -49,6 +49,7 @@ type oplogReplayRunArgs struct {

ignoreErrCodes map[string][]int32
mongodbURL string
dbNode string

oplogAlwaysUpsert *bool
oplogApplicationMode *string
Expand Down Expand Up @@ -76,6 +77,11 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
return
}

args.dbNode, err = conf.GetRequiredSetting(conf.MongoDBNode)
if err != nil {
return
}

oplogAlwaysUpsert, hasOplogAlwaysUpsert, err := conf.GetBoolSetting(conf.OplogReplayOplogAlwaysUpsert)
if err != nil {
return
Expand All @@ -92,24 +98,24 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
return args, nil
}

// processArg resolves a replay-boundary argument into a concrete oplog
// timestamp. It understands two keywords — internal.LatestString (newest
// known archive timestamp) and LatestBackupString (the MongoMeta.BackupLastTS
// of the most recent backup) — and otherwise parses arg as a literal
// timestamp string.
func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
	if arg == internal.LatestString {
		return downloader.LastKnownArchiveTS()
	}
	if arg != LatestBackupString {
		// Not a keyword: treat it as a literal timestamp.
		return models.TimestampFromStr(arg)
	}
	name, err := downloader.LastBackupName()
	if err != nil {
		return models.Timestamp{}, err
	}
	meta, err := downloader.BackupMeta(name)
	if err != nil {
		return models.Timestamp{}, err
	}
	return models.TimestampFromBson(meta.MongoMeta.BackupLastTS), nil
}
// NOTE(review): dead code below — processArg was commented out instead of
// deleted; prefer removing it outright (git history preserves the old body).
//func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
// switch arg {
// case internal.LatestString:
// return downloader.LastKnownArchiveTS()
// case LatestBackupString:
// lastBackupName, err := downloader.LastBackupName()
// if err != nil {
// return models.Timestamp{}, err
// }
// backupMeta, err := downloader.BackupMeta(lastBackupName)
// if err != nil {
// return models.Timestamp{}, err
// }
// return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
// default:
// return models.TimestampFromStr(arg)
// }
//}

func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs)
Expand All @@ -134,14 +140,18 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
return err
}

dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes)
filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)

oplogApplier := stages.NewGenericApplier(dbApplier)

// set up storage downloader client
downloader, err := archive.NewStorageDownloader(archive.NewDefaultStorageSettings())
if err != nil {
return err
}
downloader.SetNodeSpecificDownloader(replayArgs.dbNode)

// discover archive sequence to replay
archives, err := downloader.ListOplogArchives()
if err != nil {
Expand All @@ -157,7 +167,7 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
}

// setup storage fetcher
oplogFetcher := stages.NewStorageFetcher(downloader, path)
oplogFetcher := stages.NewStorageFetcher(downloader, path, replayArgs.dbNode)

// run worker cycle
return mongo.HandleOplogReplay(ctx, replayArgs.since, replayArgs.until, oplogFetcher, oplogApplier)
Expand Down
8 changes: 8 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ const (
ProfileMode = "PROFILE_MODE"
ProfilePath = "PROFILE_PATH"

MongoDBProvider = "MONGODB_PROVIDER"
MongoDBPath = "MONGODB_PATH"
MongoDBUriSetting = "MONGODB_URI"
MongoDBNode = "MONGODB_NODE"
MongoDBLastWriteUpdateInterval = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
OplogArchiveAfterSize = "OPLOG_ARCHIVE_AFTER_SIZE"
Expand Down Expand Up @@ -828,6 +831,11 @@ func GetRequiredSetting(setting string) (string, error) {
return val, nil
}

// GetNonRequiredSetting returns the value of an optional setting, or the
// empty string when the setting is not present. Note that callers cannot
// distinguish "unset" from "set to empty" — use GetSetting directly when
// that distinction matters.
func GetNonRequiredSetting(setting string) string {
	val, _ := GetSetting(setting)
	return val
}

func GetBoolSettingDefault(setting string, def bool) (bool, error) {
val, ok := GetSetting(setting)
if !ok {
Expand Down
52 changes: 41 additions & 11 deletions internal/databases/mongo/archive/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type StorageDownloader struct {
rootFolder storage.Folder
oplogsFolder storage.Folder
backupsFolder storage.Folder
dbNode string
}

// NewStorageDownloader builds mongodb downloader.
Expand All @@ -88,6 +89,13 @@ func NewStorageDownloader(opts StorageSettings) (*StorageDownloader, error) {
nil
}

// SetNodeSpecificDownloader records the MongoDB node name whose oplog
// archives this downloader should select (used to filter archive listings
// and to build node-specific archive file names).
func (sd *StorageDownloader) SetNodeSpecificDownloader(node string) {
	sd.dbNode = node
}
// GetNodeSpecificDownloader returns the node name previously stored via
// SetNodeSpecificDownloader.
// NOTE(review): despite the name this returns the node string, not a
// downloader; Go convention would also drop the Get prefix (e.g. DBNode).
// Renaming would break existing callers, so it is only flagged here.
func (sd *StorageDownloader) GetNodeSpecificDownloader() string {
	return sd.dbNode
}

// BackupMeta downloads sentinel contents.
func (sd *StorageDownloader) BackupMeta(name string) (*models.Backup, error) {
return common.DownloadSentinel(sd.backupsFolder, name)
Expand Down Expand Up @@ -125,7 +133,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {

// DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
	// Resolve the node-qualified file name for this archive, then stream it
	// from the oplogs folder into the caller-supplied writer.
	reader := internal.NewFolderReader(sd.oplogsFolder)
	fileName := arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader())
	return internal.DownloadFile(reader, fileName, arch.Extension(), writeCloser)
}

// ListOplogArchives fetches all oplog archives existed in storage.
Expand All @@ -138,7 +147,10 @@ func (sd *StorageDownloader) ListOplogArchives() ([]models.Archive, error) {
archives := make([]models.Archive, 0, len(objects))
for _, key := range objects {
archName := key.GetName()
arch, err := models.ArchFromFilename(archName)
if !isOplogForSpecificNode(archName, sd.dbNode) {
continue
}
arch, err := models.ArchFromFilename(archName, sd.dbNode)
if err != nil {
return nil, fmt.Errorf("can not convert retrieve timestamps since oplog archive Ext '%s': %w", archName, err)
}
Expand All @@ -156,7 +168,10 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
}
for _, key := range keys {
filename := key.GetName()
arch, err := models.ArchFromFilename(filename)
if !isOplogForSpecificNode(filename, sd.dbNode) {
continue
}
arch, err := models.ArchFromFilename(filename, sd.dbNode)
if err != nil {
return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
}
Expand Down Expand Up @@ -210,6 +225,7 @@ type StorageUploader struct {
kubeClient controllerruntime.Client
snapshotName string
snapshotNamespace string
dbNode string
}

// NewStorageUploader builds mongodb uploader.
Expand All @@ -227,6 +243,13 @@ func (su *StorageUploader) SetSnapshot(name, namespace string) {
su.snapshotNamespace = namespace
}

// SetDBNode records the MongoDB node name this uploader archives for; the
// name is folded into uploaded archive file names and the snapshot
// component key ("wal-<node>").
func (su *StorageUploader) SetDBNode(node string) {
	su.dbNode = node
}
// GetDBNode returns the node name previously stored via SetDBNode.
// NOTE(review): Go convention would name this DBNode (no Get prefix), but
// renaming would break existing callers.
func (su *StorageUploader) GetDBNode() string {
	return su.dbNode
}

func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
var snapshot storageapi.Snapshot
err := su.kubeClient.Get(context.TODO(), controllerruntime.ObjectKey{
Expand All @@ -236,6 +259,8 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
if err != nil {
return err
}
compName := "wal"
compName = compName + "-" + su.GetDBNode()

_, err = kmc.PatchStatus(
context.TODO(),
Expand All @@ -245,17 +270,17 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
in := obj.(*storageapi.Snapshot)
if len(in.Status.Components) == 0 {
in.Status.Components = make(map[string]storageapi.Component)

}
if _, ok := in.Status.Components[compName]; !ok {
walSegments := make([]storageapi.WalSegment, 1)
walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
in.Status.Components["wal"] = storageapi.Component{
in.Status.Components[compName] = storageapi.Component{
WalSegments: walSegments,
}
}

component := in.Status.Components["wal"]
component := in.Status.Components[compName]
component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
in.Status.Components["wal"] = component
in.Status.Components[compName] = component

return in
},
Expand All @@ -281,9 +306,9 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
if err != nil {
return err
}

fileName := arch.DBNodeSpecificFileName(su.dbNode)
// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
return su.Upload(ctx, arch.Filename(), bytes.NewReader(su.buf.Bytes()))
return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
}

// UploadGap uploads mark indicating archiving gap.
Expand All @@ -297,7 +322,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
return fmt.Errorf("can not build archive: %w", err)
}

if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.Filename()); err != nil {
if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
return fmt.Errorf("error while uploading stream: %w", err)
}
return nil
Expand Down Expand Up @@ -367,3 +393,7 @@ func (sp *StoragePurger) DeleteOplogArchives(archives []models.Archive) error {
tracelog.DebugLogger.Printf("Oplog keys will be deleted: %+v\n", oplogKeys)
return sp.oplogsFolder.DeleteObjects(oplogKeys)
}

// isOplogForSpecificNode reports whether the archive file name belongs to
// the given database node.
// NOTE(review): this is a plain substring test, so a node name that is a
// prefix of another (e.g. "node1" vs "node10") would match both — confirm
// the node-specific file-name format rules this out, or anchor the match.
func isOplogForSpecificNode(fileName, node string) bool {
	return strings.Contains(fileName, node)
}
Loading

0 comments on commit 471e695

Please sign in to comment.