Skip to content

Commit

Permalink
fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
hwjiangkai committed Dec 9, 2022
1 parent 9c505cc commit 8b132b7
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 263 deletions.
7 changes: 6 additions & 1 deletion internal/controller/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,11 +412,16 @@ func (ctrl *controller) processHeartbeat(ctx context.Context, req *ctrlpb.Segmen
Capacity: info.Capacity,
EventLogID: block.EventlogID,
Size: info.Size,
State: eventlog.SegmentState(info.State),
Number: info.EventNumber,
FirstEventBornTime: time.UnixMilli(info.FirstEventBornTime),
LastEventBornTime: time.UnixMilli(info.LastEventBornTime),
}
// block state transfer to segment state
if info.State == "archiving" {
seg.State = eventlog.StateFreezing
} else if info.State == "archived" {
seg.State = eventlog.StateFrozen
}
logArr = append(logArr, seg)
segments[block.EventlogID.Key()] = logArr
}
Expand Down
14 changes: 7 additions & 7 deletions internal/controller/eventbus/eventlog/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
type SegmentState string

const (
StateCreated = SegmentState("created")
StateWorking = SegmentState("working")
StateFrozen = SegmentState("frozen")
StatePreFrozen = SegmentState("prefrozen")
StateArchived = SegmentState("archived")
StateExpired = SegmentState("expired")
StateCreated = SegmentState("created")
StateWorking = SegmentState("working")
StateFrozen = SegmentState("frozen")
StateFreezing = SegmentState("freezing")
StateArchived = SegmentState("archived")
StateExpired = SegmentState("expired")
)

