diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index 9e7fc98729f..0ce184ce192 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -34,6 +34,8 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" ) +var pluginHealthCheckInterval = time.Second * 60 + // Configuration describes the options to customize the storage behavior. type Configuration struct { PluginBinary string `yaml:"binary" mapstructure:"binary"` @@ -42,6 +44,10 @@ type Configuration struct { RemoteServerAddr string `yaml:"server" mapstructure:"server"` RemoteTLS tlscfg.Options RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"` + + pluginHealthCheck *time.Ticker + pluginHealthCheckDone chan bool + pluginRPCClient plugin.ClientProtocol } // ClientPluginServices defines services plugin can expose and its capabilities @@ -59,13 +65,18 @@ type PluginBuilder interface { // Build instantiates a PluginServices func (c *Configuration) Build(logger *zap.Logger) (*ClientPluginServices, error) { if c.PluginBinary != "" { - return c.buildPlugin() + return c.buildPlugin(logger) } else { return c.buildRemote(logger) } } func (c *Configuration) Close() error { + if c.pluginHealthCheck != nil { + c.pluginHealthCheck.Stop() + c.pluginHealthCheckDone <- true + } + return c.RemoteTLS.Close() } @@ -103,7 +114,7 @@ func (c *Configuration) buildRemote(logger *zap.Logger) (*ClientPluginServices, }, nil } -func (c *Configuration) buildPlugin() (*ClientPluginServices, error) { +func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices, error) { // #nosec G204 cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile) @@ -154,6 +165,10 @@ func (c *Configuration) buildPlugin() (*ClientPluginServices, error) { raw, shared.StoragePluginIdentifier) } + if err := c.startPluginHealthCheck(rpcClient, logger); err != nil { + return nil, fmt.Errorf("initial plugin health check failed: %w", err) + } + return &ClientPluginServices{ PluginServices: shared.PluginServices{ Store: storagePlugin, @@ -162,3 +177,24 @@ func (c *Configuration) buildPlugin() (*ClientPluginServices, error) { Capabilities: capabilities, }, nil } + +func (c *Configuration) startPluginHealthCheck(rpcClient plugin.ClientProtocol, logger *zap.Logger) error { + c.pluginRPCClient = rpcClient + c.pluginHealthCheckDone = make(chan bool) + c.pluginHealthCheck = time.NewTicker(pluginHealthCheckInterval) + + go func() { + for { + select { + case <-c.pluginHealthCheckDone: + return + case <-c.pluginHealthCheck.C: + if err := c.pluginRPCClient.Ping(); err != nil { + logger.Fatal("plugin health check failed", zap.Error(err)) + } + } + } + }() + + return c.pluginRPCClient.Ping() +}