Skip to content

Commit

Permalink
Fix Google Batch hang when internal error during scheduling (#5567)
Browse files Browse the repository at this point in the history

Signed-off-by: jorgee <jorge.ejarque@seqera.io>
Signed-off-by: Jorge Ejarque <jorgee@users.noreply.github.com>
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
3 people committed Dec 16, 2024
1 parent 9910cec commit 79374c5
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ import nextflow.trace.TraceRecord
@CompileStatic
class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private static Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/
private static final Pattern EXIT_CODE_REGEX = ~/exit code 500(\d\d)/

private static final Pattern BATCH_ERROR_REGEX = ~/Batch Error: code/

private GoogleBatchExecutor executor

Expand Down Expand Up @@ -98,6 +100,11 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

private volatile long timestamp

/**
* A flag to indicate that the job has failed without launching any tasks
*/
private volatile boolean noTaskJobfailure

GoogleBatchTaskHandler(TaskRun task, GoogleBatchExecutor executor) {
super(task)
this.client = executor.getClient()
Expand Down Expand Up @@ -445,9 +452,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
*/
protected String getTaskState() {
final tasks = client.listTasks(jobId)
if( !tasks.iterator().hasNext() )
return 'PENDING'

if( !tasks.iterator().hasNext() ) {
// if there are no tasks checks the job status
return checkJobStatus()
}
final now = System.currentTimeMillis()
final delta = now - timestamp;
if( !taskState || delta >= 1_000) {
Expand All @@ -468,6 +476,21 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return taskState
}

protected String checkJobStatus() {
final jobStatus = client.getJobStatus(jobId)
final newState = jobStatus?.state as String
if (newState) {
taskState = newState
timestamp = System.currentTimeMillis()
if (newState == "FAILED") {
noTaskJobfailure = true
}
return taskState
} else {
return "PENDING"
}
}

static private final List<String> RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED']

static private final List<String> COMPLETED = ['SUCCEEDED', 'FAILED']
Expand Down Expand Up @@ -510,13 +533,14 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {

protected Throwable getJobError() {
try {
final status = client.getTaskStatus(jobId, taskId)
final eventsCount = status.getStatusEventsCount()
final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null
final events = noTaskJobfailure
? client.getJobStatus(jobId).getStatusEventsList()
: client.getTaskStatus(jobId, taskId).getStatusEventsList()
final lastEvent = events?.get(events.size() - 1)
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"

final error = lastEvent?.description
if( error && EXIT_CODE_REGEX.matcher(error).find() ) {
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) {
return new ProcessException(error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.google.cloud.batch.v1.BatchServiceClient
import com.google.cloud.batch.v1.BatchServiceSettings
import com.google.cloud.batch.v1.Job
import com.google.cloud.batch.v1.JobName
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.LocationName
import com.google.cloud.batch.v1.Task
import com.google.cloud.batch.v1.TaskGroupName
Expand Down Expand Up @@ -122,6 +123,10 @@ class BatchClient {
return describeTask(jobId, taskId).getStatus()
}

JobStatus getJobStatus(String jobId) {
return describeJob(jobId).getStatus()
}

String getTaskState(String jobId, String taskId) {
final status = getTaskStatus(jobId, taskId)
return status ? status.getState().toString() : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package nextflow.cloud.google.batch

import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task

import java.nio.file.Path

import com.google.cloud.batch.v1.GCS
Expand Down Expand Up @@ -581,4 +584,39 @@ class GoogleBatchTaskHandlerTest extends Specification {
and:
0 * client.deleteJob('job1') >> null
}

JobStatus makeJobStatus(JobStatus.State state, String desc = null) {
final builder = JobStatus.newBuilder().setState(state)
if( desc ) {
builder.addStatusEvents(
StatusEvent.newBuilder()
.setDescription(desc)
)
}
builder.build()
}

def 'should check job status when no tasks in job '() {

given:
def jobId = 'job-id'
def taskId = 'task-id'
def client = Mock(BatchClient)
def task = Mock(TaskRun) {
lazyName() >> 'foo (1)'
}
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task))
final message = 'Job failed when Batch tries to schedule it: Batch Error: code - CODE_MACHINE_TYPE_NOT_FOUND'
when:
client.listTasks(jobId) >>> [new LinkedList<Task>(), new LinkedList<Task>()]
client.getJobStatus(jobId) >>> [
null,
makeJobStatus(JobStatus.State.FAILED, 'Scheduling Failed'),
makeJobStatus(JobStatus.State.FAILED, message)
]
then:
handler.getTaskState() == "PENDING"
handler.getTaskState() == "FAILED"
handler.getJobError().message == message
}
}

0 comments on commit 79374c5

Please sign in to comment.