From 10ac9526076d73fc995cf68dc509cc3cd5d69691 Mon Sep 17 00:00:00 2001 From: Philipp Ossler Date: Thu, 9 Mar 2023 08:03:39 +0100 Subject: [PATCH] fix: create process instance with message/timer start event Change the API to create a new process instance to support not only none start events but also message and timer events. This change is a short-term solution to enable the Web-Modeler. --- .../zeebe/play/rest/ProcessesResource.kt | 148 +++++++++++++++++- 1 file changed, 146 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/camunda/community/zeebe/play/rest/ProcessesResource.kt b/src/main/kotlin/org/camunda/community/zeebe/play/rest/ProcessesResource.kt index 3f1a3a89..181c5c6c 100644 --- a/src/main/kotlin/org/camunda/community/zeebe/play/rest/ProcessesResource.kt +++ b/src/main/kotlin/org/camunda/community/zeebe/play/rest/ProcessesResource.kt @@ -1,28 +1,172 @@ package org.camunda.community.zeebe.play.rest import io.camunda.zeebe.client.ZeebeClient +import io.camunda.zeebe.model.bpmn.Bpmn +import io.camunda.zeebe.model.bpmn.instance.MessageEventDefinition +import io.camunda.zeebe.model.bpmn.instance.Process +import io.camunda.zeebe.model.bpmn.instance.StartEvent +import io.camunda.zeebe.model.bpmn.instance.TimerEventDefinition +import io.zeebe.zeeqs.data.entity.TimerState +import io.zeebe.zeeqs.data.repository.MessageCorrelationRepository +import io.zeebe.zeeqs.data.repository.MessageSubscriptionRepository +import io.zeebe.zeeqs.data.repository.ProcessRepository +import io.zeebe.zeeqs.data.repository.TimerRepository import org.camunda.community.zeebe.play.connectors.ConnectorService +import org.camunda.community.zeebe.play.services.ZeebeClockService +import org.springframework.data.repository.findByIdOrNull import org.springframework.web.bind.annotation.* +import java.io.ByteArrayInputStream +import java.time.Duration +import java.time.Instant +import java.util.concurrent.Callable +import java.util.concurrent.Executors + +private val RETRY_INTERVAL = Duration.ofMillis(100) @RestController @RequestMapping("/rest/processes") class ProcessesResource( private val zeebeClient: ZeebeClient, - private val connectorService: ConnectorService + private val connectorService: ConnectorService, + private val processRepository: ProcessRepository, + private val messageSubscriptionRepository: MessageSubscriptionRepository, + private val messageCorrelationRepository: MessageCorrelationRepository, + private val timerRepository: TimerRepository, + private val clockService: ZeebeClockService ) { + private val executor = Executors.newSingleThreadExecutor() + @RequestMapping(path = ["/{processKey}"], method = [RequestMethod.POST]) fun createInstance( @PathVariable("processKey") processKey: Long, @RequestBody variables: String ): Long { + // hack for the web-modeler to deal with not none start events + val process = processRepository.findByIdOrNull(processKey) + ?.let { Bpmn.readModelFromStream(ByteArrayInputStream(it.bpmnXML.toByteArray())) } + ?: throw RuntimeException("No process found with key '$processKey'") + + val startEvents = process.getModelElementsByType(StartEvent::class.java) + .filter { it.scope is Process } + + if (startEvents.isEmpty()) { + throw RuntimeException("No start event found.") + } + + val noneStartEvent = startEvents.find { it.eventDefinitions.isEmpty() } + + return noneStartEvent?.let { + createProcessInstanceWithNoneStartEvent(processKey, variables) + } + ?: if (startEvents.size > 1) { + throw RuntimeException("More than one start event found but none of them is a none start event.") + } else { + return createProcessInstanceWithStartEvent( + processKey = processKey, + startEvent = startEvents.first(), + variables = variables + ) + } + } - return zeebeClient.newCreateInstanceCommand() + private fun createProcessInstanceWithNoneStartEvent(processKey: Long, variables: String) = + zeebeClient.newCreateInstanceCommand() .processDefinitionKey(processKey) .variables(variables) .send() .join() .processInstanceKey + + private fun createProcessInstanceWithStartEvent( + processKey: Long, + startEvent: StartEvent, + variables: String + ): Long { + + if (startEvent.eventDefinitions.any { it is MessageEventDefinition }) { + return createProcessInstanceWithMessageStartEvent(processKey, variables) + } else if (startEvent.eventDefinitions.any { it is TimerEventDefinition }) { + return createProcessInstanceWithTimerStartEvent(processKey) + } else { + val type = startEvent.eventDefinitions.first().elementType.typeName + throw RuntimeException("Can't start process instance with start event of type '$type'") + } + } + + private fun createProcessInstanceWithMessageStartEvent( + processKey: Long, + variables: String + ): Long { + val messageSubscription = messageSubscriptionRepository + .findByProcessDefinitionKeyAndElementInstanceKeyIsNull(processKey) + .firstOrNull() + ?: throw RuntimeException("No message subscription found for process '$processKey'") + + val messageKey = zeebeClient.newPublishMessageCommand() + .messageName(messageSubscription.messageName) + .correlationKey("") + .variables(variables) + .timeToLive(Duration.ZERO) + .send() + .join() + .messageKey + + return executor.submit(Callable { + getProcessInstanceKeyForMessage( + processKey = processKey, + messageKey = messageKey + ) + }).get() + } + + private fun getProcessInstanceKeyForMessage(processKey: Long, messageKey: Long): Long { + var processInstanceKey = -1L + while (processInstanceKey < 0) { + processInstanceKey = + messageCorrelationRepository.findByMessageKey(messageKey = messageKey) + .firstOrNull { it.processDefinitionKey == processKey } + ?.processInstanceKey + ?: run { + // wait and retry + Thread.sleep(RETRY_INTERVAL.toMillis()) + -1L + } + } + return processInstanceKey + } + + private fun createProcessInstanceWithTimerStartEvent(processKey: Long): Long { + val timer = + (timerRepository.findByProcessDefinitionKeyAndElementInstanceKeyIsNull(processKey) + .firstOrNull { it.state == TimerState.CREATED } + ?: throw RuntimeException("No timer found for process '$processKey'")) + + val currentTime = clockService.getCurrentTime() + val duration = Duration.between(currentTime, Instant.ofEpochMilli(timer.dueDate)) + clockService.increaseTime(duration) + + return executor.submit(Callable { + getProcessInstanceForTimer( + timerKey = timer.key + ) + }).get() + } + + private fun getProcessInstanceForTimer(timerKey: Long): Long { + var processInstanceKey = -1L + while (processInstanceKey < 0) { + processInstanceKey = + timerRepository.findByIdOrNull(timerKey) + ?.takeIf { it.state == TimerState.TRIGGERED } + ?.processInstanceKey + ?: run { + // wait and retry + Thread.sleep(RETRY_INTERVAL.toMillis()) + -1L + } + } + return processInstanceKey } @RequestMapping(