Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro committed Mar 30, 2020
1 parent de84829 commit 95822ee
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 23 deletions.
22 changes: 11 additions & 11 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,18 @@ func (k *mqSink) calPartition(row *model.RowChangedEvent) int32 {
log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err))
}

if len(row.IndieMarkCol) > 0 {
// distribute partition by rowid or unique column value
value := row.Columns[row.IndieMarkCol].Value
b, err := json.Marshal(value)
if err != nil {
log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err))
}
_, err = hash.Write(b)
if err != nil {
log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err))
}
//if len(row.IndieMarkCol) > 0 {
// distribute partition by rowid or unique column value
value := row.Columns[row.IndieMarkCol].Value
b, err := json.Marshal(value)
if err != nil {
log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err))
}
_, err = hash.Write(b)
if err != nil {
log.Fatal("calculate hash of message key failed, please report a bug", zap.Error(err))
}

return int32(hash.Sum32() % uint32(k.partitionNum))
}

Expand Down
24 changes: 12 additions & 12 deletions cdc/sink/mqProducer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,18 @@ func (k *kafkaSaramaProducer) SendMessage(ctx context.Context, key []byte, value
callback(err)
}
}
cb(nil)
//select {
//case <-ctx.Done():
// return 0, errors.Trace(ctx.Err())
//case k.asyncClient.Input() <- &sarama.ProducerMessage{
// Topic: k.topic,
// Key: sarama.ByteEncoder(key),
// Value: sarama.ByteEncoder(value),
// Partition: partition,
// Metadata: cb,
//}:
//}
//cb(nil)
select {
case <-ctx.Done():
return 0, errors.Trace(ctx.Err())
case k.asyncClient.Input() <- &sarama.ProducerMessage{
Topic: k.topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Partition: partition,
Metadata: cb,
}:
}
return index, nil
}

Expand Down

0 comments on commit 95822ee

Please sign in to comment.