diff --git a/src/main/kotlin/org/camunda/community/zeebe/play/rest/ConnectorsResource.kt b/src/main/kotlin/org/camunda/community/zeebe/play/rest/ConnectorsResource.kt index 14cf270..29dc6e7 100644 --- a/src/main/kotlin/org/camunda/community/zeebe/play/rest/ConnectorsResource.kt +++ b/src/main/kotlin/org/camunda/community/zeebe/play/rest/ConnectorsResource.kt @@ -1,26 +1,55 @@ package org.camunda.community.zeebe.play.rest +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.ObjectMapper import io.camunda.connector.impl.outbound.OutboundConnectorConfiguration import io.camunda.connector.runtime.util.ConnectorHelper import io.camunda.connector.runtime.util.outbound.ConnectorJobHandler import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.api.response.ActivatedJob +import io.camunda.zeebe.model.bpmn.Bpmn +import io.camunda.zeebe.model.bpmn.instance.FlowElement +import io.camunda.zeebe.model.bpmn.instance.zeebe.ZeebeTaskHeaders +import io.zeebe.zeeqs.data.entity.ElementInstance +import io.zeebe.zeeqs.data.entity.Job +import io.zeebe.zeeqs.data.entity.JobState +import io.zeebe.zeeqs.data.entity.Process +import io.zeebe.zeeqs.data.repository.ElementInstanceRepository +import io.zeebe.zeeqs.data.repository.JobRepository +import io.zeebe.zeeqs.data.repository.ProcessRepository +import io.zeebe.zeeqs.data.service.VariableService import org.camunda.community.zeebe.play.connectors.ConnectorService import org.camunda.community.zeebe.play.connectors.ConnectorsSecretProvider +import org.springframework.data.repository.findByIdOrNull import org.springframework.web.bind.annotation.PathVariable import org.springframework.web.bind.annotation.RequestMapping import org.springframework.web.bind.annotation.RequestMethod import org.springframework.web.bind.annotation.RestController import java.time.Duration +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit @RestController @RequestMapping("/rest/connectors") class ConnectorsResource( private val connectorService: ConnectorService, private val connectorsSecretProvider: ConnectorsSecretProvider, - private val zeebeClient: ZeebeClient + private val zeebeClient: ZeebeClient, + private val jobRepository: JobRepository, + private val processRepository: ProcessRepository, + private val elementInstanceRepository: ElementInstanceRepository, + private val variableService: VariableService ) { + companion object { + private val objectMapper = ObjectMapper() + } + + private val executor = Executors.newSingleThreadScheduledExecutor() + + private val keysOfPendingJobs = CopyOnWriteArrayList() + @RequestMapping(method = [RequestMethod.GET]) fun getAvailableConnectors(): ConnectorsDto { return ConnectorsDto( @@ -39,16 +68,55 @@ class ConnectorsResource( val connector = ConnectorHelper.instantiateConnector(connectorConfig.connectorClass) val jobHandler = ConnectorJobHandler(connector, connectorsSecretProvider) - findConnectorJob(connectorConfig, jobKey) - ?.let { jobHandler.handle(zeebeClient, it) } + jobRepository.findByIdOrNull(jobKey) + ?.takeIf { it.state == JobState.ACTIVATABLE && !keysOfPendingJobs.contains(jobKey) } + ?.let { job -> + FakeActivatedJob( + job = job, + process = processRepository.findByIdOrNull(job.processDefinitionKey), + elementInstance = elementInstanceRepository.findByIdOrNull(job.elementInstanceKey), + variables = getJobVariables(job, connectorConfig) + ) + } + ?.let { + // block the invocation of this job for the next 10 seconds + keysOfPendingJobs.add(jobKey) + + jobHandler.handle(zeebeClient, it) + + executor.schedule({ + keysOfPendingJobs.remove(jobKey) + }, 10, TimeUnit.SECONDS) + } ?: throw RuntimeException("No job found with key '$jobKey'.") } + private fun getJobVariables( + job: Job, + connectorConfig: OutboundConnectorConfiguration + ): String { + val allVariables = variableService.getVariables( + elementInstanceKey = job.elementInstanceKey, + localOnly = false, + shadowing = true + ) + val filteredVariables = + allVariables.filter { connectorConfig.inputVariables.contains(it.name) } + + return filteredVariables.joinToString( + separator = ",", + prefix = "{", + postfix = "}" + ) { "\"${it.name}\": ${it.value}" } + } + private fun findConnectorJob( connectorConfig: OutboundConnectorConfiguration, jobKey: Long, attempt: Int = 1 ): ActivatedJob? { + // doesn't work well for multi-instance (i.e. more than one active job) + // blocked by https://github.com/camunda/zeebe/issues/5073 val job = zeebeClient .newActivateJobsCommand() .jobType(connectorConfig.type) @@ -77,4 +145,85 @@ class ConnectorsResource( val name: String, val type: String ) + + data class FakeActivatedJob( + private val job: Job, + private val process: Process?, + private val elementInstance: ElementInstance?, + private val variables: String + ) : ActivatedJob { + override fun getKey(): Long { + return job.key + } + + override fun getType(): String { + return job.jobType + } + + override fun getProcessInstanceKey(): Long { + return job.processInstanceKey + } + + override fun getBpmnProcessId(): String { + return process?.bpmnProcessId ?: "?" + } + + override fun getProcessDefinitionVersion(): Int { + return process?.version ?: -1 + } + + override fun getProcessDefinitionKey(): Long { + return job.processDefinitionKey + } + + override fun getElementId(): String { + return elementInstance?.elementId ?: "?" + } + + override fun getElementInstanceKey(): Long { + return job.elementInstanceKey + } + + override fun getCustomHeaders(): Map { + return process?.let { + val bpmn = Bpmn.readModelFromStream(it.bpmnXML.byteInputStream()) + val element: FlowElement = bpmn.getModelElementById(elementId) + + element + .getSingleExtensionElement(ZeebeTaskHeaders::class.java) + ?.headers + ?.associate { it.key to it.value } + ?: emptyMap() + } ?: emptyMap() + } + + override fun getWorker(): String { + return job.worker ?: "" + } + + override fun getRetries(): Int { + return job.retries ?: -1 + } + + override fun getDeadline(): Long { + return -1 + } + + override fun getVariables(): String { + return variables + } + + override fun getVariablesAsMap(): Map { + val typeRef = object : TypeReference>() {} + return objectMapper.readValue(variables, typeRef) + } + + override fun getVariablesAsType(variableType: Class?): T { + TODO("Not yet implemented") + } + + override fun toJson(): String { + return objectMapper.writeValueAsString(this) + } + } } \ No newline at end of file