diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go
index 51562bd15bb..77a88fa5edf 100644
--- a/dgraph/cmd/bulk/loader.go
+++ b/dgraph/cmd/bulk/loader.go
@@ -78,6 +78,8 @@ type options struct {
 	EncryptionKey x.SensitiveByteSlice
 	// BadgerCompressionlevel is the compression level to use while writing to badger.
 	BadgerCompressionLevel int
+	BlockCacheSize         int64
+	IndexCacheSize         int64
 }
 
 type state struct {
diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go
index 0bc01af84fd..27b11ac4b84 100644
--- a/dgraph/cmd/bulk/reduce.go
+++ b/dgraph/cmd/bulk/reduce.go
@@ -142,13 +142,22 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
 		}
 	}
 
-	opt := badger.DefaultOptions(dir).WithSyncWrites(false).
-		WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
-		WithLogger(nil).WithBlockCacheSize(1 << 20).
-		WithEncryptionKey(r.opt.EncryptionKey)
-
+	opt := badger.DefaultOptions(dir).
+		WithSyncWrites(false).
+		WithTableLoadingMode(bo.MemoryMap).
+		WithValueThreshold(1 << 10 /* 1 KB */).
+		WithLogger(nil).
+		WithEncryptionKey(r.opt.EncryptionKey).
+		WithBlockCacheSize(r.opt.BlockCacheSize).
+		WithIndexCacheSize(r.opt.IndexCacheSize)
+
+	opt.Compression = bo.None
+	opt.ZSTDCompressionLevel = 0
 	// Overwrite badger options based on the options provided by the user.
-	r.setBadgerOptions(&opt, compression)
+	if compression {
+		opt.Compression = bo.ZSTD
+		opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
+	}
 
 	db, err := badger.OpenManaged(opt)
 	x.Check(err)
@@ -173,20 +182,6 @@ func (r *reducer) createTmpBadger() *badger.DB {
 	return db
 }
 
-func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
-	if !compression {
-		opt.Compression = bo.None
-		opt.ZSTDCompressionLevel = 0
-		return
-	}
-	// Set the compression level.
-	opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
-	if r.state.opt.BadgerCompressionLevel < 1 {
-		x.Fatalf("Invalid compression level: %d. It should be greater than zero",
-			r.state.opt.BadgerCompressionLevel)
-	}
-}
-
 type mapIterator struct {
 	fd     *os.File
 	reader *bufio.Reader
diff --git a/dgraph/cmd/bulk/run.go b/dgraph/cmd/bulk/run.go
index 4b01bb79bd8..d208090e546 100644
--- a/dgraph/cmd/bulk/run.go
+++ b/dgraph/cmd/bulk/run.go
@@ -106,38 +106,42 @@ func init() {
 	// Options around how to set up Badger.
 	flag.Int("badger.compression_level", 1,
 		"The compression level for Badger. A higher value uses more resources.")
+	flag.Int64("badger.cache_mb", 0, "Total size of cache (in MB) per shard in the reducer.")
+	flag.String("badger.cache_percentage", "0,100",
+		"Cache percentages summing up to 100 for various caches"+
+			" (FORMAT: BlockCacheSize, IndexCacheSize).")
 
 	// Encryption and Vault options
 	enc.RegisterFlags(flag)
 }
 
 func run() {
-	var err error
 	opt := options{
-		DataFiles:        Bulk.Conf.GetString("files"),
-		DataFormat:       Bulk.Conf.GetString("format"),
-		SchemaFile:       Bulk.Conf.GetString("schema"),
-		GqlSchemaFile:    Bulk.Conf.GetString("graphql_schema"),
-		Encrypted:        Bulk.Conf.GetBool("encrypted"),
-		OutDir:           Bulk.Conf.GetString("out"),
-		ReplaceOutDir:    Bulk.Conf.GetBool("replace_out"),
-		TmpDir:           Bulk.Conf.GetString("tmp"),
-		NumGoroutines:    Bulk.Conf.GetInt("num_go_routines"),
-		MapBufSize:       uint64(Bulk.Conf.GetInt("mapoutput_mb")),
-		PartitionBufSize: int64(Bulk.Conf.GetInt("partition_mb")),
-		SkipMapPhase:     Bulk.Conf.GetBool("skip_map_phase"),
-		CleanupTmp:       Bulk.Conf.GetBool("cleanup_tmp"),
-		NumReducers:      Bulk.Conf.GetInt("reducers"),
-		Version:          Bulk.Conf.GetBool("version"),
-		StoreXids:        Bulk.Conf.GetBool("store_xids"),
-		ZeroAddr:         Bulk.Conf.GetString("zero"),
-		HttpAddr:         Bulk.Conf.GetString("http"),
-		IgnoreErrors:     Bulk.Conf.GetBool("ignore_errors"),
-		MapShards:        Bulk.Conf.GetInt("map_shards"),
-		ReduceShards:     Bulk.Conf.GetInt("reduce_shards"),
-		CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
-		NewUids:          Bulk.Conf.GetBool("new_uids"),
-		ClientDir:        Bulk.Conf.GetString("xidmap"),
+		DataFiles:              Bulk.Conf.GetString("files"),
+		DataFormat:             Bulk.Conf.GetString("format"),
+		SchemaFile:             Bulk.Conf.GetString("schema"),
+		GqlSchemaFile:          Bulk.Conf.GetString("graphql_schema"),
+		Encrypted:              Bulk.Conf.GetBool("encrypted"),
+		OutDir:                 Bulk.Conf.GetString("out"),
+		ReplaceOutDir:          Bulk.Conf.GetBool("replace_out"),
+		TmpDir:                 Bulk.Conf.GetString("tmp"),
+		NumGoroutines:          Bulk.Conf.GetInt("num_go_routines"),
+		MapBufSize:             uint64(Bulk.Conf.GetInt("mapoutput_mb")),
+		PartitionBufSize:       int64(Bulk.Conf.GetInt("partition_mb")),
+		SkipMapPhase:           Bulk.Conf.GetBool("skip_map_phase"),
+		CleanupTmp:             Bulk.Conf.GetBool("cleanup_tmp"),
+		NumReducers:            Bulk.Conf.GetInt("reducers"),
+		Version:                Bulk.Conf.GetBool("version"),
+		StoreXids:              Bulk.Conf.GetBool("store_xids"),
+		ZeroAddr:               Bulk.Conf.GetString("zero"),
+		HttpAddr:               Bulk.Conf.GetString("http"),
+		IgnoreErrors:           Bulk.Conf.GetBool("ignore_errors"),
+		MapShards:              Bulk.Conf.GetInt("map_shards"),
+		ReduceShards:           Bulk.Conf.GetInt("reduce_shards"),
+		CustomTokenizers:       Bulk.Conf.GetString("custom_tokenizers"),
+		NewUids:                Bulk.Conf.GetBool("new_uids"),
+		ClientDir:              Bulk.Conf.GetString("xidmap"),
+		// Badger options
 		BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
 	}
 
@@ -145,6 +149,20 @@ func run() {
 	if opt.Version {
 		os.Exit(0)
 	}
+	if opt.BadgerCompressionLevel < 0 {
+		fmt.Printf("Invalid compression level: %d. It should be non-negative",
+			opt.BadgerCompressionLevel)
+		return
+	}
+
+	totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
+	x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
+	cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
+	x.Check(err)
+	totalCache <<= 20 // Convert from MB to bytes.
+	opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
+	opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100
+
 	if opt.EncryptionKey, err = enc.ReadKey(Bulk.Conf); err != nil {
 		fmt.Printf("unable to read key %v", err)
 		return
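
Note on the cache split: run() converts badger.cache_mb from megabytes to bytes (the <<= 20 shift) and divides it between Badger's block cache and index cache according to badger.cache_percentage. The sketch below mirrors that arithmetic with a stand-in parser. parseCachePercentages is hypothetical, written only to match how x.GetCachePercentages is used in this diff (two comma-separated non-negative integers summing to 100); the real helper in dgraph's x package may differ.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseCachePercentages is a hypothetical stand-in for x.GetCachePercentages:
// it parses a comma-separated list of exactly `expected` non-negative
// integers that must sum to 100.
func parseCachePercentages(s string, expected int) ([]int64, error) {
	parts := strings.Split(s, ",")
	if len(parts) != expected {
		return nil, fmt.Errorf("expected %d percentages, got %d", expected, len(parts))
	}
	out := make([]int64, 0, expected)
	var sum int64
	for _, p := range parts {
		v, err := strconv.ParseInt(strings.TrimSpace(p), 10, 64)
		if err != nil || v < 0 {
			return nil, fmt.Errorf("invalid percentage %q", p)
		}
		out = append(out, v)
		sum += v
	}
	if sum != 100 {
		return nil, fmt.Errorf("percentages sum to %d, want 100", sum)
	}
	return out, nil
}

func main() {
	// Same math as the diff: 64 MB total, split 50/50 between the caches.
	totalCache := int64(64) << 20 // 64 MB expressed in bytes
	pct, err := parseCachePercentages("50,50", 2)
	if err != nil {
		panic(err)
	}
	blockCacheSize := (pct[0] * totalCache) / 100
	indexCacheSize := (pct[1] * totalCache) / 100
	fmt.Println(blockCacheSize, indexCacheSize) // 33554432 33554432
}

With the defaults (badger.cache_mb=0 and "0,100"), both caches come out to zero bytes; once a non-zero cache size is given, all of it goes to the index cache unless the split is overridden.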
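
For reference, a hypothetical invocation exercising the new flags (the --files and --schema flag names come from this diff; the file names are placeholders):

dgraph bulk --files data.rdf.gz --schema data.schema --badger.cache_mb=64 --badger.cache_percentage=50,50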