Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create process instance with message/timer start event #147

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,172 @@
package org.camunda.community.zeebe.play.rest

import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.model.bpmn.Bpmn
import io.camunda.zeebe.model.bpmn.instance.MessageEventDefinition
import io.camunda.zeebe.model.bpmn.instance.Process
import io.camunda.zeebe.model.bpmn.instance.StartEvent
import io.camunda.zeebe.model.bpmn.instance.TimerEventDefinition
import io.zeebe.zeeqs.data.entity.TimerState
import io.zeebe.zeeqs.data.repository.MessageCorrelationRepository
import io.zeebe.zeeqs.data.repository.MessageSubscriptionRepository
import io.zeebe.zeeqs.data.repository.ProcessRepository
import io.zeebe.zeeqs.data.repository.TimerRepository
import org.camunda.community.zeebe.play.connectors.ConnectorService
import org.camunda.community.zeebe.play.services.ZeebeClockService
import org.springframework.data.repository.findByIdOrNull
import org.springframework.web.bind.annotation.*
import java.io.ByteArrayInputStream
import java.time.Duration
import java.time.Instant
import java.util.concurrent.Callable
import java.util.concurrent.Executors

private val RETRY_INTERVAL = Duration.ofMillis(100)

@RestController
@RequestMapping("/rest/processes")
class ProcessesResource(
private val zeebeClient: ZeebeClient,
private val connectorService: ConnectorService
private val connectorService: ConnectorService,
private val processRepository: ProcessRepository,
private val messageSubscriptionRepository: MessageSubscriptionRepository,
private val messageCorrelationRepository: MessageCorrelationRepository,
private val timerRepository: TimerRepository,
private val clockService: ZeebeClockService
) {

private val executor = Executors.newSingleThreadExecutor()

@RequestMapping(path = ["/{processKey}"], method = [RequestMethod.POST])
fun createInstance(
@PathVariable("processKey") processKey: Long,
@RequestBody variables: String
): Long {
// hack for the web-modeler to deal with not none start events
val process = processRepository.findByIdOrNull(processKey)
?.let { Bpmn.readModelFromStream(ByteArrayInputStream(it.bpmnXML.toByteArray())) }
?: throw RuntimeException("No process found with key '$processKey'")

val startEvents = process.getModelElementsByType(StartEvent::class.java)
.filter { it.scope is Process }

if (startEvents.isEmpty()) {
throw RuntimeException("No start event found.")
}

val noneStartEvent = startEvents.find { it.eventDefinitions.isEmpty() }

return noneStartEvent?.let {
createProcessInstanceWithNoneStartEvent(processKey, variables)
}
?: if (startEvents.size > 1) {
throw RuntimeException("More than one start event found but none of them is a none start event.")
} else {
return createProcessInstanceWithStartEvent(
processKey = processKey,
startEvent = startEvents.first(),
variables = variables
)
}
}

return zeebeClient.newCreateInstanceCommand()
private fun createProcessInstanceWithNoneStartEvent(processKey: Long, variables: String) =
zeebeClient.newCreateInstanceCommand()
.processDefinitionKey(processKey)
.variables(variables)
.send()
.join()
.processInstanceKey

private fun createProcessInstanceWithStartEvent(
processKey: Long,
startEvent: StartEvent,
variables: String
): Long {

if (startEvent.eventDefinitions.any { it is MessageEventDefinition }) {
return createProcessInstanceWithMessageStartEvent(processKey, variables)
} else if (startEvent.eventDefinitions.any { it is TimerEventDefinition }) {
return createProcessInstanceWithTimerStartEvent(processKey)
} else {
val type = startEvent.eventDefinitions.first().elementType.typeName
throw RuntimeException("Can't start process instance with start event of type '$type'")
}
}

private fun createProcessInstanceWithMessageStartEvent(
processKey: Long,
variables: String
): Long {
val messageSubscription = messageSubscriptionRepository
.findByProcessDefinitionKeyAndElementInstanceKeyIsNull(processKey)
.firstOrNull()
?: throw RuntimeException("No message subscription found for process '$processKey'")

val messageKey = zeebeClient.newPublishMessageCommand()
.messageName(messageSubscription.messageName)
.correlationKey("")
.variables(variables)
.timeToLive(Duration.ZERO)
.send()
.join()
.messageKey

return executor.submit(Callable {
getProcessInstanceKeyForMessage(
processKey = processKey,
messageKey = messageKey
)
}).get()
}

private fun getProcessInstanceKeyForMessage(processKey: Long, messageKey: Long): Long {
var processInstanceKey = -1L
while (processInstanceKey < 0) {
processInstanceKey =
messageCorrelationRepository.findByMessageKey(messageKey = messageKey)
.firstOrNull { it.processDefinitionKey == processKey }
?.processInstanceKey
?: run {
// wait and retry
Thread.sleep(RETRY_INTERVAL.toMillis())
-1L
}
}
return processInstanceKey
}

private fun createProcessInstanceWithTimerStartEvent(processKey: Long): Long {
val timer =
(timerRepository.findByProcessDefinitionKeyAndElementInstanceKeyIsNull(processKey)
.firstOrNull { it.state == TimerState.CREATED }
?: throw RuntimeException("No timer found for process '$processKey'"))

val currentTime = clockService.getCurrentTime()
val duration = Duration.between(currentTime, Instant.ofEpochMilli(timer.dueDate))
clockService.increaseTime(duration)

return executor.submit(Callable {
getProcessInstanceForTimer(
timerKey = timer.key
)
}).get()
}

private fun getProcessInstanceForTimer(timerKey: Long): Long {
var processInstanceKey = -1L
while (processInstanceKey < 0) {
processInstanceKey =
timerRepository.findByIdOrNull(timerKey)
?.takeIf { it.state == TimerState.TRIGGERED }
?.processInstanceKey
?: run {
// wait and retry
Thread.sleep(RETRY_INTERVAL.toMillis())
-1L
}
}
return processInstanceKey
}

@RequestMapping(
Expand Down