Skip to content

Commit

Permalink
Make heartbeat configurable for AMQP connections (#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdanap authored Jun 19, 2018
1 parent 7da96f5 commit 8584a65
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 35 deletions.
46 changes: 35 additions & 11 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,14 +620,26 @@ func (i *CLI) buildAMQPJobQueueAndCanceller() (*AMQPJobQueue, *AMQPCanceller, er
}
cfg.RootCAs.AppendCertsFromPEM(cert)
}
amqpConn, err = amqp.DialTLS(i.Config.AmqpURI, cfg)
amqpConn, err = amqp.DialConfig(i.Config.AmqpURI,
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
TLSClientConfig: cfg,
})
} else if i.Config.AmqpInsecure {
amqpConn, err = amqp.DialTLS(
amqpConn, err = amqp.DialConfig(
i.Config.AmqpURI,
&tls.Config{InsecureSkipVerify: true},
)
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
} else {
amqpConn, err = amqp.Dial(i.Config.AmqpURI)
amqpConn, err = amqp.DialConfig(i.Config.AmqpURI,
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
})
}
if err != nil {
i.logger.WithField("err", err).Error("couldn't connect to AMQP")
Expand Down Expand Up @@ -718,14 +730,26 @@ func (i *CLI) buildAMQPLogsQueue() error {
}
cfg.RootCAs.AppendCertsFromPEM(cert)
}
amqpConn, err = amqp.DialTLS(i.Config.LogsAmqpURI, cfg)
amqpConn, err = amqp.DialConfig(i.Config.AmqpURI,
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
TLSClientConfig: cfg,
})
} else if i.Config.AmqpInsecure {
amqpConn, err = amqp.DialTLS(
i.Config.LogsAmqpURI,
&tls.Config{InsecureSkipVerify: true},
)
amqpConn, err = amqp.DialConfig(
i.Config.AmqpURI,
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
})
} else {
amqpConn, err = amqp.Dial(i.Config.LogsAmqpURI)
amqpConn, err = amqp.DialConfig(i.Config.AmqpURI,
amqp.Config{
Heartbeat: i.Config.AmqpHeartbeat,
Locale: "en_US",
})
}
if err != nil {
i.logger.WithField("err", err).Error("couldn't connect to the logs AMQP server")
Expand Down
53 changes: 29 additions & 24 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ var (
Value: defaultQueueType,
Usage: `The name of the queue type to use ("amqp", "http", or "file")`,
}),
NewConfigDef("AmqpHeartbeat", &cli.DurationFlag{
Value: 10 * time.Second,
Usage: "The heartbeat timeout value defines after what time the peer TCP connection should be considered unreachable",
}),
NewConfigDef("AmqpURI", &cli.StringFlag{
Value: defaultAmqpURI,
Usage: `The URI to the AMQP server to connect to (only valid for "amqp" queue type)`,
Expand Down Expand Up @@ -322,30 +326,31 @@ func NewConfigDef(fieldName string, flag cli.Flag) *ConfigDef {

// Config contains all the configuration needed to run the worker.
type Config struct {
ProviderName string `config:"provider-name"`
QueueType string `config:"queue-type"`
AmqpURI string `config:"amqp-uri"`
AmqpInsecure bool `config:"amqp-insecure"`
AmqpTlsCert string `config:"amqp-tls-cert"`
AmqpTlsCertPath string `config:"amqp-tls-cert-path"`
BaseDir string `config:"base-dir"`
PoolSize int `config:"pool-size"`
BuildAPIURI string `config:"build-api-uri"`
QueueName string `config:"queue-name"`
LibratoEmail string `config:"librato-email"`
LibratoToken string `config:"librato-token"`
LibratoSource string `config:"librato-source"`
LogsAmqpURI string `config:"logs-amqp-uri"`
LogsAmqpTlsCert string `config:"logs-amqp-tls-cert"`
LogsAmqpTlsCertPath string `config:"logs-amqp-tls-cert-path"`
SentryDSN string `config:"sentry-dsn"`
Hostname string `config:"hostname"`
DefaultLanguage string `config:"default-language"`
DefaultDist string `config:"default-dist"`
DefaultGroup string `config:"default-group"`
DefaultOS string `config:"default-os"`
JobBoardURL string `config:"job-board-url"`
TravisSite string `config:"travis-site"`
ProviderName string `config:"provider-name"`
QueueType string `config:"queue-type"`
AmqpURI string `config:"amqp-uri"`
AmqpInsecure bool `config:"amqp-insecure"`
AmqpTlsCert string `config:"amqp-tls-cert"`
AmqpTlsCertPath string `config:"amqp-tls-cert-path"`
AmqpHeartbeat time.Duration `config:"amqp-heartbeat"`
BaseDir string `config:"base-dir"`
PoolSize int `config:"pool-size"`
BuildAPIURI string `config:"build-api-uri"`
QueueName string `config:"queue-name"`
LibratoEmail string `config:"librato-email"`
LibratoToken string `config:"librato-token"`
LibratoSource string `config:"librato-source"`
LogsAmqpURI string `config:"logs-amqp-uri"`
LogsAmqpTlsCert string `config:"logs-amqp-tls-cert"`
LogsAmqpTlsCertPath string `config:"logs-amqp-tls-cert-path"`
SentryDSN string `config:"sentry-dsn"`
Hostname string `config:"hostname"`
DefaultLanguage string `config:"default-language"`
DefaultDist string `config:"default-dist"`
DefaultGroup string `config:"default-group"`
DefaultOS string `config:"default-os"`
JobBoardURL string `config:"job-board-url"`
TravisSite string `config:"travis-site"`

StateUpdatePoolSize int `config:"state-update-pool-size"`
LogPoolSize int `config:"log-pool-size"`
Expand Down

0 comments on commit 8584a65

Please sign in to comment.