Skip to content

Commit

Permalink
use the TSManager instance to manage the ts info (#123)
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG authored Sep 3, 2024
1 parent 364e07b commit 89d91a5
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 74 deletions.
20 changes: 10 additions & 10 deletions core/reader/collection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"

"github.com/zilliztech/milvus-cdc/core/api"
Expand Down Expand Up @@ -85,15 +84,16 @@ func NewCollectionReader(id string,
errChan: make(chan error),
retryOptions: util.GetRetryOptions(readerConfig.Retry),
}
for _, collectionPositions := range seekPosition {
for channel, msgPosition := range collectionPositions {
pchannel := channel
if IsVirtualChannel(pchannel) {
pchannel = funcutil.ToPhysicalChannel(pchannel)
}
GetTSManager().CollectTS(pchannel, msgPosition.GetTimestamp())
}
}
// for _, collectionPositions := range seekPosition {
// for channel, msgPosition := range collectionPositions {
// pchannel := channel
// if IsVirtualChannel(pchannel) {
// pchannel = funcutil.ToPhysicalChannel(pchannel)
// }
// // TODO how to use the target channel
// GetTSManager().CollectTS(pchannel, msgPosition.GetTimestamp())
// }
// }
return reader, nil
}

Expand Down
55 changes: 26 additions & 29 deletions core/reader/replicate_channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ func (r *replicateChannelManager) startReadChannel(sourceInfo *model.SourceColle
}
return nil, nil
}
if sourceInfo.SeekPosition != nil {
GetTSManager().CollectTS(channelHandler.targetPChannel, sourceInfo.SeekPosition.GetTimestamp())
}
if channelHandler.targetPChannel != targetInfo.PChannel {
log.Info("diff target pchannel", zap.String("target_channel", targetInfo.PChannel), zap.String("handler_channel", channelHandler.targetPChannel))
r.forwardChannel(targetInfo)
Expand Down Expand Up @@ -743,11 +746,8 @@ type replicateChannelHandler struct {
isDroppedCollection func(int64) bool
isDroppedPartition func(int64) bool

retryOptions []retry.Option
ttLock deadlock.Mutex
ttPeriod time.Duration
lastSendTTTime time.Time
ttRateLog *log.RateLog
retryOptions []retry.Option
ttRateLog *log.RateLog

addCollectionLock *deadlock.RWMutex
addCollectionCnt *int
Expand Down Expand Up @@ -1169,7 +1169,7 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
log.Warn("begin timestamp is 0", zap.Uint64("end_ts", pack.EndTs), zap.Any("hasValidMsg", hasValidMsg))
}
}
GetTSManager().CollectTS(r.pChannelName, beginTS)
GetTSManager().CollectTS(r.targetPChannel, beginTS)
r.addCollectionLock.RUnlock()

if r.msgPackCallback != nil {
Expand Down Expand Up @@ -1397,26 +1397,23 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
position.ChannelName = pChannel
}

maxTS, _ := GetTSManager().GetMaxTS(r.pChannelName)
maxTS, _ := GetTSManager().GetMaxTS(r.targetPChannel)
resetTS := resetMsgPackTimestamp(newPack, maxTS)
if resetTS {
GetTSManager().CollectTS(r.pChannelName, newPack.EndTs)
GetTSManager().CollectTS(r.targetPChannel, newPack.EndTs)
}

r.ttLock.Lock()
defer r.ttLock.Unlock()
GetTSManager().LockTargetChannel(r.targetPChannel)
defer GetTSManager().UnLockTargetChannel(r.targetPChannel)

