Skip to content

Commit

Permalink
fix: waiting for service pod
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Nov 28, 2024
1 parent 9d85b35 commit 57db7a0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
7 changes: 6 additions & 1 deletion cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,13 @@ func main() {
getTestWorkflowServiceNotificationsStream := func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, err
return nil, errors.Join(err, agent.ErrGetTestWorkflowExecution)
}

if execution.Result != nil && execution.Result.IsFinished() {
return nil, agent.ErrFinishedTestWorkflowExecution
}

notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, serviceName, serviceIndex), executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
Expand Down
29 changes: 23 additions & 6 deletions pkg/agent/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import (

// testWorkflowNotificationsRetryCount is the upper bound used by counted retry
// loops that wait for a test workflow notifications stream to become available.
// NOTE(review): confirm which request handlers still rely on this limit.
const testWorkflowNotificationsRetryCount = 10

var (
// ErrGetTestWorkflowExecution marks a failure to load the test workflow
// execution. The API server joins it with the underlying repository error,
// so callers should detect it with errors.Is rather than direct comparison.
ErrGetTestWorkflowExecution = errors.New("can't get test workflow execution")
// ErrFinishedTestWorkflowExecution is returned when the execution already
// has a finished result, so no service notifications stream is opened.
ErrFinishedTestWorkflowExecution = errors.New("test workflow execution is finished")
)

func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotification) cloud.TestWorkflowNotificationType {
if n.Result != nil {
return cloud.TestWorkflowNotificationType_WORKFLOW_STREAM_RESULT
Expand Down Expand Up @@ -236,17 +241,29 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
}

func (ag *Agent) executeWorkflowServiceNotificationsRequest(ctx context.Context, req *cloud.TestWorkflowServiceNotificationsRequest) error {
notificationsCh, err := ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex))
for i := 0; i < testWorkflowNotificationsRetryCount; i++ {
var (
notificationsCh <-chan testkube.TestWorkflowExecutionNotification
err error
)

for {
notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex))
if errors.Is(err, ErrGetTestWorkflowExecution) || errors.Is(ErrFinishedTestWorkflowExecution) {

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Unit Tests

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Unit Tests

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Lint Go

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Lint Go

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Lint Go

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Integration Tests

not enough arguments in call to errors.Is

Check failure on line 251 in pkg/agent/testworkflows.go

View workflow job for this annotation

GitHub Actions / Integration Tests

not enough arguments in call to errors.Is
break
}

if err != nil {
// We have a race condition here
// Cloud sometimes slow to insert execution or test
// Cloud is sometimes slow to start the service
// while WorkflowNotifications request from websockets comes in faster
// so we retry up to testWorkflowNotificationsRetryCount times.
time.Sleep(time.Second)
notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex))
// so we keep retrying until the service pod is up or the execution is finished.
time.Sleep(100 * time.Millisecond)
continue
}

break
}

if err != nil {
message := fmt.Sprintf("cannot get service pod logs: %s", err.Error())
ag.testWorkflowServiceNotificationsResponseBuffer <- &cloud.TestWorkflowServiceNotificationsResponse{
Expand Down

0 comments on commit 57db7a0

Please sign in to comment.