-
Notifications
You must be signed in to change notification settings - Fork 54
/
main.go
177 lines (148 loc) · 5.99 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sort"
	"strings"
	"sync"
	"syscall"
	"time"

	"github.com/DataDog/kafka-kit/v4/internal/registry/server"
	"github.com/DataDog/kafka-kit/v4/kafkaadmin"
	"github.com/DataDog/kafka-kit/v4/kafkazk"
	"github.com/Masterminds/semver"
	"github.com/jamiealquiza/envy"
	"gopkg.in/DataDog/dd-trace-go.v1/profiler"
)
// version is the build version printed by the -version flag. It defaults to
// "0.0.0" and can be set at build time with -ldflags "-X main.version=x.x.x".
var version = "0.0.0"
// main wires up and runs the registry server: it reads configuration from
// command line flags, validates it, optionally starts the Datadog profiler,
// initializes the server's ZooKeeper/Kafka clients and listeners, and then
// blocks until the WaitGroup shared with those components is released.
//
// Invalid configuration and startup failures terminate the process with a
// non-zero exit status.
func main() {
	serverConfig := server.Config{}
	zkConfig := kafkazk.Config{}
	adminConfig := kafkaadmin.Config{}

	// Collect the supported option names for help and error text. Map
	// iteration order is randomized, so the names are sorted to keep the
	// output stable from one run to the next.
	securityProtocols := make([]string, 0, len(kafkaadmin.SecurityProtocolSet))
	for k := range kafkaadmin.SecurityProtocolSet {
		securityProtocols = append(securityProtocols, k)
	}
	sort.Strings(securityProtocols)

	saslMechanisms := make([]string, 0, len(kafkaadmin.SASLMechanismSet))
	for k := range kafkaadmin.SASLMechanismSet {
		saslMechanisms = append(saslMechanisms, k)
	}
	sort.Strings(saslMechanisms)

	v := flag.Bool("version", false, "version")
	profiling := flag.Bool("enable-profiling", false, "Enable Datadog continuous profiling")
	locking := flag.Bool("enable-locking", false, "Enable distributed locking for write operations")
	flag.StringVar(&serverConfig.HTTPListen, "http-listen", "localhost:8080", "Server HTTP listen address")
	flag.StringVar(&serverConfig.GRPCListen, "grpc-listen", "localhost:8090", "Server gRPC listen address")
	flag.IntVar(&serverConfig.ReadReqRate, "read-rate-limit", 5, "Read request rate limit (reqs/s)")
	flag.IntVar(&serverConfig.WriteReqRate, "write-rate-limit", 1, "Write request rate limit (reqs/s)")
	flag.StringVar(&serverConfig.ZKTagsPrefix, "zk-tags-prefix", "registry", "Tags storage ZooKeeper prefix")
	flag.StringVar(&zkConfig.Connect, "zk-addr", "localhost:2181", "ZooKeeper connect string")
	flag.StringVar(&zkConfig.Prefix, "zk-prefix", "", "ZooKeeper prefix (if Kafka is configured with a chroot path prefix)")
	flag.StringVar(&adminConfig.BootstrapServers, "bootstrap-servers", "localhost", "Kafka bootstrap servers")
	flag.StringVar(&adminConfig.SecurityProtocol, "kafka-security-protocol", "", fmt.Sprintf("Protocol used to communicate with brokers. Supported: %s", strings.Join(securityProtocols, ", ")))
	flag.StringVar(&adminConfig.SSLCALocation, "kafka-ssl-ca-location", "", "CA certificate path (.pem/.crt) for verifying broker's identity. Needed for SSL and SASL_SSL protocols.")
	flag.StringVar(&adminConfig.SASLMechanism, "kafka-sasl-mechanism", "", fmt.Sprintf("SASL mechanism to use for authentication. Supported: %s", strings.Join(saslMechanisms, ", ")))
	flag.StringVar(&adminConfig.SASLUsername, "kafka-sasl-username", "", "SASL username for use with the PLAIN and SASL-SCRAM-* mechanisms")
	flag.StringVar(&adminConfig.SASLPassword, "kafka-sasl-password", "", "SASL password for use with the PLAIN and SASL-SCRAM-* mechanisms")
	defaultRequestTimeout := flag.Int("default-request-timeout", 5000, "Default request API request timeout in milliseconds. API request deadlines are also automatically capped to 3x this value.")
	flag.IntVar(&serverConfig.TagAllowedStalenessMinutes, "tag-allowed-staleness", 60, "Minutes before tags with no associated resource are deleted")
	flag.IntVar(&serverConfig.TagCleanupFrequencyMinutes, "tag-cleanup-frequency", 20, "Minutes between runs of tag cleanup")
	kafkaVersionString := flag.String("kafka-version", "v0.10.2", "Kafka release (Semantic Versioning)")

	// envy runs before flag.Parse; presumably it maps REGISTRY-prefixed
	// environment variables onto the flags defined above (confirm against
	// the envy package docs).
	envy.Parse("REGISTRY")
	flag.Parse()

	// The flag is expressed in milliseconds; the config wants a Duration.
	serverConfig.DefaultRequestTimeout = time.Duration(*defaultRequestTimeout) * time.Millisecond

	if *v {
		fmt.Println(version)
		os.Exit(0)
	}

	// The Kafka version is only checked for being valid SemVer here; the
	// parsed value is not used further in this function.
	if _, err := semver.NewVersion(*kafkaVersionString); err != nil {
		fmt.Printf("Invalid SemVer: %s\n", *kafkaVersionString)
		os.Exit(1)
	}

	// Validate the remaining flags before anything is started. The set
	// lookups use the upper-cased value, so the flags are case-insensitive.
	if adminConfig.SecurityProtocol != "" {
		adminConfig.SecurityProtocol = strings.ToUpper(adminConfig.SecurityProtocol)
		if _, validChoice := kafkaadmin.SecurityProtocolSet[adminConfig.SecurityProtocol]; !validChoice {
			fmt.Printf("Invalid kafka security protocol. Supported protocols: %s\n", strings.Join(securityProtocols, ", "))
			os.Exit(1)
		}
	}

	if adminConfig.SASLMechanism != "" {
		adminConfig.SASLMechanism = strings.ToUpper(adminConfig.SASLMechanism)
		if _, validChoice := kafkaadmin.SASLMechanismSet[adminConfig.SASLMechanism]; !validChoice {
			fmt.Printf("Invalid kafka SASL mechanism. Supported mechanisms: %s\n", strings.Join(saslMechanisms, ", "))
			os.Exit(1)
		}
	}

	// Start profiling if enabled. This happens after all flag validation so
	// that none of the os.Exit calls above can skip the deferred
	// profiler.Stop. NOTE: the log.Fatal calls below still exit without
	// running deferred functions.
	if *profiling {
		// Optional trace agent URL override.
		traceAgentURL := "localhost:8126"
		if e := os.Getenv("TRACE_AGENT_URL"); e != "" {
			traceAgentURL = e
		}

		err := profiler.Start(
			profiler.WithService("kafka-registry"),
			profiler.WithAgentAddr(traceAgentURL),
			profiler.WithProfileTypes(
				profiler.CPUProfile,
				profiler.HeapProfile,
			),
		)
		if err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
		defer profiler.Stop()
	}

	log.Println("Registry running")

	// ctx is cancelled by the signal handler at the bottom of this function.
	// NOTE(review): a plain string context key is normally discouraged, but
	// it is kept because whatever reads "reqID" lives outside this file.
	ctx, cancel := context.WithCancel(context.Background())
	ctx = context.WithValue(ctx, "reqID", "registry-server")

	// wg is handed to each component below; main blocks on it at the end.
	wg := &sync.WaitGroup{}

	// Initialize Server.
	srvr, err := server.NewServer(serverConfig)
	if err != nil {
		log.Fatal(err)
	}

	// Dial ZooKeeper.
	if err := srvr.DialZK(ctx, wg, &zkConfig); err != nil {
		log.Fatal(err)
	}

	// Enable locking.
	if *locking {
		if err := srvr.EnablingLocking(&zkConfig); err != nil {
			log.Fatal(err)
		}
	}

	// Init an admin Client.
	if err := srvr.InitKafkaAdmin(ctx, wg, adminConfig); err != nil {
		log.Fatal(err)
	}

	// Init a kafka consumer. Needed for offset translations.
	if err := srvr.InitKafkaConsumer(ctx, wg, adminConfig); err != nil {
		log.Fatal(err)
	}

	// Start the gRPC listener.
	if err := srvr.RunRPC(ctx, wg); err != nil {
		log.Fatal(err)
	}

	// Start the HTTP listener.
	if err := srvr.RunHTTP(ctx, wg); err != nil {
		log.Fatal(err)
	}

	// Start the tag cleanup background thread.
	if err := srvr.RunTagCleanup(ctx, wg, serverConfig); err != nil {
		log.Fatal(err)
	}

	// Graceful shutdown on SIGINT or SIGTERM: the first signal cancels ctx.
	// The channel is buffered so a signal delivered before the goroutine is
	// receiving is not dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigs
		cancel()
	}()

	wg.Wait()
}