if !needTsMsg && len(newPack.Msgs) == 0 && time.Since(r.lastSendTTTime) < r.ttPeriod {
if !needTsMsg && len(newPack.Msgs) == 0 && !GetTSManager().UnsafeShouldSendTSMsg(r.targetPChannel) {
return api.EmptyMsgPack
}

if GetTSManager().GetLastMsgTS(r.pChannelName) >= newPack.BeginTs {
maxTS, _ = GetTSManager().GetMaxTS(r.pChannelName)
resetTS2 := resetMsgPackTimestamp(newPack, maxTS)
if resetTS2 {
GetTSManager().CollectTS(r.pChannelName, newPack.EndTs)
}
}
GetTSManager().UnsafeUpdatePackTS(r.targetPChannel, newPack.BeginTs, func(newTS uint64) (uint64, bool) {
reset := resetMsgPackTimestamp(newPack, maxTS)
return newPack.EndTs, reset
})

resetLastTs := needTsMsg
needTsMsg = needTsMsg || len(newPack.Msgs) == 0
Expand All @@ -1442,11 +1439,8 @@ func (r *replicateChannelHandler) handlePack(forward bool, pack *msgstream.MsgPa
TimeTickMsg: timeTickResult,
}
newPack.Msgs = append(newPack.Msgs, timeTickMsg)
GetTSManager().SetLastMsgTS(r.pChannelName, generateTS)
r.lastSendTTTime = time.Now()
if resetLastTs {
r.lastSendTTTime = r.lastSendTTTime.Add(-r.ttPeriod)
}

GetTSManager().UnsafeUpdateTSInfo(r.targetPChannel, generateTS, resetLastTs)
r.ttRateLog.Debug("time tick msg", zap.String("channel", r.targetPChannel), zap.Uint64("max_ts", maxTS))
return api.GetReplicateMsg(sourceCollectionName, sourceCollectionID, newPack)
}
Expand All @@ -1459,7 +1453,7 @@ func resetMsgPackTimestamp(pack *msgstream.MsgPack, newTimestamp uint64) bool {
deltas := make([]uint64, len(pack.Msgs))
lastTS := uint64(0)
for i, msg := range pack.Msgs {
if lastTS == msg.BeginTs() {
if i != 0 && lastTS == msg.BeginTs() {
deltas[i] = deltas[i-1]
} else {
deltas[i] = uint64(i) + 1
Expand Down Expand Up @@ -1600,14 +1594,17 @@ func initReplicateChannelHandler(ctx context.Context,
forwardPackChan: make(chan *api.ReplicateMsg, opts.MessageBufferSize),
generatePackChan: make(chan *api.ReplicateMsg, 30),
retryOptions: opts.RetryOptions,
ttPeriod: time.Duration(opts.TTInterval) * time.Millisecond,
sourceSeekPosition: sourceInfo.SeekPosition,
ttRateLog: log.NewRateLog(0.01, log.L()),
}
channelHandler.ttLock.Lock()
channelHandler.lastSendTTTime = time.Now().Add(-channelHandler.ttPeriod)
channelHandler.ttLock.Unlock()
var cts uint64 = math.MaxUint64
if sourceInfo.SeekPosition != nil {
cts = sourceInfo.SeekPosition.GetTimestamp()
}
GetTSManager().InitTSInfo(channelHandler.targetPChannel,
time.Duration(opts.TTInterval)*time.Millisecond,
cts,
)
go channelHandler.AddCollection(sourceInfo, targetInfo)
GetTSManager().CollectTS(channelHandler.pChannelName, math.MaxUint64)
return channelHandler, nil
}
5 changes: 4 additions & 1 deletion core/reader/replicate_channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package reader
import (
"context"
"errors"
"math"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -594,7 +595,7 @@ func TestReplicateChannelHandler(t *testing.T) {
handler.isDroppedPartition = func(i int64) bool {
return false
}
handler.lastSendTTTime = time.Now().Add(-handler.ttPeriod)
GetTSManager().InitTSInfo(handler.targetPChannel, 100*time.Millisecond, math.MaxUint64)

err = handler.AddPartitionInfo(&pb.CollectionInfo{
ID: 1,
Expand Down Expand Up @@ -626,6 +627,8 @@ func TestReplicateChannelHandler(t *testing.T) {
assert.Len(t, pack.StartPositions, 1)
assert.Len(t, pack.EndPositions, 1)
assert.Len(t, pack.Msgs, 1)
_, ok := pack.Msgs[0].(*msgstream.TimeTickMsg)
assert.True(t, ok, pack.Msgs[0])
}
{
// insert msg
Expand Down
Loading

0 comments on commit 89d91a5

Please sign in to comment.