From 471e69520c80510c343c7ea158ef432bb05fd0fe Mon Sep 17 00:00:00 2001
From: sayedppqq
Date: Tue, 4 Jun 2024 19:14:38 +0600
Subject: [PATCH] Add MongoDB shard support for oplog push and replay

Qualify oplog archive names with the originating database node
(<type>_<node>_<startTS>_<endTS>.<ext>) so that every shard and the
config server can archive into the same storage prefix. The node name
comes from the new required MONGODB_NODE setting; the optional
MONGODB_PROVIDER and MONGODB_PATH settings relocate the oplog directory
for the kubestash local provider. During replay, non-configsvr nodes run
entries through a mongo-shake style filter chain (autologous and noop
filters), duplicate key errors are ignored, and snapshot status tracks
WAL segments per node under a "wal-<node>" component.

Signed-off-by: sayedppqq
---
 Makefile                                      |  6 ++
 cmd/mongo/oplog_fetch.go                      |  2 +-
 cmd/mongo/oplog_push.go                       | 25 ++++-
 cmd/mongo/oplog_replay.go                     | 52 ++++++----
 internal/config/config.go                     |  8 ++
 internal/databases/mongo/archive/loader.go    | 52 +++++++---
 internal/databases/mongo/models/archive.go    | 17 ++--
 .../databases/mongo/models/archive_test.go    |  2 +-
 internal/databases/mongo/oplog/applier.go     | 41 ++++----
 internal/databases/mongo/shake/filter.go      | 95 +++++++++++++++++++
 internal/databases/mongo/shake/oplog.go       | 53 +++++++++++
 internal/databases/mongo/stages/fetcher.go    | 12 ++-
 .../databases/mysql/binlog_fetch_handler.go   |  4 +-
 .../databases/mysql/binlog_replay_handler.go  |  2 +-
 main/mongo/Dockerfile                         | 13 +++
 15 files changed, 310 insertions(+), 74 deletions(-)
 create mode 100644 internal/databases/mongo/shake/filter.go
 create mode 100644 internal/databases/mongo/shake/oplog.go
 create mode 100644 main/mongo/Dockerfile

diff --git a/Makefile b/Makefile
index 6ac982f79..3f6f59c45 100644
--- a/Makefile
+++ b/Makefile
@@ -19,6 +19,7 @@ MONGO_VERSION ?= "4.2.8"
 GOLANGCI_LINT_VERSION ?= "v1.52.2"
 REDIS_VERSION ?= "5.0.8"
 TOOLS_MOD_DIR := ./internal/tools
+REGISTRY ?= sayedppqq
 
 BUILD_TAGS:=
 
@@ -308,3 +309,8 @@ unlink_libsodium:
 build_client:
 	cd cmd/daemonclient && \
 	go build -o ../../bin/walg-daemon-client -ldflags "-s -w -X main.buildDate=`date -u +%Y.%m.%d_%H:%M:%S` -X main.gitRevision=`git rev-parse --short HEAD` -X main.version=`git tag -l --points-at HEAD`"
+
+update: mongo_build
+	docker build --tag walg:1.0 ./main/mongo
+	docker tag walg:1.0 ${REGISTRY}/walg:1.0
+	docker push ${REGISTRY}/walg:1.0
diff --git a/cmd/mongo/oplog_fetch.go b/cmd/mongo/oplog_fetch.go
index 9d81c63b5..f6cded29e 100644
--- a/cmd/mongo/oplog_fetch.go
+++ b/cmd/mongo/oplog_fetch.go
@@ -50,7 +50,7 @@ var oplogFetchCmd = &cobra.Command{
 		tracelog.ErrorLogger.FatalOnError(err)
 
 		// setup storage fetcher
-		oplogFetcher := stages.NewStorageFetcher(downloader, path)
+		oplogFetcher := stages.NewStorageFetcher(downloader, path, "")
 
 		// run worker cycle
 		err = mongo.HandleOplogReplay(ctx, since, until, oplogFetcher, oplogApplier)
diff --git a/cmd/mongo/oplog_push.go b/cmd/mongo/oplog_push.go
index 203b7790e..b3df98ff4 100644
--- a/cmd/mongo/oplog_push.go
+++ b/cmd/mongo/oplog_push.go
@@ -2,7 +2,9 @@ package mongo
 
 import (
 	"context"
+	storageapi "kubestash.dev/apimachinery/apis/storage/v1alpha1"
 	"os"
+	"path"
 	"syscall"
 	"time"
 
@@ -74,10 +76,15 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	if err != nil {
 		return err
 	}
-	uplProvider.ChangeDirectory(models.OplogArchBasePath)
+	subDir := models.OplogArchBasePath
+	if pushArgs.dbProvider == string(storageapi.ProviderLocal) {
+		subDir = path.Join(pushArgs.dbPath, subDir)
+	}
+	uplProvider.ChangeDirectory(subDir)
 	uploader := archive.NewStorageUploader(uplProvider)
 	uploader.SetKubeClient(pushArgs.kubeClient)
 	uploader.SetSnapshot(snapshotName, snapshotNamespace)
+	uploader.SetDBNode(pushArgs.dbNode)
 
 	// set up mongodb client and oplog fetcher
 	mongoClient, err := client.NewMongoClient(ctx, pushArgs.mongodbURL)
@@ -106,6 +113,8 @@ func runOplogPush(ctx context.Context, pushArgs oplogPushRunArgs, statsArgs oplo
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(uploader.GetDBNode())
+
 	since, err := discovery.ResolveStartingTS(ctx, downloader, mongoClient)
 	if err != nil {
 		return err
@@ -143,6 +152,9 @@ type oplogPushRunArgs struct {
 	archiveAfterSize   int
 	archiveTimeout     time.Duration
 	mongodbURL         string
+	dbNode             string
+	dbProvider         string
+	dbPath             string
 	primaryWait        bool
 	primaryWaitTimeout time.Duration
 	lwUpdate           time.Duration
@@ -165,6 +177,15 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
 		return
 	}
 
+	args.dbNode, err = conf.GetRequiredSetting(conf.MongoDBNode)
+	if err != nil {
+		return
+	}
+
+	args.dbProvider = conf.GetNonRequiredSetting(conf.MongoDBProvider)
+
+	args.dbPath = conf.GetNonRequiredSetting(conf.MongoDBPath)
+
 	args.primaryWait, err = conf.GetBoolSettingDefault(conf.OplogPushWaitForBecomePrimary, false)
 	if err != nil {
 		return
@@ -192,7 +213,7 @@ func buildOplogPushRunArgs() (args oplogPushRunArgs, err error) {
 		return
 	}
 
-	return
+	return args, err
 }
 
 type oplogPushStatsArgs struct {
diff --git a/cmd/mongo/oplog_replay.go b/cmd/mongo/oplog_replay.go
index 0de89cc53..0815c5d82 100644
--- a/cmd/mongo/oplog_replay.go
+++ b/cmd/mongo/oplog_replay.go
@@ -3,12 +3,12 @@ package mongo
 import (
 	"context"
 	"encoding/json"
 	"os"
 	"syscall"
 
 	"github.com/spf13/cobra"
 	"github.com/wal-g/tracelog"
-	"github.com/wal-g/wal-g/internal"
 	conf "github.com/wal-g/wal-g/internal/config"
 	"github.com/wal-g/wal-g/internal/databases/mongo"
 	"github.com/wal-g/wal-g/internal/databases/mongo/archive"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
@@ -49,6 +49,7 @@
 type oplogReplayRunArgs struct {
 	ignoreErrCodes map[string][]int32
 	mongodbURL     string
+	dbNode         string
 
 	oplogAlwaysUpsert    *bool
 	oplogApplicationMode *string
@@ -76,6 +77,11 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
 		return
 	}
 
+	args.dbNode, err = conf.GetRequiredSetting(conf.MongoDBNode)
+	if err != nil {
+		return
+	}
+
 	oplogAlwaysUpsert, hasOplogAlwaysUpsert, err := conf.GetBoolSetting(conf.OplogReplayOplogAlwaysUpsert)
 	if err != nil {
 		return
@@ -92,24 +98,5 @@ func buildOplogReplayRunArgs(cmdargs []string) (args oplogReplayRunArgs, err err
 	return args, nil
 }
 
-func processArg(arg string, downloader *archive.StorageDownloader) (models.Timestamp, error) {
-	switch arg {
-	case internal.LatestString:
-		return downloader.LastKnownArchiveTS()
-	case LatestBackupString:
-		lastBackupName, err := downloader.LastBackupName()
-		if err != nil {
-			return models.Timestamp{}, err
-		}
-		backupMeta, err := downloader.BackupMeta(lastBackupName)
-		if err != nil {
-			return models.Timestamp{}, err
-		}
-		return models.TimestampFromBson(backupMeta.MongoMeta.BackupLastTS), nil
-	default:
-		return models.TimestampFromStr(arg)
-	}
-}
-
 func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	tracelog.DebugLogger.Printf("starting replay with arguments: %+v", replayArgs)
@@ -134,7 +121,9 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 		return err
 	}
 
-	dbApplier := oplog.NewDBApplier(mongoClient, false, replayArgs.ignoreErrCodes)
+	filterList := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}
+	dbApplier := oplog.NewDBApplier(mongoClient, true, replayArgs.ignoreErrCodes, replayArgs.dbNode, filterList)
+
 	oplogApplier := stages.NewGenericApplier(dbApplier)
 
 	// set up storage downloader client
@@ -142,6 +131,8 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	if err != nil {
 		return err
 	}
+	downloader.SetNodeSpecificDownloader(replayArgs.dbNode)
+
 	// discover archive sequence to replay
 	archives, err := downloader.ListOplogArchives()
 	if err != nil {
@@ -157,7 +148,7 @@ func runOplogReplay(ctx context.Context, replayArgs oplogReplayRunArgs) error {
 	}
 
 	// setup storage fetcher
-	oplogFetcher := stages.NewStorageFetcher(downloader, path)
+	oplogFetcher := stages.NewStorageFetcher(downloader, path, replayArgs.dbNode)
 
 	// run worker cycle
 	return mongo.HandleOplogReplay(ctx, replayArgs.since, replayArgs.until, oplogFetcher, oplogApplier)
diff --git a/internal/config/config.go b/internal/config/config.go
index bbd1b24f7..d0cdf8418 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -117,7 +117,10 @@
 	ProfileMode                      = "PROFILE_MODE"
 	ProfilePath                      = "PROFILE_PATH"
 
+	MongoDBProvider                  = "MONGODB_PROVIDER"
+	MongoDBPath                      = "MONGODB_PATH"
 	MongoDBUriSetting                = "MONGODB_URI"
+	MongoDBNode                      = "MONGODB_NODE"
 	MongoDBLastWriteUpdateInterval   = "MONGODB_LAST_WRITE_UPDATE_INTERVAL"
 	MongoDBRestoreDisableHostResetup = "MONGODB_RESTORE_DISABLE_HOST_RESETUP"
 	OplogArchiveAfterSize            = "OPLOG_ARCHIVE_AFTER_SIZE"
@@ -828,6 +831,12 @@ func GetRequiredSetting(setting string) (string, error) {
 	return val, nil
 }
 
+// GetNonRequiredSetting looks up an optional setting, returning "" when unset.
+func GetNonRequiredSetting(setting string) string {
+	val, _ := GetSetting(setting)
+	return val
+}
+
 func GetBoolSettingDefault(setting string, def bool) (bool, error) {
 	val, ok := GetSetting(setting)
 	if !ok {
diff --git a/internal/databases/mongo/archive/loader.go b/internal/databases/mongo/archive/loader.go
index c5725e587..3f8096894 100644
--- a/internal/databases/mongo/archive/loader.go
+++ b/internal/databases/mongo/archive/loader.go
@@ -73,6 +73,7 @@ type StorageDownloader struct {
 	rootFolder    storage.Folder
 	oplogsFolder  storage.Folder
 	backupsFolder storage.Folder
+	dbNode        string
 }
 
 // NewStorageDownloader builds mongodb downloader.
@@ -88,6 +89,16 @@ func NewStorageDownloader(opts StorageSettings) (*StorageDownloader, error) {
 		nil
 }
 
+// SetNodeSpecificDownloader sets the database node whose archives are downloaded.
+func (sd *StorageDownloader) SetNodeSpecificDownloader(node string) {
+	sd.dbNode = node
+}
+
+// GetNodeSpecificDownloader returns the database node whose archives are downloaded.
+func (sd *StorageDownloader) GetNodeSpecificDownloader() string {
+	return sd.dbNode
+}
+
 // BackupMeta downloads sentinel contents.
 func (sd *StorageDownloader) BackupMeta(name string) (*models.Backup, error) {
 	return common.DownloadSentinel(sd.backupsFolder, name)
@@ -125,7 +136,8 @@ func (sd *StorageDownloader) LastBackupName() (string, error) {
 
 // DownloadOplogArchive downloads, decompresses and decrypts (if needed) oplog archive.
 func (sd *StorageDownloader) DownloadOplogArchive(arch models.Archive, writeCloser io.WriteCloser) error {
	return internal.DownloadFile(internal.NewFolderReader(sd.oplogsFolder),
		arch.DBNodeSpecificFileName(sd.GetNodeSpecificDownloader()), arch.Extension(), writeCloser)
 }
 
 // ListOplogArchives fetches all oplog archives existed in storage.
@@ -138,7 +150,10 @@ func (sd *StorageDownloader) ListOplogArchives() ([]models.Archive, error) {
 	archives := make([]models.Archive, 0, len(objects))
 	for _, key := range objects {
 		archName := key.GetName()
-		arch, err := models.ArchFromFilename(archName)
+		if !isOplogForSpecificNode(archName, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(archName, sd.dbNode)
 		if err != nil {
 			return nil, fmt.Errorf("can not convert retrieve timestamps since oplog archive Ext '%s': %w", archName, err)
 		}
@@ -156,7 +171,10 @@ func (sd *StorageDownloader) LastKnownArchiveTS() (models.Timestamp, error) {
 	}
 	for _, key := range keys {
 		filename := key.GetName()
-		arch, err := models.ArchFromFilename(filename)
+		if !isOplogForSpecificNode(filename, sd.dbNode) {
+			continue
+		}
+		arch, err := models.ArchFromFilename(filename, sd.dbNode)
 		if err != nil {
 			return models.Timestamp{}, fmt.Errorf("can not build archive since filename '%s': %w", filename, err)
 		}
@@ -210,6 +228,7 @@ type StorageUploader struct {
 	kubeClient        controllerruntime.Client
 	snapshotName      string
 	snapshotNamespace string
+	dbNode            string
 }
 
 // NewStorageUploader builds mongodb uploader.
@@ -227,6 +246,16 @@ func (su *StorageUploader) SetSnapshot(name, namespace string) {
 	su.snapshotNamespace = namespace
 }
 
+// SetDBNode sets the database node this uploader archives for.
+func (su *StorageUploader) SetDBNode(node string) {
+	su.dbNode = node
+}
+
+// GetDBNode returns the database node this uploader archives for.
+func (su *StorageUploader) GetDBNode() string {
+	return su.dbNode
+}
+
 func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) error {
 	var snapshot storageapi.Snapshot
 	err := su.kubeClient.Get(context.TODO(), controllerruntime.ObjectKey{
@@ -236,6 +265,7 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 	if err != nil {
 		return err
 	}
+	compName := "wal-" + su.GetDBNode()
 
 	_, err = kmc.PatchStatus(
 		context.TODO(),
@@ -245,17 +275,17 @@ func (su *StorageUploader) updateSnapshot(firstTS, lastTS models.Timestamp) erro
 			in := obj.(*storageapi.Snapshot)
 			if len(in.Status.Components) == 0 {
 				in.Status.Components = make(map[string]storageapi.Component)
-
+			}
+			if _, ok := in.Status.Components[compName]; !ok {
 				walSegments := make([]storageapi.WalSegment, 1)
 				walSegments[0].Start = &metav1.Time{Time: time.Unix(int64(firstTS.ToBsonTS().T), 0)}
-				in.Status.Components["wal"] = storageapi.Component{
+				in.Status.Components[compName] = storageapi.Component{
 					WalSegments: walSegments,
 				}
 			}
-
-			component := in.Status.Components["wal"]
+			component := in.Status.Components[compName]
 			component.WalSegments[0].End = &metav1.Time{Time: time.Unix(int64(lastTS.ToBsonTS().T), 0)}
-			in.Status.Components["wal"] = component
+			in.Status.Components[compName] = component
 
 			return in
 		},
@@ -281,9 +311,9 @@ func (su *StorageUploader) UploadOplogArchive(ctx context.Context, stream io.Rea
 	if err != nil {
 		return err
 	}
-
+	fileName := arch.DBNodeSpecificFileName(su.dbNode)
 	// providing io.ReaderAt+io.ReadSeeker to s3 upload enables buffer pool usage
-	return su.Upload(ctx, arch.Filename(), bytes.NewReader(su.buf.Bytes()))
+	return su.Upload(ctx, fileName, bytes.NewReader(su.buf.Bytes()))
 }
 
 // UploadGap uploads mark indicating archiving gap.
@@ -297,7 +327,8 @@ func (su *StorageUploader) UploadGapArchive(archErr error, firstTS, lastTS model
 		return fmt.Errorf("can not build archive: %w", err)
 	}
 
-	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()), arch.Filename()); err != nil {
+	if err := su.PushStreamToDestination(context.Background(), strings.NewReader(archErr.Error()),
+		arch.DBNodeSpecificFileName(su.dbNode)); err != nil {
 		return fmt.Errorf("error while uploading stream: %w", err)
 	}
 	return nil
@@ -367,3 +398,11 @@ func (sp *StoragePurger) DeleteOplogArchives(archives []models.Archive) error {
 	tracelog.DebugLogger.Printf("Oplog keys will be deleted: %+v\n", oplogKeys)
 	return sp.oplogsFolder.DeleteObjects(oplogKeys)
 }
+
+// isOplogForSpecificNode reports whether an archive file name belongs to the
+// given node. Node names are underscore-delimited in archive names
+// (<type>_<node>_<startTS>_<endTS>.<ext>), so the delimited form is matched
+// to avoid prefix collisions such as "shard1" also matching "shard10".
+func isOplogForSpecificNode(fileName, node string) bool {
+	return strings.Contains(fileName, "_"+node+"_")
+}
diff --git a/internal/databases/mongo/models/archive.go b/internal/databases/mongo/models/archive.go
index f0c24195c..62aa1e21b 100644
--- a/internal/databases/mongo/models/archive.go
+++ b/internal/databases/mongo/models/archive.go
@@ -15,9 +15,5 @@ const (
 	ArchiveTypeGap   = "gap"
 )
 
-var (
-	ArchRegexp = regexp.MustCompile(`^(oplog|gap)_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`)
-)
-
 // Archive defines oplog archive representation.
 type Archive struct {
@@ -49,6 +45,12 @@ func (a Archive) In(ts Timestamp) bool {
 func (a Archive) Filename() string {
 	return fmt.Sprintf("%s_%v%s%v.%s", a.Type, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
 }
+
+// DBNodeSpecificFileName returns the archive file name qualified with the
+// database node that produced it, e.g. oplog_shard0_1.1_2.1.lz4.
+func (a Archive) DBNodeSpecificFileName(node string) string {
+	return fmt.Sprintf("%s_%s_%v%s%v.%s", a.Type, node, a.Start, ArchNameTSDelimiter, a.End, a.Ext)
+}
 
 // Extension returns extension of archive file name.
 func (a Archive) Extension() string {
@@ -57,7 +59,14 @@ func (a Archive) Extension() string {
 
 // ArchFromFilename builds Arch from given path.
 // TODO: support empty extension
-func ArchFromFilename(path string) (Archive, error) {
-	res := ArchRegexp.FindAllStringSubmatch(path, -1)
+func ArchFromFilename(path string, node string) (Archive, error) {
+	// Quote the node so regexp metacharacters in its name are matched literally;
+	// the node adds no capture group, so submatch indices stay unchanged.
+	format := fmt.Sprintf(`^(oplog|gap)_%s_(?P<startTS>[0-9]+\.[0-9]+)_(?P<endTS>[0-9]+\.[0-9]+)\.(?P<Ext>[^$]+)$`,
+		regexp.QuoteMeta(node))
+	archRegexp := regexp.MustCompile(format)
+
+	res := archRegexp.FindAllStringSubmatch(path, -1)
 	if len(res) != 1 {
 		return Archive{}, fmt.Errorf("can not parse oplog path: %s", path)
diff --git a/internal/databases/mongo/models/archive_test.go b/internal/databases/mongo/models/archive_test.go
index efb21b1ad..f01d7c8fa 100644
--- a/internal/databases/mongo/models/archive_test.go
+++ b/internal/databases/mongo/models/archive_test.go
@@ -280,7 +280,7 @@ func TestArchFromFilename(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := ArchFromFilename(tt.args.path)
+			got, err := ArchFromFilename(tt.args.path, "shard0")
 			if (err != nil) != tt.wantErr {
 				t.Errorf("ArchFromFilename() error = %v, wantErr %v", err, tt.wantErr)
 				return
diff --git a/internal/databases/mongo/oplog/applier.go b/internal/databases/mongo/oplog/applier.go
index 2b140f8aa..b23675b09 100644
--- a/internal/databases/mongo/oplog/applier.go
+++ b/internal/databases/mongo/oplog/applier.go
@@ -3,17 +3,17 @@ package oplog
 import (
 	"context"
 	"fmt"
 	"io"
-	"strings"
 
 	"github.com/mongodb/mongo-tools-common/db"
 	"github.com/mongodb/mongo-tools-common/txn"
 	"github.com/mongodb/mongo-tools-common/util"
 	"github.com/wal-g/tracelog"
 	"github.com/wal-g/wal-g/internal/databases/mongo/client"
 	"github.com/wal-g/wal-g/internal/databases/mongo/models"
+	"github.com/wal-g/wal-g/internal/databases/mongo/shake"
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/mongo"
 )
 
 type TypeAssertionError struct {
@@ -78,11 +78,15 @@ type DBApplier struct {
 	preserveUUID          bool
 	applyIgnoreErrorCodes map[string][]int32
 	until                 models.Timestamp
+	dbNode                string
+	filterList            shake.OplogFilterChain
 }
 
 // NewDBApplier builds DBApplier with given args.
-func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32) *DBApplier {
-	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID, applyIgnoreErrorCodes: ignoreErrCodes}
+func NewDBApplier(m client.MongoDriver, preserveUUID bool, ignoreErrCodes map[string][]int32,
+	node string, filterList shake.OplogFilterChain) *DBApplier {
+	return &DBApplier{db: m, txnBuffer: txn.NewBuffer(), preserveUUID: preserveUUID,
+		applyIgnoreErrorCodes: ignoreErrCodes, dbNode: node, filterList: filterList}
 }
 
 func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
@@ -97,9 +101,10 @@ func (ap *DBApplier) Apply(ctx context.Context, opr models.Oplog) error {
 		return nil
 	}
 
-	if err := ap.shouldSkip(op.Operation, op.Namespace); err != nil {
-		tracelog.DebugLogger.Printf("skipping op %+v due to: %+v", op, err)
-		return nil
+	// The config server oplog is applied verbatim; on shard nodes the filter
+	// chain drops noop and cluster-internal entries (it logs what it skips).
+	if ap.dbNode != "configsvr" && ap.filterList.IterateFilter(&op) {
+		return nil
 	}
 
 	meta, err := txn.NewMeta(op)
@@ -134,26 +139,15 @@ func (ap *DBApplier) Close(ctx context.Context) error {
 	return nil
 }
 
-func (ap *DBApplier) shouldSkip(op, ns string) error {
-	if op == "n" {
-		return fmt.Errorf("noop op")
-	}
-
-	// sharded clusters are not supported yet
-	if strings.HasPrefix(ns, "config.") {
-		return fmt.Errorf("config database op")
-	}
-
-	return nil
-}
-
 // shouldIgnore checks if error should be ignored
 func (ap *DBApplier) shouldIgnore(op string, err error) bool {
+	// Duplicate key errors are ignored so that re-applying archives stays
+	// idempotent; checked before the CommandError assertion because they can
+	// also surface as write errors.
+	if mongo.IsDuplicateKeyError(err) {
+		return true
+	}
 	ce, ok := err.(mongo.CommandError)
 	if !ok {
 		return false
 	}
-
 	ignoreErrorCodes, ok := ap.applyIgnoreErrorCodes[op]
 	if !ok {
 		return false
@@ -265,8 +259,8 @@ func indexSpecFromCommitIndexBuilds(op db.Oplog) (string, []client.IndexDocument
 		if !ok {
 			return "", nil, NewTypeAssertionError("bson.D", fmt.Sprintf("indexes[%d]", i), elemE.Value)
 		}
-		for i := range elements {
-			elemE = elements[i]
+		for j := range elements {
+			elemE = elements[j]
 			if elemE.Key == "key" {
 				if indexSpecs[i].Key, ok = elemE.Value.(bson.D); !ok {
 					return "", nil, NewTypeAssertionError("bson.D", "key", elemE.Value)
diff --git a/internal/databases/mongo/shake/filter.go b/internal/databases/mongo/shake/filter.go
new file mode 100644
index 000000000..a1ccbf512
--- /dev/null
+++ b/internal/databases/mongo/shake/filter.go
@@ -0,0 +1,91 @@
+package shake
+
+import (
+	"reflect"
+	"strings"
+
+	"github.com/mongodb/mongo-tools-common/db"
+	"github.com/wal-g/tracelog"
+)
+
+// OplogFilter decides whether an oplog entry should be dropped
+// during replay. Implementations: AutologousFilter, NoopFilter.
+type OplogFilter interface {
+	Filter(log *db.Oplog) bool
+}
+
+type OplogFilterChain []OplogFilter
+
+// IterateFilter reports whether any filter in the chain drops the entry.
+func (chain OplogFilterChain) IterateFilter(log *db.Oplog) bool {
+	for _, filter := range chain {
+		if filter.Filter(log) {
+			tracelog.DebugLogger.Printf("%v filtered oplog entry %v", reflect.TypeOf(filter), log)
+			return true
+		}
+	}
+	return false
+}
+
+// AutologousFilter drops commands and namespaces that only concern
+// cluster-internal state.
+type AutologousFilter struct{}
+
+func (filter *AutologousFilter) Filter(log *db.Oplog) bool {
+	// Filter out unnecessary commands, e.g. index build bookkeeping.
+	if operation, found := ExtraCommandName(log.Object); found {
+		if IsNeedFilterCommand(operation) {
+			return true
+		}
+	}
+	return filter.FilterNs(log.Namespace)
+}
+
+// NsShouldBeIgnore holds namespaces that should be filtered out:
+// a true value matches by prefix, a false value by substring.
+var NsShouldBeIgnore = map[string]bool{
+	"admin.":       true,
+	"local.":       true,
+	"config.":      true,
+	"system.views": false,
+}
+
+// NsShouldNotBeIgnore holds namespaces that must be kept; it takes
+// priority over NsShouldBeIgnore. Same key/value convention.
+var NsShouldNotBeIgnore = map[string]bool{
+	"admin.$cmd":         true,
+	"admin.system.users": false,
+	"admin.system.roles": false,
+}
+
+// FilterNs drops operations on the admin, local and config databases
+// and on system.views collections. Following mongo-shake v2.4.13,
+// admin.$cmd is kept because it may carry transactions, and
+// admin.system.users and admin.system.roles are kept so that users
+// and roles are replicated.
+func (filter *AutologousFilter) FilterNs(namespace string) bool {
+	for key, val := range NsShouldNotBeIgnore {
+		if val && strings.HasPrefix(namespace, key) {
+			return false
+		}
+		if !val && strings.Contains(namespace, key) {
+			return false
+		}
+	}
+	for key, val := range NsShouldBeIgnore {
+		if val && strings.HasPrefix(namespace, key) {
+			return true
+		}
+		if !val && strings.Contains(namespace, key) {
+			return true
+		}
+	}
+	return false
+}
+
+// NoopFilter drops no-op oplog entries.
+type NoopFilter struct{}
+
+func (filter *NoopFilter) Filter(log *db.Oplog) bool {
+	return log.Operation == "n"
+}
diff --git a/internal/databases/mongo/shake/oplog.go b/internal/databases/mongo/shake/oplog.go
new file mode 100644
index 000000000..9deaf8dcf
--- /dev/null
+++ b/internal/databases/mongo/shake/oplog.go
@@ -0,0 +1,57 @@
+package shake
+
+import (
+	"strings"
+
+	"go.mongodb.org/mongo-driver/bson"
+)
+
+const (
+	PrimaryKey = "_id"
+)
+
+// CommandOperation describes how a command found in the oplog is handled.
+type CommandOperation struct {
+	concernSyncData bool
+	runOnAdmin      bool // some commands like `renameCollection` need to run on the admin database
+	needFilter      bool // should be ignored in shake
+}
+
+var opsMap = map[string]*CommandOperation{
+	"create":           {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"createIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"collMod":          {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropDatabase":     {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"drop":             {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndex":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"deleteIndexes":    {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndex":        {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"dropIndexes":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"renameCollection": {concernSyncData: false, runOnAdmin: true, needFilter: false},
+	"convertToCapped":  {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"emptycapped":      {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"applyOps":         {concernSyncData: true, runOnAdmin: false, needFilter: false},
+	"startIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+	"commitIndexBuild": {concernSyncData: false, runOnAdmin: false, needFilter: false},
+	"abortIndexBuild":  {concernSyncData: false, runOnAdmin: false, needFilter: true},
+}
+
+// ExtraCommandName extracts the command name from a command document.
+// The command name must be at the first position in the document.
+func ExtraCommandName(o bson.D) (string, bool) {
+	if len(o) > 0 {
+		if _, exist := opsMap[o[0].Key]; exist {
+			return o[0].Key, true
+		}
+	}
+
+	return "", false
+}
+
+// IsNeedFilterCommand reports whether the command should be ignored on replay.
+func IsNeedFilterCommand(operation string) bool {
+	if op, ok := opsMap[strings.TrimSpace(operation)]; ok {
+		return op.needFilter
+	}
+	return false
+}
diff --git a/internal/databases/mongo/stages/fetcher.go b/internal/databases/mongo/stages/fetcher.go
index 0c63450b7..b8c9df699 100644
--- a/internal/databases/mongo/stages/fetcher.go
+++ b/internal/databases/mongo/stages/fetcher.go
@@ -142,11 +142,12 @@ func (cb *CloserBuffer) Close() error {
 
 type StorageFetcher struct {
 	downloader archive.Downloader
 	path       archive.Sequence
+	dbNode     string
 }
 
 // NewStorageFetcher builds StorageFetcher instance
-func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence) *StorageFetcher {
-	return &StorageFetcher{downloader: downloader, path: path}
+func NewStorageFetcher(downloader archive.Downloader, path archive.Sequence, node string) *StorageFetcher {
+	return &StorageFetcher{downloader: downloader, path: path, dbNode: node}
 }
 
 // FetchBetween returns channel of oplog records, channel is filled in background.
@@ -168,12 +169,14 @@
 	firstFound := false
 	for _, arch := range path {
-		tracelog.DebugLogger.Printf("Fetching archive %s", arch.Filename())
+		tracelog.DebugLogger.Printf("Fetching archive %s",
+			arch.DBNodeSpecificFileName(sf.dbNode))
 
 		err := sf.downloader.DownloadOplogArchive(arch, buf)
 		if err != nil {
-			errc <- fmt.Errorf("failed to download archive %s: %w", arch.Filename(), err)
-			return
+			errc <- fmt.Errorf("failed to download archive %s: %w",
+				arch.DBNodeSpecificFileName(sf.dbNode), err)
+			return
 		}
 
 		for {
diff --git a/internal/databases/mysql/binlog_fetch_handler.go b/internal/databases/mysql/binlog_fetch_handler.go
index 1f1635571..ac546beb1 100644
--- a/internal/databases/mysql/binlog_fetch_handler.go
+++ b/internal/databases/mysql/binlog_fetch_handler.go
@@ -48,11 +48,11 @@ func HandleBinlogFetch(folder storage.Folder, backupName string, untilTS string,
 	tracelog.ErrorLogger.FatalOnError(err)
 	var startTS, endTS, endBinlogTS time.Time
 	if skipStartTime {
-		startTS, endTS, endBinlogTS, err = getEndTimestamps(folder, untilTS, untilBinlogLastModifiedTS)
+		startTS, endTS, endBinlogTS, err = getEndTimestamps(untilTS, untilBinlogLastModifiedTS)
 	} else {
 		startTS, endTS, endBinlogTS, err = getTimestamps(folder, backupName, untilTS, untilBinlogLastModifiedTS)
-		tracelog.ErrorLogger.FatalOnError(err)
 	}
+	tracelog.ErrorLogger.FatalOnError(err)
 
 	handler := newIndexHandler(dstDir)
 
diff --git a/internal/databases/mysql/binlog_replay_handler.go b/internal/databases/mysql/binlog_replay_handler.go
index 5c8f47d69..35c305ce3 100644
--- a/internal/databases/mysql/binlog_replay_handler.go
+++ b/internal/databases/mysql/binlog_replay_handler.go
@@ -113,7 +113,7 @@ func getTimestamps(folder storage.Folder, backupName, untilTS, untilBinlogLastMo
 	return startTS, endTS, endBinlogTS, nil
 }
 
-func getEndTimestamps(folder storage.Folder, untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
+func getEndTimestamps(untilTS, untilBinlogLastModifiedTS string) (time.Time, time.Time, time.Time, error) {
 	endTS, err := utility.ParseUntilTS(untilTS)
 	if err != nil {
 		return time.Time{}, time.Time{}, time.Time{}, err
diff --git a/main/mongo/Dockerfile b/main/mongo/Dockerfile
new file mode 100644
index 000000000..8e5854d6a
--- /dev/null
+++ b/main/mongo/Dockerfile
@@ -0,0 +1,10 @@
+FROM debian:bookworm
+
+RUN set -x \
+    && apt-get update \
+    && apt-get install -y --no-install-recommends apt-transport-https ca-certificates curl \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY wal-g /bin/wal-g
+
+ENTRYPOINT [ "/bin/wal-g" ]
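
Notes (illustrative, not part of the diff):

Settings introduced by this patch, as read in buildOplogPushRunArgs and
buildOplogReplayRunArgs: MONGODB_NODE is required by both oplog-push and
oplog-replay and names the node whose oplog is handled (for example
"shard0" or "configsvr"). MONGODB_PROVIDER and MONGODB_PATH are optional
and consulted only by oplog-push, which nests the oplog directory under
MONGODB_PATH when the provider equals the kubestash local provider.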
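With MONGODB_NODE set, every archive name carries the node, so several
nodes can share one storage prefix. A minimal sketch of the naming
round-trip through the modified models package; it assumes wal-g's
Timestamp fields (TS, Inc) and their "TS.Inc" string rendering, and the
timestamp values are made up:

	package main

	import (
		"fmt"

		"github.com/wal-g/wal-g/internal/databases/mongo/models"
	)

	func main() {
		arch := models.Archive{
			Start: models.Timestamp{TS: 1717500000, Inc: 1},
			End:   models.Timestamp{TS: 1717500060, Inc: 1},
			Ext:   "lz4",
			Type:  models.ArchiveTypeOplog,
		}

		// Legacy, node-agnostic name: oplog_1717500000.1_1717500060.1.lz4
		fmt.Println(arch.Filename())

		// Node-qualified name used by this patch:
		// oplog_shard0_1717500000.1_1717500060.1.lz4
		name := arch.DBNodeSpecificFileName("shard0")
		fmt.Println(name)

		// Parsing requires the node that produced the name; archives of
		// other nodes are skipped by isOplogForSpecificNode during listing.
		parsed, err := models.ArchFromFilename(name, "shard0")
		if err != nil {
			panic(err)
		}
		fmt.Println(parsed.In(models.Timestamp{TS: 1717500030, Inc: 0})) // true
	}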
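On replay, every node except the config server runs entries through the
shake filter chain before they reach the applier. A sketch of what the
chain drops, under the same assumptions (db.Oplog fields as used by the
filters above; the namespaces are examples):

	package main

	import (
		"fmt"

		"github.com/mongodb/mongo-tools-common/db"
		"github.com/wal-g/wal-g/internal/databases/mongo/shake"
	)

	func main() {
		chain := shake.OplogFilterChain{new(shake.AutologousFilter), new(shake.NoopFilter)}

		entries := []db.Oplog{
			{Operation: "n", Namespace: ""},              // no-op: dropped
			{Operation: "i", Namespace: "config.chunks"}, // cluster metadata: dropped
			{Operation: "c", Namespace: "admin.$cmd"},    // kept: may carry transactions
			{Operation: "i", Namespace: "mydb.mycoll"},   // ordinary write: kept
		}
		for i := range entries {
			fmt.Printf("op=%s ns=%q dropped=%v\n",
				entries[i].Operation, entries[i].Namespace, chain.IterateFilter(&entries[i]))
		}
	}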