type Segment struct {
Expand Down Expand Up @@ -109,7 +109,7 @@ func (seg *Segment) isNeedUpdate(newSeg Segment) bool {
}

func (seg *Segment) isPreFull() bool {
return seg.State == StatePreFrozen
return seg.State == StateFreezing
}

func (seg *Segment) isFull() bool {
Expand Down
9 changes: 7 additions & 2 deletions internal/store/block/raft/appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Appender interface {

Stop(ctx context.Context)
Bootstrap(ctx context.Context, blocks []Peer) error
Frozen(ctx context.Context)
Archived(ctx context.Context)
Delete(ctx context.Context)
Status() ClusterStatus
}
Expand Down Expand Up @@ -187,7 +187,12 @@ func (a *appender) Bootstrap(ctx context.Context, blocks []Peer) error {
return a.node.Bootstrap(peers)
}

func (a *appender) Frozen(ctx context.Context) {
func (a *appender) Archived(ctx context.Context) {
a.appendMu.Lock()
defer a.appendMu.Unlock()
if a.actx.Archived() {
return
}
if frag, err := a.raw.PrepareArchive(ctx, a.actx); err == nil {
data, _ := block.MarshalFragment(ctx, frag)
_ = a.node.Propose(ctx, data)
Expand Down
11 changes: 5 additions & 6 deletions internal/store/block/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,18 @@ type Raw interface {
Delete(context.Context) error
}

type SegmentState string
type State string

const (
StateWorking = SegmentState("working")
StateFrozen = SegmentState("frozen")
StatePreFrozen = SegmentState("prefrozen")
StateWorking = State("working")
StateArchiving = State("archiving")
StateArchived = State("archived")
)

type Statistics struct {
ID vanus.ID
Capacity uint64
Archived bool
State SegmentState
State State
EntryNum uint32
EntrySize uint64
// FirstEntryStime is the millisecond timestamp when the first Entry will be written to Block.
Expand Down
3 changes: 1 addition & 2 deletions internal/store/segment/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,12 @@ func (r *replica) Status() *metapb.SegmentHealthInfo {
Capacity: int64(stat.Capacity),
Size: int64(stat.EntrySize),
EventNumber: int32(stat.EntryNum),
IsFull: stat.Archived,
State: string(stat.State),
Leader: cs.Leader.Uint64(),
Term: cs.Term,
FirstEventBornTime: stat.FirstEntryStime,
}
if stat.Archived {
if stat.State == block.StateArchived {
info.LastEventBornTime = stat.LastEntryStime
}
return info
Expand Down
7 changes: 3 additions & 4 deletions internal/store/segment/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,14 @@ func (s *server) runHeartbeat(_ context.Context) error {
})
}

if segment.State != string(block.StatePreFrozen) {
if segment.State != "freezing" {
break
}

s.replicas.Range(func(key, value interface{}) bool {
b, _ := value.(replica)
if b.appender.Status().Leader.Equals(vanus.ID(segment.LeaderBlockId)) {
b.appender.Frozen(context.Background())
b.appender.Archived(context.Background())
return false
}
return true
Expand Down Expand Up @@ -716,11 +716,10 @@ func (s *server) onBlockArchived(stat block.Statistics) {
Capacity: int64(stat.Capacity),
Size: int64(stat.EntrySize),
EventNumber: int32(stat.EntryNum),
IsFull: stat.Archived,
State: string(stat.State),
FirstEventBornTime: stat.FirstEntryStime,
}
if stat.Archived {
if stat.State == block.StateArchived {
info.LastEventBornTime = stat.LastEntryStime
}

Expand Down
3 changes: 1 addition & 2 deletions internal/store/vsb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,10 @@ func (b *vsBlock) status() block.Statistics {
return b.stat(m, indexes, block.StateWorking)
}

func (b *vsBlock) stat(m meta, indexes []index.Index, state block.SegmentState) block.Statistics {
func (b *vsBlock) stat(m meta, indexes []index.Index, state block.State) block.Statistics {
s := block.Statistics{
ID: b.id,
Capacity: uint64(b.capacity),
Archived: m.archived,
State: state,
EntryNum: uint32(m.entryNum),
EntrySize: uint64(m.entryLength),
Expand Down
4 changes: 2 additions & 2 deletions internal/store/vsb/block_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (b *vsBlock) PrepareAppend(
full := actx.size(b.dataOffset) >= b.capacity
if full && b.lis != nil {
m, indexes := makeSnapshot(b.actx, b.indexes)
b.lis.OnArchived(b.stat(m, indexes, block.StatePreFrozen))
b.lis.OnArchived(b.stat(m, indexes, block.StateArchiving))
}

return seqs, frag, full, nil
Expand Down Expand Up @@ -203,7 +203,7 @@ func (b *vsBlock) CommitAppend(ctx context.Context, frags ...block.Fragment) (bo
}()

if b.lis != nil {
b.lis.OnArchived(b.stat(m, i, block.StateFrozen))
b.lis.OnArchived(b.stat(m, i, block.StateArchived))
}
}

Expand Down
11 changes: 0 additions & 11 deletions internal/store/vsb/block_append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func TestVSBlock_Append(t *testing.T) {
So(full, ShouldBeFalse)

stat := b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 0)
So(stat.EntrySize, ShouldEqual, 0)

Expand All @@ -116,7 +115,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeFalse)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 1)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)

Expand All @@ -131,7 +129,6 @@ func TestVSBlock_Append(t *testing.T) {
So(full, ShouldBeTrue)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 1)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0)

Expand All @@ -140,7 +137,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeFalse)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand All @@ -161,7 +157,6 @@ func TestVSBlock_Append(t *testing.T) {
So(full, ShouldBeFalse)

stat := b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 0)
So(stat.EntrySize, ShouldEqual, 0)

Expand All @@ -173,7 +168,6 @@ func TestVSBlock_Append(t *testing.T) {
So(full, ShouldBeTrue)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 0)
So(stat.EntrySize, ShouldEqual, 0)

Expand All @@ -182,7 +176,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeFalse)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand All @@ -203,7 +196,6 @@ func TestVSBlock_Append(t *testing.T) {
So(full, ShouldBeTrue)

stat := b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 0)
So(stat.EntrySize, ShouldEqual, 0)

Expand All @@ -212,7 +204,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeFalse)

stat = b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down Expand Up @@ -288,7 +279,6 @@ func TestVSBlock_Append(t *testing.T) {
So(frag1.StartOffset(), ShouldEqual, vsbtest.EndEntryOffset)

stat := b.status()
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 0)
So(stat.EntrySize, ShouldEqual, 0)

Expand All @@ -297,7 +287,6 @@ func TestVSBlock_Append(t *testing.T) {
So(archived, ShouldBeTrue)

stat = b.status()
So(stat.Archived, ShouldBeTrue)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down
3 changes: 0 additions & 3 deletions internal/store/vsb/block_open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func TestVSBlock_Open(t *testing.T) {

stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.Archived, ShouldBeTrue)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down Expand Up @@ -104,7 +103,6 @@ func TestVSBlock_Open(t *testing.T) {

stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.Archived, ShouldBeTrue)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down Expand Up @@ -141,7 +139,6 @@ func TestVSBlock_Open(t *testing.T) {

stat := b.status()
So(stat.Capacity, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)
So(stat.Archived, ShouldBeFalse)
So(stat.EntryNum, ShouldEqual, 2)
So(stat.EntrySize, ShouldEqual, vsbtest.EntrySize0+vsbtest.EntrySize1)

Expand Down
Loading

0 comments on commit 8b132b7

Please sign in to comment.