Skip to content

Commit

Permalink
Merge pull request #169 from camunda-community-hub/invoke-connector
Browse files Browse the repository at this point in the history
fix: Invoke multiple connector jobs of the same type
  • Loading branch information
saig0 authored Apr 4, 2023
2 parents 0f32a72 + 80bfed5 commit ff10920
Showing 1 changed file with 152 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,26 +1,55 @@
package org.camunda.community.zeebe.play.rest

import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import io.camunda.connector.impl.outbound.OutboundConnectorConfiguration
import io.camunda.connector.runtime.util.ConnectorHelper
import io.camunda.connector.runtime.util.outbound.ConnectorJobHandler
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.model.bpmn.Bpmn
import io.camunda.zeebe.model.bpmn.instance.FlowElement
import io.camunda.zeebe.model.bpmn.instance.zeebe.ZeebeTaskHeaders
import io.zeebe.zeeqs.data.entity.ElementInstance
import io.zeebe.zeeqs.data.entity.Job
import io.zeebe.zeeqs.data.entity.JobState
import io.zeebe.zeeqs.data.entity.Process
import io.zeebe.zeeqs.data.repository.ElementInstanceRepository
import io.zeebe.zeeqs.data.repository.JobRepository
import io.zeebe.zeeqs.data.repository.ProcessRepository
import io.zeebe.zeeqs.data.service.VariableService
import org.camunda.community.zeebe.play.connectors.ConnectorService
import org.camunda.community.zeebe.play.connectors.ConnectorsSecretProvider
import org.springframework.data.repository.findByIdOrNull
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestMethod
import org.springframework.web.bind.annotation.RestController
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

@RestController
@RequestMapping("/rest/connectors")
class ConnectorsResource(
private val connectorService: ConnectorService,
private val connectorsSecretProvider: ConnectorsSecretProvider,
private val zeebeClient: ZeebeClient
private val zeebeClient: ZeebeClient,
private val jobRepository: JobRepository,
private val processRepository: ProcessRepository,
private val elementInstanceRepository: ElementInstanceRepository,
private val variableService: VariableService
) {

companion object {
private val objectMapper = ObjectMapper()
}

private val executor = Executors.newSingleThreadScheduledExecutor()

private val keysOfPendingJobs = CopyOnWriteArrayList<Long>()

@RequestMapping(method = [RequestMethod.GET])
fun getAvailableConnectors(): ConnectorsDto {
return ConnectorsDto(
Expand All @@ -39,16 +68,55 @@ class ConnectorsResource(
val connector = ConnectorHelper.instantiateConnector(connectorConfig.connectorClass)
val jobHandler = ConnectorJobHandler(connector, connectorsSecretProvider)

findConnectorJob(connectorConfig, jobKey)
?.let { jobHandler.handle(zeebeClient, it) }
jobRepository.findByIdOrNull(jobKey)
?.takeIf { it.state == JobState.ACTIVATABLE && !keysOfPendingJobs.contains(jobKey) }
?.let { job ->
FakeActivatedJob(
job = job,
process = processRepository.findByIdOrNull(job.processDefinitionKey),
elementInstance = elementInstanceRepository.findByIdOrNull(job.elementInstanceKey),
variables = getJobVariables(job, connectorConfig)
)
}
?.let {
// block the invocation of this job for the next 10 seconds
keysOfPendingJobs.add(jobKey)

jobHandler.handle(zeebeClient, it)

executor.schedule({
keysOfPendingJobs.remove(jobKey)
}, 10, TimeUnit.SECONDS)
}
?: throw RuntimeException("No job found with key '$jobKey'.")
}

private fun getJobVariables(
job: Job,
connectorConfig: OutboundConnectorConfiguration
): String {
val allVariables = variableService.getVariables(
elementInstanceKey = job.elementInstanceKey,
localOnly = false,
shadowing = true
)
val filteredVariables =
allVariables.filter { connectorConfig.inputVariables.contains(it.name) }

return filteredVariables.joinToString(
separator = ",",
prefix = "{",
postfix = "}"
) { "\"${it.name}\": ${it.value}" }
}

private fun findConnectorJob(
connectorConfig: OutboundConnectorConfiguration,
jobKey: Long,
attempt: Int = 1
): ActivatedJob? {
// doesn't work well for multi-instance (i.e. more than one active job)
// blocked by https://github.com/camunda/zeebe/issues/5073
val job = zeebeClient
.newActivateJobsCommand()
.jobType(connectorConfig.type)
Expand Down Expand Up @@ -77,4 +145,85 @@ class ConnectorsResource(
val name: String,
val type: String
)

data class FakeActivatedJob(
private val job: Job,
private val process: Process?,
private val elementInstance: ElementInstance?,
private val variables: String
) : ActivatedJob {
override fun getKey(): Long {
return job.key
}

override fun getType(): String {
return job.jobType
}

override fun getProcessInstanceKey(): Long {
return job.processInstanceKey
}

override fun getBpmnProcessId(): String {
return process?.bpmnProcessId ?: "?"
}

override fun getProcessDefinitionVersion(): Int {
return process?.version ?: -1
}

override fun getProcessDefinitionKey(): Long {
return job.processDefinitionKey
}

override fun getElementId(): String {
return elementInstance?.elementId ?: "?"
}

override fun getElementInstanceKey(): Long {
return job.elementInstanceKey
}

override fun getCustomHeaders(): Map<String, String> {
return process?.let {
val bpmn = Bpmn.readModelFromStream(it.bpmnXML.byteInputStream())
val element: FlowElement = bpmn.getModelElementById(elementId)

element
.getSingleExtensionElement(ZeebeTaskHeaders::class.java)
?.headers
?.associate { it.key to it.value }
?: emptyMap()
} ?: emptyMap()
}

override fun getWorker(): String {
return job.worker ?: ""
}

override fun getRetries(): Int {
return job.retries ?: -1
}

override fun getDeadline(): Long {
return -1
}

override fun getVariables(): String {
return variables
}

override fun getVariablesAsMap(): Map<String, Any> {
val typeRef = object : TypeReference<Map<String, Any>>() {}
return objectMapper.readValue(variables, typeRef)
}

override fun <T : Any?> getVariablesAsType(variableType: Class<T>?): T {
TODO("Not yet implemented")
}

override fun toJson(): String {
return objectMapper.writeValueAsString(this)
}
}
}

0 comments on commit ff10920

Please sign in to comment.