Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Execute() wait for initialization of status gRPC server before use #835

Merged
merged 2 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions plugin/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,42 @@ func (p *ExecutorPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBrok
return &ExecutorClient{client: types.NewExecutorClient(c), broker: broker}, nil
}

type Broker interface {
NextId() uint32
AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server)
}

// Here is the gRPC client that GRPCClient talks to.
type ExecutorClient struct {
// This is the real implementation
client types.ExecutorClient
broker *plugin.GRPCBroker
broker Broker
}

func (m *ExecutorClient) Execute(args *types.ExecuteRequest, cb StatusHelper) (*types.ExecuteResponse, error) {
// This is where the magic conversion to Proto happens
statusHelperServer := &GRPCStatusHelperServer{Impl: cb}

initChan := make(chan bool, 1)
var s *grpc.Server
serverFunc := func(opts []grpc.ServerOption) *grpc.Server {
s = grpc.NewServer(opts...)
types.RegisterStatusHelperServer(s, statusHelperServer)
initChan <- true

return s
}

brokerID := m.broker.NextId()
go m.broker.AcceptAndServe(brokerID, serverFunc)
go func() {
m.broker.AcceptAndServe(brokerID, serverFunc)
// AcceptAndServe might terminate without calling serverFunc
// To prevent eternal blocking, send 'init done' signal
initChan <- true
}()

// Wait for s to be initialized in the goroutine
<-initChan

args.StatusServer = brokerID
r, err := m.client.Execute(context.Background(), args)
Expand Down
47 changes: 47 additions & 0 deletions plugin/executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package plugin

import (
"context"
"testing"

"github.com/stretchr/testify/assert"

"github.com/distribworks/dkron/v3/plugin/types"
dktypes "github.com/distribworks/dkron/v3/plugin/types"
grpc "google.golang.org/grpc"
)

type MockedExecutor struct{}

func (m *MockedExecutor) Execute(ctx context.Context, in *types.ExecuteRequest, opts ...grpc.CallOption) (*types.ExecuteResponse, error) {
resp := &dktypes.ExecuteResponse{}
return resp, nil
}

type MockedStatusHelper struct{}

func (m MockedStatusHelper) Update([]byte, bool) (int64, error) {
return 0, nil
}

type MockedBroker struct{}

func (m *MockedBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
}

func (m *MockedBroker) NextId() uint32 {
return 0
}

func TestExecuteDoesNotPanicIfGRPCIsNotInitializedOnTime(t *testing.T) {
var brokerMock MockedBroker
var execMock MockedExecutor
execClient := ExecutorClient{
client: &execMock,
broker: &brokerMock,
}

var requestStub dktypes.ExecuteRequest
var statusHelperMock MockedStatusHelper
assert.NotPanics(t, func() { execClient.Execute(&requestStub, statusHelperMock) })
}