Skip to content

Commit

Permalink
fix(bulkLoader): Use flags for cache (#6322)
Browse files Browse the repository at this point in the history
Bulk loader uses badger caches when compression is enabled, and this PR adds
flags to make the cache sizes configurable.

Bulk loader was setting compressionLevel but not the compression option.
As a result of this, badger wasn't compressing any data. This PR fixes
this by enabling compression when compressionLevel is greater than 0.
  • Loading branch information
Ibrahim Jarif authored Sep 16, 2020
1 parent 8c518d4 commit 99341dc
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 45 deletions.
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type options struct {
EncryptionKey x.SensitiveByteSlice
// BadgerCompressionlevel is the compression level to use while writing to badger.
BadgerCompressionLevel int
BlockCacheSize int64
IndexCacheSize int64
}

type state struct {
Expand Down
35 changes: 15 additions & 20 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,22 @@ func (r *reducer) createBadgerInternal(dir string, compression bool) *badger.DB
}
}

opt := badger.DefaultOptions(dir).WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).WithBlockCacheSize(1 << 20).
WithEncryptionKey(r.opt.EncryptionKey)

opt := badger.DefaultOptions(dir).
WithSyncWrites(false).
WithTableLoadingMode(bo.MemoryMap).
WithValueThreshold(1 << 10 /* 1 KB */).
WithLogger(nil).
WithEncryptionKey(r.opt.EncryptionKey).
WithBlockCacheSize(r.opt.BlockCacheSize).
WithIndexCacheSize(r.opt.IndexCacheSize)

opt.Compression = bo.None
opt.ZSTDCompressionLevel = 0
// Overwrite badger options based on the options provided by the user.
r.setBadgerOptions(&opt, compression)
if compression {
opt.Compression = bo.ZSTD
opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
}

db, err := badger.OpenManaged(opt)
x.Check(err)
Expand All @@ -173,20 +182,6 @@ func (r *reducer) createTmpBadger() *badger.DB {
return db
}

// setBadgerOptions configures badger's compression settings on opt.
// When compression is false, compression is disabled entirely. Otherwise
// ZSTD compression is enabled at the user-provided BadgerCompressionLevel,
// which must be >= 1 (the process aborts via x.Fatalf otherwise).
func (r *reducer) setBadgerOptions(opt *badger.Options, compression bool) {
	if !compression {
		opt.Compression = bo.None
		opt.ZSTDCompressionLevel = 0
		return
	}
	// Validate before applying: a level below 1 is meaningless for ZSTD.
	if r.state.opt.BadgerCompressionLevel < 1 {
		x.Fatalf("Invalid compression level: %d. It should be greater than zero",
			r.state.opt.BadgerCompressionLevel)
	}
	// BUG FIX: setting only ZSTDCompressionLevel leaves opt.Compression at
	// badger's default, so no data was actually compressed. Compression must
	// be switched on explicitly alongside the level.
	opt.Compression = bo.ZSTD
	opt.ZSTDCompressionLevel = r.state.opt.BadgerCompressionLevel
}

type mapIterator struct {
fd *os.File
reader *bufio.Reader
Expand Down
67 changes: 42 additions & 25 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,45 +106,62 @@ func init() {
// Options around how to set up Badger.
flag.Int("badger.compression_level", 1,
"The compression level for Badger. A higher value uses more resources.")
flag.Int64("badger.cache_mb", 0, "Total size of cache (in MB) per shard in reducer.")
flag.String("badger.cache_percentage", "0,100",
"Cache percentages summing up to 100 for various caches"+
" (FORMAT: BlockCacheSize, IndexCacheSize).")

// Encryption and Vault options
enc.RegisterFlags(flag)
}

func run() {
var err error
opt := options{
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
PartitionBufSize: int64(Bulk.Conf.GetInt("partition_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumReducers: Bulk.Conf.GetInt("reducers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
DataFiles: Bulk.Conf.GetString("files"),
DataFormat: Bulk.Conf.GetString("format"),
SchemaFile: Bulk.Conf.GetString("schema"),
GqlSchemaFile: Bulk.Conf.GetString("graphql_schema"),
Encrypted: Bulk.Conf.GetBool("encrypted"),
OutDir: Bulk.Conf.GetString("out"),
ReplaceOutDir: Bulk.Conf.GetBool("replace_out"),
TmpDir: Bulk.Conf.GetString("tmp"),
NumGoroutines: Bulk.Conf.GetInt("num_go_routines"),
MapBufSize: uint64(Bulk.Conf.GetInt("mapoutput_mb")),
PartitionBufSize: int64(Bulk.Conf.GetInt("partition_mb")),
SkipMapPhase: Bulk.Conf.GetBool("skip_map_phase"),
CleanupTmp: Bulk.Conf.GetBool("cleanup_tmp"),
NumReducers: Bulk.Conf.GetInt("reducers"),
Version: Bulk.Conf.GetBool("version"),
StoreXids: Bulk.Conf.GetBool("store_xids"),
ZeroAddr: Bulk.Conf.GetString("zero"),
HttpAddr: Bulk.Conf.GetString("http"),
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
MapShards: Bulk.Conf.GetInt("map_shards"),
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),
NewUids: Bulk.Conf.GetBool("new_uids"),
ClientDir: Bulk.Conf.GetString("xidmap"),
// Badger options
BadgerCompressionLevel: Bulk.Conf.GetInt("badger.compression_level"),
}

x.PrintVersion()
if opt.Version {
os.Exit(0)
}
if opt.BadgerCompressionLevel < 0 {
fmt.Printf("Invalid compression level: %d. It should be non-negative",
opt.BadgerCompressionLevel)
}

totalCache := int64(Bulk.Conf.GetInt("badger.cache_mb"))
x.AssertTruef(totalCache >= 0, "ERROR: Cache size must be non-negative")
cachePercent, err := x.GetCachePercentages(Bulk.Conf.GetString("badger.cache_percentage"), 2)
x.Check(err)
totalCache <<= 20 // Convert to MB.
opt.BlockCacheSize = (cachePercent[0] * totalCache) / 100
opt.IndexCacheSize = (cachePercent[1] * totalCache) / 100

if opt.EncryptionKey, err = enc.ReadKey(Bulk.Conf); err != nil {
fmt.Printf("unable to read key %v", err)
return
Expand Down

0 comments on commit 99341dc

Please sign in to comment.