-
Notifications
You must be signed in to change notification settings - Fork 1
/
compact.go
84 lines (72 loc) · 2.45 KB
/
compact.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package dbengine
import (
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
// memtableCompactService - handles compacting memtables into sstable files on disk (a.k.a "minor compaction").
//
// Memtables are handed over through enqueue and serialized one at a time by the loop in
// start. The hand-off channel is unbuffered, so start has to run in a different goroutine
// than the callers of enqueue. Until a memtable has been serialized it stays in queue so
// that getQueuedTables keeps exposing it.
type memtableCompactService struct {
	// db - database handle; its sstableDir and setting.SStableDatablockSizeByte are read when serializing
	db *Database
	// mu - guards queue, which is touched both by enqueue/getQueuedTables callers and by the start loop
	mu sync.Mutex
	// queue - memtables that were enqueued but are not serialized yet, oldest first
	queue []MemTable
	// c - unbuffered hand-off channel from enqueue to the start loop
	c chan MemTable
}

// newMemtableCompactService - create a compaction service for db with an empty queue.
// Nothing is compacted until start is running.
func newMemtableCompactService(db *Database) *memtableCompactService {
	return &memtableCompactService{
		db:    db,
		queue: make([]MemTable, 0),
		c:     make(chan MemTable),
	}
}

// enqueue - add the input memtable to the compaction queue for async compaction at a later time.
//
// The memtable is recorded in queue BEFORE it is handed to the worker: the send only
// completes once the start loop has received the table, so appending afterwards would
// leave a window in which the table is already being serialized but is missing from
// getQueuedTables, and the worker could pop from a queue that does not contain it yet.
//
// The call blocks until the start loop is ready to receive the memtable.
//
// NOTE(review): the worker pops the head of queue, so concurrent enqueue calls must not
// interleave between the append and the send — presumably there is a single writer; confirm.
func (mcs *memtableCompactService) enqueue(mem MemTable) {
	mcs.mu.Lock()
	mcs.queue = append(mcs.queue, mem)
	mcs.mu.Unlock()

	// the lock is released before sending; holding it here would deadlock with the
	// worker, which needs the lock to pop the previous table before it receives again
	mcs.c <- mem
}

// getQueuedTables - get all the memtables that are in the compaction queue but not yet compacted
// those tables should continue to serve get request before being serialized to disk.
//
// Returns a snapshot (oldest first) that is safe to keep and iterate while the service
// keeps running; it is non-nil even when the queue is empty.
func (mcs *memtableCompactService) getQueuedTables() []MemTable {
	mcs.mu.Lock()
	defer mcs.mu.Unlock()

	snapshot := make([]MemTable, len(mcs.queue))
	copy(snapshot, mcs.queue)
	return snapshot
}

// start - start the service to handle compaction tasks
//
// Runs until the hand-off channel is closed (nothing in this file closes it, so in
// practice it runs forever) and must therefore be launched in its own goroutine.
// For every received memtable it:
//  1. serializes the table to an sstable; a failure is fatal (log.Fatalf exits the process),
//  2. removes the table from the queue,
//  3. deletes the table's WAL; a failure here is only logged as a warning.
func (mcs *memtableCompactService) start() {
	for mem := range mcs.c {
		if err := mcs.serializeMemtable(mem); err != nil {
			log.Fatalf("Failed to serialize memtable to sstable - Error: %s", err.Error())
		}

		// pop the head of the queue, which is the table that was just serialized
		mcs.mu.Lock()
		if len(mcs.queue) > 0 {
			// clear the slot so the backing array does not keep the serialized table alive;
			// safe because getQueuedTables only hands out copies
			var released MemTable
			mcs.queue[0] = released
			mcs.queue = mcs.queue[1:]
		}
		mcs.mu.Unlock()

		// delete the WAL since the wal isn't needed anymore for a memtable that's serialized already.
		// the name is resolved up front so that logging does not touch the WAL after deletion
		walName := mem.Wal().File().Name()
		if err := mem.Wal().Delete(); err != nil {
			log.Warnf("Failed to delete WAL file %s after serializing its corresponding memtable - Error: %s", walName, err.Error())
			continue // do not report a deletion that did not happen
		}
		log.Infof("Deleted WAL file %s", walName)
	}
}
// serializeMemtable - write the input memtable out as a new sstable file.
//
// The sstable writer is created from the database's sstable directory and configured
// data block size. Returns the error from creating the writer or from dumping the
// table, whichever happens first; nil on success, in which case the sstable location
// is logged.
func (mcs *memtableCompactService) serializeMemtable(mem MemTable) error {
	sstWriter, err := NewBasicSSTableWriter(mcs.db.sstableDir, mcs.db.setting.SStableDatablockSizeByte)
	if err == nil {
		// only dump when the writer could be created
		err = sstWriter.Dump(mem)
	}
	if err != nil {
		return err
	}

	log.Infof("Serialized memtable to sstable at %s", sstWriter.File())
	return nil
}
// sstableCompactService - compacts smaller sstable files into larger files (a.k.a "major compaction").
// NOTE(review): start is still a stub, so none of these fields are read in the code visible here.
type sstableCompactService struct {
// db - database handle passed to newSSTableCompactService
db *Database
// interval - set to 5 seconds by the constructor; presumably the pause between compaction runs — TODO confirm
interval time.Duration
// lastRun - set to the construction time by the constructor; presumably the time of the latest compaction run — TODO confirm
lastRun time.Time
}
// newSSTableCompactService - create a major-compaction service for db.
// The interval starts at 5 seconds and lastRun at the time of construction.
func newSSTableCompactService(db *Database) *sstableCompactService {
	svc := new(sstableCompactService)
	svc.db = db
	svc.interval = 5 * time.Second
	svc.lastRun = time.Now()
	return svc
}
// start - entry point for sstable ("major") compaction; currently a no-op that returns immediately.
// TODO: (p0) implement sstable compaction
func (scs *sstableCompactService) start() {}