-
Notifications
You must be signed in to change notification settings - Fork 789
/
message.go
116 lines (99 loc) · 2.24 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package kafka
import (
"time"
)
// Message is a data structure representing kafka messages.
type Message struct {
	// Topic indicates which topic this message was consumed from via Reader.
	//
	// When being used with Writer, this can be used to configure the topic if
	// not already specified on the writer itself.
	Topic string
	// Partition is read-only and MUST NOT be set when writing messages.
	Partition int
	// NOTE(review): presumably the position of the message within its
	// partition; the code that assigns it is not part of this file.
	Offset int64
	// NOTE(review): presumably the partition's high water mark observed when
	// the message was read; confirm against the Reader implementation.
	HighWaterMark int64
	// Key is copied as-is into the wire-level message built by the unexported
	// message method.
	Key []byte
	// Value is copied as-is into the wire-level message built by the
	// unexported message method.
	Value []byte
	// Headers are not part of the encoding produced by the unexported message
	// and size methods of this type.
	Headers []Header
	// If not set at the creation, Time will be automatically set when
	// writing the message.
	Time time.Time
}
// message builds the wire-level representation of msg.
//
// The result always has MagicByte set to 1 and carries msg.Key, msg.Value and
// the timestamp derived from msg.Time. Topic, Partition, Offset,
// HighWaterMark and Headers are not carried over. When cw is non-nil it is
// used to compute the CRC field; otherwise CRC is left at zero.
func (msg Message) message(cw *crc32Writer) message {
	var wire message
	wire.MagicByte = 1
	wire.Key, wire.Value = msg.Key, msg.Value
	wire.Timestamp = timestamp(msg.Time)

	// Without a checksum writer there is nothing more to fill in.
	if cw == nil {
		return wire
	}

	wire.CRC = wire.crc32(cw)
	return wire
}
// timestampSize is the number of bytes the int64 Timestamp field contributes
// to the size of a message.
const timestampSize = 8
// size returns the number of bytes msg occupies once converted by the message
// method: a fixed part (4-byte CRC, 1-byte magic byte, 1-byte attributes and
// the timestamp) plus the sizes of the key and the value as reported by
// sizeofBytes. Headers are not accounted for.
func (msg *Message) size() int32 {
	const fixedSize = 4 + 1 + 1 + timestampSize
	return fixedSize + sizeofBytes(msg.Key) + sizeofBytes(msg.Value)
}
// message is the wire-level form of a single message. Its writeTo method
// emits the fields in declaration order, except that Timestamp is skipped
// when MagicByte is zero.
type message struct {
	// CRC is the value returned by the crc32 method; it is written first.
	CRC int32
	// MagicByte selects the layout: any non-zero value means Timestamp is
	// included in the checksum, the size and the encoded output.
	MagicByte int8
	// Attributes is written as a single byte.
	// NOTE(review): the meaning of its bits is not defined in this file.
	Attributes int8
	// Timestamp is only used when MagicByte is non-zero.
	// NOTE(review): its unit is decided by the timestamp helper, which is
	// defined elsewhere.
	Timestamp int64
	// Key is written with writeBytes.
	Key []byte
	// Value is written with writeBytes.
	Value []byte
}
// crc32 feeds the checksummed fields of m to cw and returns cw's resulting
// crc32 value converted to int32.
//
// cw.crc32 is reset to zero first; then the magic byte, the attributes, the
// timestamp (only when MagicByte is non-zero), the key and the value are
// written in that order. The CRC field itself is not written. cw must be
// non-nil.
func (m message) crc32(cw *crc32Writer) int32 {
	cw.crc32 = 0

	cw.writeInt8(m.MagicByte)
	cw.writeInt8(m.Attributes)
	switch m.MagicByte {
	case 0:
		// A zero magic byte means the timestamp is left out.
	default:
		cw.writeInt64(m.Timestamp)
	}
	cw.writeBytes(m.Key)
	cw.writeBytes(m.Value)

	sum := cw.crc32
	return int32(sum)
}
// size returns the number of bytes written by writeTo: the 4-byte CRC, the
// 1-byte magic byte, the 1-byte attributes, the key and value sizes reported
// by sizeofBytes, plus timestampSize when MagicByte is non-zero.
func (m message) size() int32 {
	const headerSize = 4 + 1 + 1

	total := headerSize + sizeofBytes(m.Key) + sizeofBytes(m.Value)
	if m.MagicByte == 0 {
		// No timestamp is encoded for a zero magic byte.
		return total
	}
	return total + timestampSize
}
// writeTo appends m to wb in this order: CRC, magic byte, attributes,
// timestamp (only when MagicByte is non-zero), key, value.
func (m message) writeTo(wb *writeBuffer) {
	// Fixed-size prefix.
	wb.writeInt32(m.CRC)
	wb.writeInt8(m.MagicByte)
	wb.writeInt8(m.Attributes)

	// The timestamp is omitted for a zero magic byte.
	if withTimestamp := m.MagicByte != 0; withTimestamp {
		wb.writeInt64(m.Timestamp)
	}

	// Variable-size payload.
	wb.writeBytes(m.Key)
	wb.writeBytes(m.Value)
}
// messageSetItem is one entry of a messageSet: a message preceded by its
// offset and its size.
type messageSetItem struct {
	// Offset is written first, as an int64.
	Offset int64
	// MessageSize is written as stored; writeTo does not derive it from
	// Message, so whoever builds the item must set it.
	MessageSize int32
	// Message is the message encoded after the two fields above.
	Message message
}
// size returns the encoded size of the item: 8 bytes for Offset, 4 bytes for
// MessageSize, plus the size reported by the embedded Message.
func (m messageSetItem) size() int32 {
	const prefixSize = 8 + 4
	return prefixSize + m.Message.size()
}
// writeTo appends the item to wb: Offset as an int64, MessageSize as an
// int32, then the encoded Message.
func (m messageSetItem) writeTo(wb *writeBuffer) {
	offset, length := m.Offset, m.MessageSize

	wb.writeInt64(offset)
	wb.writeInt32(length)

	payload := m.Message
	payload.writeTo(wb)
}
// messageSet is an ordered list of messageSetItem values; its size and its
// encoding are those of its items taken in slice order.
type messageSet []messageSetItem
// size returns the sum of the sizes of all items in s; it is zero for an
// empty or nil set.
func (s messageSet) size() (size int32) {
	for i := range s {
		size += s[i].size()
	}
	return size
}
// writeTo appends every item of s to wb, in slice order. Nothing is written
// for an empty or nil set.
func (s messageSet) writeTo(wb *writeBuffer) {
	for i := range s {
		s[i].writeTo(wb)
	}
}