diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Decision.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Decision.kt new file mode 100644 index 00000000..09319465 --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/Decision.kt @@ -0,0 +1,15 @@ +package io.zeebe.zeeqs.data.entity + +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id + +@Entity +data class Decision( + @Id @Column(name = "key_") val key: Long, + val decisionId: String, + val decisionName: String, + val version: Int, + val decisionRequirementsKey: Long, + val decisionRequirementsId: String +) \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/DecisionRequirements.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/DecisionRequirements.kt new file mode 100644 index 00000000..a884ecdc --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/entity/DecisionRequirements.kt @@ -0,0 +1,19 @@ +package io.zeebe.zeeqs.data.entity + +import javax.persistence.Column +import javax.persistence.Entity +import javax.persistence.Id +import javax.persistence.Lob + +@Entity +data class DecisionRequirements( + @Id @Column(name = "key_") val key: Long, + val decisionRequirementsId: String, + val decisionRequirementsName: String, + val version: Int, + val namespace: String, + @Lob val dmnXML: String, + val deployTime: Long, + val resourceName: String, + @Lob val checksum: String +) \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt index a0e89bd6..f78f2d81 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesPublisher.kt @@ -8,6 +8,7 @@ import java.util.function.Consumer class DataUpdatesPublisher { private val processListeners = mutableListOf>() + private val decisionListeners = mutableListOf>() private val processInstanceListeners = mutableListOf>() private val elementInstanceListeners = mutableListOf>() private val variableListeners = mutableListOf>() @@ -18,6 +19,10 @@ class DataUpdatesPublisher { processListeners.forEach { it.accept(process) } } + fun onDecisionUpdated(decision: Decision) { + decisionListeners.forEach { it.accept(decision) } + } + fun onProcessInstanceUpdated(processInstance: ProcessInstance) { processInstanceListeners.forEach { it.accept(processInstance) } } @@ -42,6 +47,10 @@ class DataUpdatesPublisher { processListeners.add(listener) } + fun registerDecisionListener(listener: Consumer) { + decisionListeners.add(listener) + } + fun registerProcessInstanceListener(listener: Consumer) { processInstanceListeners.add(listener) } diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt index 04a84e2a..08c4dd6b 100644 --- a/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/reactive/DataUpdatesSubscription.kt @@ -1,5 +1,6 @@ package io.zeebe.zeeqs.data.reactive +import io.zeebe.zeeqs.data.entity.Decision import io.zeebe.zeeqs.data.entity.Process import org.springframework.stereotype.Component import reactor.core.publisher.Flux @@ -13,46 +14,62 @@ class DataUpdatesSubscription(private val publisher: DataUpdatesPublisher) { } } + fun decisionSubscription(): Flux { + return Flux.create { sink -> + publisher.registerDecisionListener { sink.next(it) } + } + } + fun processInstanceUpdateSubscription(): Flux { return Flux.create { sink -> publisher.registerProcessInstanceListener { - sink.next(ProcessInstanceUpdate( + sink.next( + ProcessInstanceUpdate( processInstanceKey = it.key, processKey = it.processDefinitionKey, updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE - )) + ) + ) } publisher.registerElementInstanceListener { - sink.next(ProcessInstanceUpdate( + sink.next( + ProcessInstanceUpdate( processInstanceKey = it.processInstanceKey, processKey = it.processDefinitionKey, updateType = ProcessInstanceUpdateType.ELEMENT_INSTANCE - )) + ) + ) } publisher.registerVariableListener { - sink.next(ProcessInstanceUpdate( + sink.next( + ProcessInstanceUpdate( processInstanceKey = it.processInstanceKey, processKey = it.processDefinitionKey, updateType = ProcessInstanceUpdateType.VARIABLE - )) + ) + ) } publisher.registerIncidentListener { - sink.next(ProcessInstanceUpdate( + sink.next( + ProcessInstanceUpdate( processInstanceKey = it.processInstanceKey, processKey = it.processDefinitionKey, updateType = ProcessInstanceUpdateType.INCIDENT - )) + ) + ) } publisher.registerJobListener { - sink.next(ProcessInstanceUpdate( + sink.next( + ProcessInstanceUpdate( processInstanceKey = it.processInstanceKey, processKey = it.processDefinitionKey, updateType = ProcessInstanceUpdateType.JOB - )) + ) + ) } } } diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRepository.kt new file mode 100644 index 00000000..d22ffbde --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRepository.kt @@ -0,0 +1,11 @@ +package io.zeebe.zeeqs.data.repository + +import io.zeebe.zeeqs.data.entity.Decision +import org.springframework.data.repository.PagingAndSortingRepository +import org.springframework.stereotype.Repository + +@Repository +interface DecisionRepository : PagingAndSortingRepository { + + fun findAllByDecisionRequirementsKey(decisionRequirementsKey: Long): List +} \ No newline at end of file diff --git a/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRequirementsRepository.kt b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRequirementsRepository.kt new file mode 100644 index 00000000..7a43cfeb --- /dev/null +++ b/data/src/main/kotlin/io/zeebe/zeeqs/data/repository/DecisionRequirementsRepository.kt @@ -0,0 +1,8 @@ +package io.zeebe.zeeqs.data.repository + +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import org.springframework.data.repository.PagingAndSortingRepository +import org.springframework.stereotype.Repository + +@Repository +interface DecisionRequirementsRepository : PagingAndSortingRepository \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnection.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnection.kt new file mode 100644 index 00000000..cc1d0eee --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnection.kt @@ -0,0 +1,8 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.Decision + +class DecisionConnection( + val getItems: () -> List, + val getCount: () -> Long +) \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnectionResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnectionResolver.kt new file mode 100644 index 00000000..655fbd74 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionConnectionResolver.kt @@ -0,0 +1,19 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.Decision +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionConnectionResolver { + + @SchemaMapping(typeName = "DecisionConnection", field = "nodes") + fun nodes(connection: DecisionConnection): List { + return connection.getItems() + } + + @SchemaMapping(typeName = "DecisionConnection", field = "totalCount") + fun totalCount(connection: DecisionConnection): Long { + return connection.getCount() + } +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnection.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnection.kt new file mode 100644 index 00000000..1168a7a1 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnection.kt @@ -0,0 +1,8 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.DecisionRequirements + +class DecisionRequirementsConnection( + val getItems: () -> List, + val getCount: () -> Long +) \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnectionResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnectionResolver.kt new file mode 100644 index 00000000..1d633109 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/connection/DecisionRequirementsConnectionResolver.kt @@ -0,0 +1,19 @@ +package io.zeebe.zeeqs.graphql.resolvers.connection + +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionRequirementsConnectionResolver { + + @SchemaMapping(typeName = "DecisionRequirementsConnection", field = "nodes") + fun nodes(connection: DecisionRequirementsConnection): List { + return connection.getItems() + } + + @SchemaMapping(typeName = "DecisionRequirementsConnection", field = "totalCount") + fun totalCount(connection: DecisionRequirementsConnection): Long { + return connection.getCount() + } +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionQueryResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionQueryResolver.kt new file mode 100644 index 00000000..1f6e2806 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionQueryResolver.kt @@ -0,0 +1,33 @@ +package io.zeebe.zeeqs.graphql.resolvers.query + +import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.repository.DecisionRepository +import io.zeebe.zeeqs.graphql.resolvers.connection.DecisionConnection +import org.springframework.data.domain.PageRequest +import org.springframework.data.repository.findByIdOrNull +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.QueryMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionQueryResolver( + private val decisionRepository: DecisionRepository +) { + + @QueryMapping + fun decisions( + @Argument perPage: Int, + @Argument page: Int + ): DecisionConnection { + return DecisionConnection( + getItems = { decisionRepository.findAll(PageRequest.of(page, perPage)).toList() }, + getCount = { decisionRepository.count() } + ) + } + + @QueryMapping + fun decision(@Argument key: Long): Decision? { + return decisionRepository.findByIdOrNull(key) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionRequirementsQueryResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionRequirementsQueryResolver.kt new file mode 100644 index 00000000..e1f42c37 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/query/DecisionRequirementsQueryResolver.kt @@ -0,0 +1,35 @@ +package io.zeebe.zeeqs.graphql.resolvers.query + +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import io.zeebe.zeeqs.data.repository.DecisionRequirementsRepository +import io.zeebe.zeeqs.graphql.resolvers.connection.DecisionRequirementsConnection +import org.springframework.data.domain.PageRequest +import org.springframework.data.repository.findByIdOrNull +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.QueryMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionRequirementsQueryResolver( + private val decisionRequirementsRepository: DecisionRequirementsRepository +) { + + @QueryMapping + fun decisionRequirements( + @Argument perPage: Int, + @Argument page: Int + ): DecisionRequirementsConnection { + return DecisionRequirementsConnection( + getItems = { + decisionRequirementsRepository.findAll(PageRequest.of(page, perPage)).toList() + }, + getCount = { decisionRequirementsRepository.count() } + ) + } + + @QueryMapping + fun decisionRequirement(@Argument key: Long): DecisionRequirements? { + return decisionRequirementsRepository.findByIdOrNull(key) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionSubscriptionMapping.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionSubscriptionMapping.kt new file mode 100644 index 00000000..49a229e8 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/subscription/DecisionSubscriptionMapping.kt @@ -0,0 +1,18 @@ +package io.zeebe.zeeqs.graphql.resolvers.subscription + +import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.reactive.DataUpdatesSubscription +import org.springframework.graphql.data.method.annotation.SubscriptionMapping +import org.springframework.stereotype.Controller +import reactor.core.publisher.Flux + +@Controller +class DecisionSubscriptionMapping( + private val subscription: DataUpdatesSubscription +) { + + @SubscriptionMapping + fun decisionUpdates(): Flux { + return subscription.decisionSubscription() + } +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionRequirementsResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionRequirementsResolver.kt new file mode 100644 index 00000000..ae673d8e --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionRequirementsResolver.kt @@ -0,0 +1,35 @@ +package io.zeebe.zeeqs.graphql.resolvers.type + +import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import io.zeebe.zeeqs.data.repository.DecisionRepository +import org.springframework.graphql.data.method.annotation.Argument +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionRequirementsResolver( + private val decisionRepository: DecisionRepository +) { + + @SchemaMapping(typeName = "DecisionRequirements", field = "deployTime") + fun deployTime( + decisionRequirements: DecisionRequirements, + @Argument zoneId: String + ): String? { + return decisionRequirements.deployTime.let { + ResolverExtension.timestampToString( + it, + zoneId + ) + } + } + + @SchemaMapping(typeName = "DecisionRequirements", field = "decisions") + fun decisions(decisionRequirements: DecisionRequirements): List { + return decisionRepository.findAllByDecisionRequirementsKey( + decisionRequirementsKey = decisionRequirements.key + ) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionResolver.kt b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionResolver.kt new file mode 100644 index 00000000..7a97adb3 --- /dev/null +++ b/graphql-api/src/main/kotlin/io/zeebe/zeeqs/graphql/resolvers/type/DecisionResolver.kt @@ -0,0 +1,20 @@ +package io.zeebe.zeeqs.graphql.resolvers.type + +import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import io.zeebe.zeeqs.data.repository.DecisionRequirementsRepository +import org.springframework.data.repository.findByIdOrNull +import org.springframework.graphql.data.method.annotation.SchemaMapping +import org.springframework.stereotype.Controller + +@Controller +class DecisionResolver( + private val decisionRequirementsRepository: DecisionRequirementsRepository +) { + + @SchemaMapping(typeName = "Decision", field = "decisionRequirements") + fun decisionRequirements(decision: Decision): DecisionRequirements? { + return decisionRequirementsRepository.findByIdOrNull(decision.decisionRequirementsKey) + } + +} \ No newline at end of file diff --git a/graphql-api/src/main/resources/graphql/Decision.graphqls b/graphql-api/src/main/resources/graphql/Decision.graphqls new file mode 100644 index 00000000..75a6f2bc --- /dev/null +++ b/graphql-api/src/main/resources/graphql/Decision.graphqls @@ -0,0 +1,32 @@ +# A deployed DMN decision. +type Decision { + # The unique key of the decision. + key: ID! + # The id of the decision (i.e. the technical name of the decision). + decisionId: String! + # The name of the decision (i.e. the descriptive name of the decision). + decisionName: String! + # The deployed version of the decision based on the decision id. + version: Int! + # The decision requirements graph that contains the decision. + decisionRequirements: DecisionRequirements +} + +type DecisionConnection { + totalCount: Int! + nodes: [Decision!]! +} + +extend type Query { + # Find the decision with the given key. + decision(key: ID!): Decision + # Find all decisions. + decisions( + perPage: Int = 10, + page: Int = 0,): DecisionConnection! +} + +extend type Subscription { + # Subscribe to updates of decisions (i.e. new decision deployed). + decisionUpdates: Decision! +} \ No newline at end of file diff --git a/graphql-api/src/main/resources/graphql/DecisionRequirements.graphqls b/graphql-api/src/main/resources/graphql/DecisionRequirements.graphqls new file mode 100644 index 00000000..76075e56 --- /dev/null +++ b/graphql-api/src/main/resources/graphql/DecisionRequirements.graphqls @@ -0,0 +1,33 @@ +# A deployed DMN decision requirements graph (DRG/DRD). +type DecisionRequirements { + # The unique key of the DRG. + key: ID! + # The id of the DRG (i.e. the technical name of the DRG). + decisionRequirementsId: String! + # The name of the DRG (i.e. the descriptive name of the DRG). + decisionRequirementsName: String! + # The deployed version of the DRG based on the id. + version: Int! + # The namespace of the DRG. + namespace: String! + # The time when the DRG was deployed. + deployTime(zoneId: String = "Z"): String! + # The DMN XML resource of the DRG. + dmnXML: String! + # The decisions that belong to the DRG (i.e. the containing decisions). + decisions: [Decision!] +} + +type DecisionRequirementsConnection { + totalCount: Int! + nodes: [DecisionRequirements!]! +} + +extend type Query { + # Find the DRG with the given key. + decisionRequirement(key: ID!): DecisionRequirements + # Find all DRGs. + decisionRequirements( + perPage: Int = 10, + page: Int = 0,): DecisionRequirementsConnection! +} \ No newline at end of file diff --git a/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlDecisionTest.kt b/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlDecisionTest.kt new file mode 100644 index 00000000..b3ae2783 --- /dev/null +++ b/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlDecisionTest.kt @@ -0,0 +1,129 @@ +package io.zeebe.zeeqs + +import io.zeebe.zeeqs.data.entity.Decision +import io.zeebe.zeeqs.data.entity.DecisionRequirements +import io.zeebe.zeeqs.data.repository.DecisionRepository +import io.zeebe.zeeqs.data.repository.DecisionRequirementsRepository +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.context.TestConfiguration +import org.springframework.graphql.test.tester.GraphQlTester +import java.time.Instant + + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@TestConfiguration +class ZeebeGraphqlDecisionTest( + @Autowired private val graphQlTester: GraphQlTester, + @Autowired private val decisionRepository: DecisionRepository, + @Autowired private val decisionRequirementsRepository: DecisionRequirementsRepository +) { + + private val decisionRequirementsKey = 10L + private val decisionKey = 20L + + @BeforeEach + fun `deploy decision`() { + decisionRequirementsRepository.save( + DecisionRequirements( + key = decisionRequirementsKey, + decisionRequirementsId = "Ratings", + decisionRequirementsName = "Rating example", + version = 1, + namespace = "namespace", + dmnXML = "", + deployTime = Instant.now().toEpochMilli(), + resourceName = "rating.dmn", + checksum = "checksum" + ) + ) + + decisionRepository.save( + Decision( + key = decisionKey, + decisionId = "decision_a", + decisionName = "Decision A", + version = 1, + decisionRequirementsKey = decisionRequirementsKey, + decisionRequirementsId = "Rating example" + ) + ) + } + + @Test + fun `should query decision`() { + // when/then + graphQlTester.document( + """ + { + decisions { + nodes { + key + decisionId + decisionRequirements { + key + } + } + } + } + """ + ) + .execute() + .path("decisions.nodes") + .matchesJson( + """ + [ + { + "key": "$decisionKey", + "decisionId": "decision_a", + "decisionRequirements": { + "key": "$decisionRequirementsKey" + } + } + ] + """ + ) + } + + @Test + fun `should query decision requirements`() { + // when/then + graphQlTester.document( + """ + { + decisionRequirements { + nodes { + key + decisionRequirementsId + decisions { + key + } + } + } + } + """ + ) + .execute() + .path("decisionRequirements.nodes") + .matchesJson( + """ + [ + { + "key": "$decisionRequirementsKey", + "decisionRequirementsId": "Ratings", + "decisions": [ + { "key": "$decisionKey" } + ] + } + ] + """ + ) + } + + @SpringBootApplication + class TestConfig + +} \ No newline at end of file diff --git a/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlSubscriptionTest.kt b/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlSubscriptionTest.kt index d42d201d..20567797 100644 --- a/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlSubscriptionTest.kt +++ b/graphql-api/src/test/kotlin/io/zeebe/zeeqs/ZeebeGraphqlSubscriptionTest.kt @@ -1,6 +1,7 @@ package io.zeebe.zeeqs import io.camunda.zeebe.model.bpmn.Bpmn +import io.zeebe.zeeqs.data.entity.Decision import io.zeebe.zeeqs.data.entity.Process import io.zeebe.zeeqs.data.entity.ProcessInstance import io.zeebe.zeeqs.data.reactive.DataUpdatesSubscription @@ -27,11 +28,13 @@ import java.util.* @SpringBootTest( - webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, - properties = ["spring.graphql.websocket.path=/graphql"]) + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = ["spring.graphql.websocket.path=/graphql"] +) @TestConfiguration class ZeebeGraphqlSubscriptionTest( - @LocalServerPort val port: Int) { + @LocalServerPort val port: Int +) { @MockBean private lateinit var dataUpdatesSubscription: DataUpdatesSubscription @@ -40,9 +43,9 @@ class ZeebeGraphqlSubscriptionTest( private lateinit var processInstanceRepository: ProcessInstanceRepository private val bpmnProcess = Bpmn.createExecutableProcess("process") - .startEvent("start") - .endEvent("end") - .done() + .startEvent("start") + .endEvent("end") + .done() fun graphqlTester(): GraphQlTester { val uri = URI("http://localhost:$port/graphql") @@ -55,15 +58,15 @@ class ZeebeGraphqlSubscriptionTest( fun `mock process instance repository`() { Mockito.`when`(processInstanceRepository.findByIdOrNull(Mockito.anyLong())).thenAnswer { Optional.of( - ProcessInstance( - key = it.arguments[0] as Long, - position = 1, - bpmnProcessId = "process", - version = 1, - processDefinitionKey = 10, - parentProcessInstanceKey = null, - parentElementInstanceKey = null - ) + ProcessInstance( + key = it.arguments[0] as Long, + position = 1, + bpmnProcessId = "process", + version = 1, + processDefinitionKey = 10, + parentProcessInstanceKey = null, + parentElementInstanceKey = null + ) ) } } @@ -72,66 +75,76 @@ class ZeebeGraphqlSubscriptionTest( fun `should subscribe to process updates`() { // given Mockito.`when`(dataUpdatesSubscription.processSubscription()) - .thenReturn( - Flux.just( - process(key = 10), - process(key = 20) - ) + .thenReturn( + Flux.just( + process(key = 10), + process(key = 20) ) + ) // when - val flux = graphqlTester().document(""" + val flux = graphqlTester().document( + """ subscription { processUpdates { key } } - """) - .executeSubscription() - .toFlux() + """ + ) + .executeSubscription() + .toFlux() // then StepVerifier.create(flux) - .consumeNextWith { - it.path("processUpdates").matchesJson(""" + .consumeNextWith { + it.path("processUpdates").matchesJson( + """ { "key": "10" } - """) - } - .consumeNextWith { - it.path("processUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processUpdates").matchesJson( + """ { "key": "20" } - """) - } - .verifyComplete(); + """ + ) + } + .verifyComplete(); } @Test fun `should subscribe to all process instance updates`() { // given Mockito.`when`(dataUpdatesSubscription.processInstanceUpdateSubscription()) - .thenReturn( - Flux.just( - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 2, - processInstanceKey = 20, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.VARIABLE) - ) + .thenReturn( + Flux.just( + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 2, + processInstanceKey = 20, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.VARIABLE + ) ) + ) // when - val flux = graphqlTester().document(""" + val flux = graphqlTester().document( + """ subscription { processInstanceUpdates { processInstance { @@ -140,68 +153,79 @@ class ZeebeGraphqlSubscriptionTest( updateType } } - """) - .executeSubscription() - .toFlux() + """ + ) + .executeSubscription() + .toFlux() // then StepVerifier.create(flux) - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "20" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "VARIABLE" } - """) - } - .verifyComplete(); + """ + ) + } + .verifyComplete(); } @Test fun `should subscribe to process instance updates of given process`() { // given Mockito.`when`(dataUpdatesSubscription.processInstanceUpdateSubscription()) - .thenReturn( - Flux.just( - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 2, - processInstanceKey = 20, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 11, - updateType = ProcessInstanceUpdateType.VARIABLE) - ) + .thenReturn( + Flux.just( + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 2, + processInstanceKey = 20, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 11, + updateType = ProcessInstanceUpdateType.VARIABLE + ) ) + ) // when - val flux = graphqlTester().document(""" + val flux = graphqlTester().document( + """ subscription { processInstanceUpdates(filter: {processKey: 1}) { processInstance { @@ -210,58 +234,67 @@ class ZeebeGraphqlSubscriptionTest( updateType } } - """) - .executeSubscription() - .toFlux() + """ + ) + .executeSubscription() + .toFlux() // then StepVerifier.create(flux) - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "11" }, "updateType": "VARIABLE" } - """) - } - .verifyComplete(); + """ + ) + } + .verifyComplete(); } @Test fun `should subscribe to process instance updates of given instance`() { // given Mockito.`when`(dataUpdatesSubscription.processInstanceUpdateSubscription()) - .thenReturn( - Flux.just( - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 2, - processInstanceKey = 20, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.VARIABLE) - ) + .thenReturn( + Flux.just( + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 2, + processInstanceKey = 20, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.VARIABLE + ) ) + ) // when - val flux = graphqlTester().document(""" + val flux = graphqlTester().document( + """ subscription { processInstanceUpdates(filter: {processInstanceKey: 10}) { processInstance { @@ -270,58 +303,67 @@ class ZeebeGraphqlSubscriptionTest( updateType } } - """) - .executeSubscription() - .toFlux() + """ + ) + .executeSubscription() + .toFlux() // then StepVerifier.create(flux) - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "VARIABLE" } - """) - } - .verifyComplete(); + """ + ) + } + .verifyComplete(); } @Test fun `should subscribe to process instance updates of given update type`() { // given Mockito.`when`(dataUpdatesSubscription.processInstanceUpdateSubscription()) - .thenReturn( - Flux.just( - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE), - ProcessInstanceUpdate( - processKey = 1, - processInstanceKey = 10, - updateType = ProcessInstanceUpdateType.VARIABLE), - ProcessInstanceUpdate( - processKey = 2, - processInstanceKey = 20, - updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE) - ) + .thenReturn( + Flux.just( + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ), + ProcessInstanceUpdate( + processKey = 1, + processInstanceKey = 10, + updateType = ProcessInstanceUpdateType.VARIABLE + ), + ProcessInstanceUpdate( + processKey = 2, + processInstanceKey = 20, + updateType = ProcessInstanceUpdateType.PROCESS_INSTANCE_STATE + ) ) + ) // when - val flux = graphqlTester().document(""" + val flux = graphqlTester().document( + """ subscription { processInstanceUpdates(filter: {updateTypeIn: [PROCESS_INSTANCE_STATE]}) { processInstance { @@ -330,44 +372,107 @@ class ZeebeGraphqlSubscriptionTest( updateType } } - """) - .executeSubscription() - .toFlux() + """ + ) + .executeSubscription() + .toFlux() // then StepVerifier.create(flux) - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "10" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .consumeNextWith { - it.path("processInstanceUpdates").matchesJson(""" + """ + ) + } + .consumeNextWith { + it.path("processInstanceUpdates").matchesJson( + """ { "processInstance": { "key": "20" }, "updateType": "PROCESS_INSTANCE_STATE" } - """) - } - .verifyComplete(); + """ + ) + } + .verifyComplete(); + } + + @Test + fun `should subscribe to decision updates`() { + // given + Mockito.`when`(dataUpdatesSubscription.decisionSubscription()) + .thenReturn( + Flux.just( + decision(key = 10), + decision(key = 20) + ) + ) + + // when + val flux = graphqlTester().document( + """ + subscription { + decisionUpdates { + key + } + } + """ + ) + .executeSubscription() + .toFlux() + + // then + StepVerifier.create(flux) + .consumeNextWith { + it.path("decisionUpdates").matchesJson( + """ + { + "key": "10" + } + """ + ) + } + .consumeNextWith { + it.path("decisionUpdates").matchesJson( + """ + { + "key": "20" + } + """ + ) + } + .verifyComplete(); } private fun process(key: Long): Process { return Process( - key = key, - bpmnProcessId = "process", - version = 1, - bpmnXML = Bpmn.convertToString(bpmnProcess), - deployTime = Instant.now().toEpochMilli(), - resourceName = "process.bpmn", - checksum = "checksum" + key = key, + bpmnProcessId = "process", + version = 1, + bpmnXML = Bpmn.convertToString(bpmnProcess), + deployTime = Instant.now().toEpochMilli(), + resourceName = "process.bpmn", + checksum = "checksum" + ) + } + + private fun decision(key: Long): Decision { + return Decision( + key = key, + decisionId = "decision-id", + decisionName = "decision-name", + version = 1, + decisionRequirementsKey = 10, + decisionRequirementsId = "decision-requirements-id" ) } diff --git a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt index 94c65d84..a324397d 100644 --- a/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt +++ b/hazelcast-importer/src/main/kotlin/io/zeebe/zeeqs/importer/hazelcast/HazelcastImporter.kt @@ -16,23 +16,25 @@ import java.time.Duration @Component class HazelcastImporter( - val hazelcastConfigRepository: HazelcastConfigRepository, - val processRepository: ProcessRepository, - val processInstanceRepository: ProcessInstanceRepository, - val elementInstanceRepository: ElementInstanceRepository, - val elementInstanceStateTransitionRepository: ElementInstanceStateTransitionRepository, - val variableRepository: VariableRepository, - val variableUpdateRepository: VariableUpdateRepository, - val jobRepository: JobRepository, - val userTaskRepository: UserTaskRepository, - val incidentRepository: IncidentRepository, - val timerRepository: TimerRepository, - val messageRepository: MessageRepository, - val messageVariableRepository: MessageVariableRepository, - val messageSubscriptionRepository: MessageSubscriptionRepository, - val messageCorrelationRepository: MessageCorrelationRepository, - val errorRepository: ErrorRepository, - private val dataUpdatesPublisher: DataUpdatesPublisher + val hazelcastConfigRepository: HazelcastConfigRepository, + val processRepository: ProcessRepository, + val processInstanceRepository: ProcessInstanceRepository, + val elementInstanceRepository: ElementInstanceRepository, + val elementInstanceStateTransitionRepository: ElementInstanceStateTransitionRepository, + val variableRepository: VariableRepository, + val variableUpdateRepository: VariableUpdateRepository, + val jobRepository: JobRepository, + val userTaskRepository: UserTaskRepository, + val incidentRepository: IncidentRepository, + val timerRepository: TimerRepository, + val messageRepository: MessageRepository, + val messageVariableRepository: MessageVariableRepository, + val messageSubscriptionRepository: MessageSubscriptionRepository, + val messageCorrelationRepository: MessageCorrelationRepository, + val errorRepository: ErrorRepository, + private val decisionRepository: DecisionRepository, + private val decisionRequirementsRepository: DecisionRequirementsRepository, + private val dataUpdatesPublisher: DataUpdatesPublisher ) { var zeebeHazelcast: ZeebeHazelcast? = null @@ -43,17 +45,17 @@ class HazelcastImporter( val hazelcastConnectionTimeout = Duration.parse(hazelcastProperties.connectionTimeout) val hazelcastRingbuffer = hazelcastProperties.ringbuffer val hazelcastConnectionInitialBackoff = - Duration.parse(hazelcastProperties.connectionInitialBackoff) + Duration.parse(hazelcastProperties.connectionInitialBackoff) val hazelcastConnectionBackoffMultiplier = hazelcastProperties.connectionBackoffMultiplier val hazelcastConnectionMaxBackoff = Duration.parse(hazelcastProperties.connectionMaxBackoff) val hazelcastConfig = hazelcastConfigRepository.findById(hazelcastConnection) - .orElse( - HazelcastConfig( - id = hazelcastConnection, - sequence = -1 - ) + .orElse( + HazelcastConfig( + id = hazelcastConnection, + sequence = -1 ) + ) val updateSequence: ((Long) -> Unit) = { hazelcastConfig.sequence = it @@ -68,49 +70,56 @@ class HazelcastImporter( connectionRetryConfig.clusterConnectTimeoutMillis = hazelcastConnectionTimeout.toMillis() // These retry configs can be user-configured in application.yml connectionRetryConfig.initialBackoffMillis = - hazelcastConnectionInitialBackoff.toMillis().toInt() + hazelcastConnectionInitialBackoff.toMillis().toInt() connectionRetryConfig.multiplier = hazelcastConnectionBackoffMultiplier connectionRetryConfig.maxBackoffMillis = hazelcastConnectionMaxBackoff.toMillis().toInt() val hazelcast = HazelcastClient.newHazelcastClient(clientConfig) val builder = ZeebeHazelcast.newBuilder(hazelcast).name(hazelcastRingbuffer) - .addProcessListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT }?.let(this::importProcess) - } - .addProcessInstanceListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importProcessInstanceRecord) - } - .addVariableListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importVariableRecord) - } - .addJobListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT }?.let(this::importJobRecord) - } - .addIncidentListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importIncidentRecord) - } - .addTimerListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importTimerRecord) - } - .addMessageListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importMessageRecord) - } - .addMessageStartEventSubscriptionListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importMessageStartEventSubscriptionRecord) - } - .addProcessMessageSubscriptionListener { - it.takeIf { it.metadata.recordType == RecordType.EVENT } - ?.let(this::importProcessMessageSubscriptionRecord) - } - .addErrorListener(this::importError) - .postProcessListener(updateSequence) + .addProcessListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT }?.let(this::importProcess) + } + .addProcessInstanceListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importProcessInstanceRecord) + } + .addVariableListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importVariableRecord) + } + .addJobListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT }?.let(this::importJobRecord) + } + .addIncidentListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importIncidentRecord) + } + .addTimerListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importTimerRecord) + } + .addMessageListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importMessageRecord) + } + .addMessageStartEventSubscriptionListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importMessageStartEventSubscriptionRecord) + } + .addProcessMessageSubscriptionListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importProcessMessageSubscriptionRecord) + } + .addDecisionListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT }?.let(this::importDecision) + } + .addDecisionRequirementsListener { + it.takeIf { it.metadata.recordType == RecordType.EVENT } + ?.let(this::importDecisionRequirements) + } + .addErrorListener(this::importError) + .postProcessListener(updateSequence) if (hazelcastConfig.sequence >= 0) { builder.readFrom(hazelcastConfig.sequence) @@ -126,12 +135,12 @@ class HazelcastImporter( } private fun getPartitionIdWithPosition(metadata: Schema.RecordMetadata) = - "${metadata.partitionId}-${metadata.position}" + "${metadata.partitionId}-${metadata.position}" private fun importProcess(process: Schema.ProcessRecord) { val entity = processRepository - .findById(process.processDefinitionKey) - .orElse(createProcess(process)) + .findById(process.processDefinitionKey) + .orElse(createProcess(process)) processRepository.save(entity) @@ -140,13 +149,13 @@ class HazelcastImporter( private fun createProcess(process: Schema.ProcessRecord): Process { return Process( - key = process.processDefinitionKey, - bpmnProcessId = process.bpmnProcessId, - version = process.version, - bpmnXML = process.resource.toStringUtf8(), - deployTime = process.metadata.timestamp, - resourceName = process.resourceName, - checksum = process.checksum.toStringUtf8() + key = process.processDefinitionKey, + bpmnProcessId = process.bpmnProcessId, + version = process.version, + bpmnXML = process.resource.toStringUtf8(), + deployTime = process.metadata.timestamp, + resourceName = process.resourceName, + checksum = process.checksum.toStringUtf8() ) } @@ -161,8 +170,8 @@ class HazelcastImporter( private fun importProcessInstance(record: Schema.ProcessInstanceRecord) { val entity = processInstanceRepository - .findById(record.processInstanceKey) - .orElse(createProcessInstance(record)) + .findById(record.processInstanceKey) + .orElse(createProcessInstance(record)) when (record.metadata.intent) { "ELEMENT_ACTIVATED" -> { @@ -188,20 +197,20 @@ class HazelcastImporter( private fun createProcessInstance(record: Schema.ProcessInstanceRecord): ProcessInstance { return ProcessInstance( - key = record.processInstanceKey, - position = record.metadata.position, - bpmnProcessId = record.bpmnProcessId, - version = record.version, - processDefinitionKey = record.processDefinitionKey, - parentProcessInstanceKey = record.parentProcessInstanceKey.takeIf { it > 0 }, - parentElementInstanceKey = record.parentElementInstanceKey.takeIf { it > 0 } + key = record.processInstanceKey, + position = record.metadata.position, + bpmnProcessId = record.bpmnProcessId, + version = record.version, + processDefinitionKey = record.processDefinitionKey, + parentProcessInstanceKey = record.parentProcessInstanceKey.takeIf { it > 0 }, + parentElementInstanceKey = record.parentElementInstanceKey.takeIf { it > 0 } ) } private fun importElementInstance(record: Schema.ProcessInstanceRecord) { val entity = elementInstanceRepository - .findById(record.metadata.key) - .orElse(createElementInstance(record)) + .findById(record.metadata.key) + .orElse(createElementInstance(record)) entity.state = getElementInstanceState(record) @@ -253,13 +262,13 @@ class HazelcastImporter( } return ElementInstance( - key = record.metadata.key, - position = record.metadata.position, - elementId = record.elementId, - bpmnElementType = bpmnElementType, - processInstanceKey = record.processInstanceKey, - processDefinitionKey = record.processDefinitionKey, - scopeKey = record.flowScopeKey.takeIf { it > 0 } + key = record.metadata.key, + position = record.metadata.position, + elementId = record.elementId, + bpmnElementType = bpmnElementType, + processInstanceKey = record.processInstanceKey, + processDefinitionKey = record.processDefinitionKey, + scopeKey = record.flowScopeKey.takeIf { it > 0 } ) } @@ -282,15 +291,15 @@ class HazelcastImporter( val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = elementInstanceStateTransitionRepository - .findById(partitionIdWithPosition) - .orElse( - ElementInstanceStateTransition( - partitionIdWithPosition = partitionIdWithPosition, - elementInstanceKey = record.metadata.key, - timestamp = record.metadata.timestamp, - state = state - ) + .findById(partitionIdWithPosition) + .orElse( + ElementInstanceStateTransition( + partitionIdWithPosition = partitionIdWithPosition, + elementInstanceKey = record.metadata.key, + timestamp = record.metadata.timestamp, + state = state ) + ) elementInstanceStateTransitionRepository.save(entity) } @@ -303,8 +312,8 @@ class HazelcastImporter( private fun importVariable(record: Schema.VariableRecord) { val entity = variableRepository - .findById(record.metadata.key) - .orElse(createVariable(record)) + .findById(record.metadata.key) + .orElse(createVariable(record)) entity.value = record.value entity.timestamp = record.metadata.timestamp @@ -315,14 +324,14 @@ class HazelcastImporter( private fun createVariable(record: Schema.VariableRecord): Variable { return Variable( - key = record.metadata.key, - position = record.metadata.position, - name = record.name, - value = record.value, - processInstanceKey = record.processInstanceKey, - processDefinitionKey = record.processDefinitionKey, - scopeKey = record.scopeKey, - timestamp = record.metadata.timestamp + key = record.metadata.key, + position = record.metadata.position, + name = record.name, + value = record.value, + processInstanceKey = record.processInstanceKey, + processDefinitionKey = record.processDefinitionKey, + scopeKey = record.scopeKey, + timestamp = record.metadata.timestamp ) } @@ -330,18 +339,18 @@ class HazelcastImporter( val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = variableUpdateRepository - .findById(partitionIdWithPosition) - .orElse( - VariableUpdate( - partitionIdWithPosition = partitionIdWithPosition, - variableKey = record.metadata.key, - name = record.name, - value = record.value, - processInstanceKey = record.processInstanceKey, - scopeKey = record.scopeKey, - timestamp = record.metadata.timestamp - ) + .findById(partitionIdWithPosition) + .orElse( + VariableUpdate( + partitionIdWithPosition = partitionIdWithPosition, + variableKey = record.metadata.key, + name = record.name, + value = record.value, + processInstanceKey = record.processInstanceKey, + scopeKey = record.scopeKey, + timestamp = record.metadata.timestamp ) + ) variableUpdateRepository.save(entity) } @@ -354,12 +363,13 @@ class HazelcastImporter( } } - private fun isJobForUserTask(record: Schema.JobRecord) = record.type == Protocol.USER_TASK_JOB_TYPE + private fun isJobForUserTask(record: Schema.JobRecord) = + record.type == Protocol.USER_TASK_JOB_TYPE private fun importJobForWorker(record: Schema.JobRecord) { val entity = jobRepository - .findById(record.metadata.key) - .orElse(createJob(record)) + .findById(record.metadata.key) + .orElse(createJob(record)) when (record.metadata.intent) { "CREATED" -> { @@ -395,19 +405,19 @@ class HazelcastImporter( private fun createJob(record: Schema.JobRecord): Job { return Job( - key = record.metadata.key, - position = record.metadata.position, - jobType = record.type, - processInstanceKey = record.processInstanceKey, - elementInstanceKey = record.elementInstanceKey, - processDefinitionKey = record.processDefinitionKey + key = record.metadata.key, + position = record.metadata.position, + jobType = record.type, + processInstanceKey = record.processInstanceKey, + elementInstanceKey = record.elementInstanceKey, + processDefinitionKey = record.processDefinitionKey ) } private fun importUserTask(record: Schema.JobRecord) { val entity = userTaskRepository - .findById(record.metadata.key) - .orElse(createUserTask(record)) + .findById(record.metadata.key) + .orElse(createUserTask(record)) when (record.metadata.intent) { "CREATED" -> { @@ -433,24 +443,25 @@ class HazelcastImporter( private fun createUserTask(record: Schema.JobRecord): UserTask { val customHeaders = record.customHeaders.fieldsMap val assignee = customHeaders[Protocol.USER_TASK_ASSIGNEE_HEADER_NAME]?.stringValue - val candidateGroups = customHeaders[Protocol.USER_TASK_CANDIDATE_GROUPS_HEADER_NAME]?.stringValue + val candidateGroups = + customHeaders[Protocol.USER_TASK_CANDIDATE_GROUPS_HEADER_NAME]?.stringValue val formKey = customHeaders[Protocol.USER_TASK_FORM_KEY_HEADER_NAME]?.stringValue return UserTask( - key = record.metadata.key, - position = record.metadata.position, - processInstanceKey = record.processInstanceKey, - processDefinitionKey = record.processDefinitionKey, - elementInstanceKey = record.elementInstanceKey, - assignee = assignee, - candidateGroups = candidateGroups, - formKey = formKey + key = record.metadata.key, + position = record.metadata.position, + processInstanceKey = record.processInstanceKey, + processDefinitionKey = record.processDefinitionKey, + elementInstanceKey = record.elementInstanceKey, + assignee = assignee, + candidateGroups = candidateGroups, + formKey = formKey ) } private fun importIncidentRecord(record: Schema.IncidentRecord) { val entity = incidentRepository - .findById(record.metadata.key) - .orElse(createIncident(record)) + .findById(record.metadata.key) + .orElse(createIncident(record)) when (record.metadata.intent) { "CREATED" -> { @@ -470,21 +481,21 @@ class HazelcastImporter( private fun createIncident(record: Schema.IncidentRecord): Incident { return Incident( - key = record.metadata.key, - position = record.metadata.position, - errorType = record.errorType, - errorMessage = record.errorMessage, - processInstanceKey = record.processInstanceKey, - processDefinitionKey = record.processDefinitionKey, - elementInstanceKey = record.elementInstanceKey, - jobKey = record.jobKey.takeIf { it > 0 } + key = record.metadata.key, + position = record.metadata.position, + errorType = record.errorType, + errorMessage = record.errorMessage, + processInstanceKey = record.processInstanceKey, + processDefinitionKey = record.processDefinitionKey, + elementInstanceKey = record.elementInstanceKey, + jobKey = record.jobKey.takeIf { it > 0 } ) } private fun importTimerRecord(record: Schema.TimerRecord) { val entity = timerRepository - .findById(record.metadata.key) - .orElse(createTimer(record)) + .findById(record.metadata.key) + .orElse(createTimer(record)) when (record.metadata.intent) { "CREATED" -> { @@ -511,14 +522,14 @@ class HazelcastImporter( private fun createTimer(record: Schema.TimerRecord): Timer { return Timer( - key = record.metadata.key, - position = record.metadata.position, - dueDate = record.dueDate, - repetitions = record.repetitions, - elementId = record.targetElementId, - processDefinitionKey = record.processDefinitionKey.takeIf { it > 0 }, - processInstanceKey = record.processInstanceKey.takeIf { it > 0 }, - elementInstanceKey = record.elementInstanceKey.takeIf { it > 0 } + key = record.metadata.key, + position = record.metadata.position, + dueDate = record.dueDate, + repetitions = record.repetitions, + elementId = record.targetElementId, + processDefinitionKey = record.processDefinitionKey.takeIf { it > 0 }, + processInstanceKey = record.processInstanceKey.takeIf { it > 0 }, + elementInstanceKey = record.elementInstanceKey.takeIf { it > 0 } ); } @@ -532,8 +543,8 @@ class HazelcastImporter( private fun importMessage(record: Schema.MessageRecord) { val entity = messageRepository - .findById(record.metadata.key) - .orElse(createMessage(record)) + .findById(record.metadata.key) + .orElse(createMessage(record)) when (record.metadata.intent) { "PUBLISHED" -> entity.state = MessageState.PUBLISHED @@ -547,12 +558,12 @@ class HazelcastImporter( private fun createMessage(record: Schema.MessageRecord): Message { return Message( - key = record.metadata.key, - position = record.metadata.position, - name = record.name, - correlationKey = record.correlationKey.takeIf { it.isNotEmpty() }, - messageId = record.messageId.takeIf { it.isNotEmpty() }, - timeToLive = record.timeToLive + key = record.metadata.key, + position = record.metadata.position, + name = record.name, + correlationKey = record.correlationKey.takeIf { it.isNotEmpty() }, + messageId = record.messageId.takeIf { it.isNotEmpty() }, + timeToLive = record.timeToLive ); } @@ -564,14 +575,16 @@ class HazelcastImporter( val id = messageKey.toString() + name val entity = messageVariableRepository - .findById(id) - .orElse(MessageVariable( - id = id, - name = name, - value = value, - messageKey = messageKey, - position = messagePosition - )) + .findById(id) + .orElse( + MessageVariable( + id = id, + name = name, + value = value, + messageKey = messageKey, + position = messagePosition + ) + ) messageVariableRepository.save(entity) } @@ -587,16 +600,23 @@ class HazelcastImporter( Value.KindCase.BOOL_VALUE -> value.boolValue.toString() Value.KindCase.NUMBER_VALUE -> value.numberValue.toString() Value.KindCase.STRING_VALUE -> "\"${value.stringValue}\"" - Value.KindCase.LIST_VALUE -> value.listValue.valuesList.map { valueToString(it) }.joinToString(separator = ",", prefix = "[", postfix = "]") - Value.KindCase.STRUCT_VALUE -> value.structValue.fieldsMap.map { (key, value) -> "\"$key\":" + valueToString(value) }.joinToString(separator = ",", prefix = "{", postfix = "}") + Value.KindCase.LIST_VALUE -> value.listValue.valuesList.map { valueToString(it) } + .joinToString(separator = ",", prefix = "[", postfix = "]") + + Value.KindCase.STRUCT_VALUE -> value.structValue.fieldsMap.map { (key, value) -> + "\"$key\":" + valueToString( + value + ) + }.joinToString(separator = ",", prefix = "{", postfix = "}") + else -> value.toString() } } private fun importMessageStartEventSubscriptionRecord(record: Schema.MessageStartEventSubscriptionRecord) { val entity = messageSubscriptionRepository - .findById(record.metadata.key) - .orElse(createMessageSubscription(record)) + .findById(record.metadata.key) + .orElse(createMessageSubscription(record)) when (record.metadata.intent) { "CREATED" -> entity.state = MessageSubscriptionState.CREATED @@ -615,21 +635,21 @@ class HazelcastImporter( private fun createMessageSubscription(record: Schema.MessageStartEventSubscriptionRecord): MessageSubscription { return MessageSubscription( - key = record.metadata.key, - position = record.metadata.position, - messageName = record.messageName, - processDefinitionKey = record.processDefinitionKey, - elementId = record.startEventId, - elementInstanceKey = null, - processInstanceKey = null, - messageCorrelationKey = null + key = record.metadata.key, + position = record.metadata.position, + messageName = record.messageName, + processDefinitionKey = record.processDefinitionKey, + elementId = record.startEventId, + elementInstanceKey = null, + processInstanceKey = null, + messageCorrelationKey = null ); } private fun importProcessMessageSubscriptionRecord(record: Schema.ProcessMessageSubscriptionRecord) { val entity = messageSubscriptionRepository - .findById(record.metadata.key) - .orElse(createMessageSubscription(record)) + .findById(record.metadata.key) + .orElse(createMessageSubscription(record)) when (record.metadata.intent) { "CREATING" -> entity.state = MessageSubscriptionState.CREATING @@ -652,14 +672,14 @@ class HazelcastImporter( private fun createMessageSubscription(record: Schema.ProcessMessageSubscriptionRecord): MessageSubscription { return MessageSubscription( - key = record.metadata.key, - position = record.metadata.position, - messageName = record.messageName, - messageCorrelationKey = record.correlationKey, - processInstanceKey = record.processInstanceKey, - elementInstanceKey = record.elementInstanceKey, - elementId = record.elementId, - processDefinitionKey = null + key = record.metadata.key, + position = record.metadata.position, + messageName = record.messageName, + messageCorrelationKey = record.correlationKey, + processInstanceKey = record.processInstanceKey, + elementInstanceKey = record.elementInstanceKey, + elementId = record.elementId, + processDefinitionKey = null ); } @@ -667,19 +687,19 @@ class HazelcastImporter( val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = messageCorrelationRepository - .findById(partitionIdWithPosition) - .orElse( - MessageCorrelation( - partitionIdWithPosition = partitionIdWithPosition, - messageKey = record.messageKey, - messageName = record.messageName, - elementInstanceKey = record.elementInstanceKey, - processInstanceKey = record.processInstanceKey, - elementId = record.elementId, - processDefinitionKey = null, - timestamp = record.metadata.timestamp - ) + .findById(partitionIdWithPosition) + .orElse( + MessageCorrelation( + partitionIdWithPosition = partitionIdWithPosition, + messageKey = record.messageKey, + messageName = record.messageName, + elementInstanceKey = record.elementInstanceKey, + processInstanceKey = record.processInstanceKey, + elementId = record.elementId, + processDefinitionKey = null, + timestamp = record.metadata.timestamp ) + ) messageCorrelationRepository.save(entity) } @@ -688,19 +708,19 @@ class HazelcastImporter( val partitionIdWithPosition = getPartitionIdWithPosition(record.metadata) val entity = messageCorrelationRepository - .findById(partitionIdWithPosition) - .orElse( - MessageCorrelation( - partitionIdWithPosition = partitionIdWithPosition, - messageKey = record.messageKey, - messageName = record.messageName, - elementInstanceKey = null, - processInstanceKey = record.processInstanceKey, - elementId = record.startEventId, - processDefinitionKey = record.processDefinitionKey, - timestamp = record.metadata.timestamp - ) + .findById(partitionIdWithPosition) + .orElse( + MessageCorrelation( + partitionIdWithPosition = partitionIdWithPosition, + messageKey = record.messageKey, + messageName = record.messageName, + elementInstanceKey = null, + processInstanceKey = record.processInstanceKey, + elementId = record.startEventId, + processDefinitionKey = record.processDefinitionKey, + timestamp = record.metadata.timestamp ) + ) messageCorrelationRepository.save(entity) } @@ -708,15 +728,59 @@ class HazelcastImporter( private fun importError(record: Schema.ErrorRecord) { val entity = errorRepository.findById(record.metadata.position) - .orElse(Error( - position = record.metadata.position, - errorEventPosition = record.errorEventPosition, - exceptionMessage = record.exceptionMessage, - stacktrace = record.stacktrace, - processInstanceKey = record.processInstanceKey.takeIf { it > 0 } - )) + .orElse(Error( + position = record.metadata.position, + errorEventPosition = record.errorEventPosition, + exceptionMessage = record.exceptionMessage, + stacktrace = record.stacktrace, + processInstanceKey = record.processInstanceKey.takeIf { it > 0 } + )) errorRepository.save(entity) } + private fun importDecision(decision: Schema.DecisionRecord) { + val entity = decisionRepository + .findById(decision.decisionKey) + .orElse(createDecision(decision)) + + decisionRepository.save(entity) + + dataUpdatesPublisher.onDecisionUpdated(entity) + } + + private fun createDecision(decision: Schema.DecisionRecord): Decision { + return Decision( + key = decision.decisionKey, + decisionId = decision.decisionId, + decisionName = decision.decisionName, + version = decision.version, + decisionRequirementsKey = decision.decisionRequirementsKey, + decisionRequirementsId = decision.decisionRequirementsId + ) + } + + private fun importDecisionRequirements(decisionRequirements: Schema.DecisionRequirementsRecord) { + val entity = decisionRequirementsRepository + .findById(decisionRequirements.decisionRequirementsMetadata.decisionRequirementsKey) + .orElse(createDecisionRequirements(decisionRequirements)) + + decisionRequirementsRepository.save(entity) + } + + private fun createDecisionRequirements(decisionRequirements: Schema.DecisionRequirementsRecord): DecisionRequirements { + val metadata = decisionRequirements.decisionRequirementsMetadata + return DecisionRequirements( + key = metadata.decisionRequirementsKey, + decisionRequirementsId = metadata.decisionRequirementsId, + decisionRequirementsName = metadata.decisionRequirementsName, + version = metadata.decisionRequirementsVersion, + namespace = metadata.namespace, + dmnXML = decisionRequirements.resource.toStringUtf8(), + deployTime = decisionRequirements.metadata.timestamp, + resourceName = metadata.resourceName, + checksum = metadata.checksum.toStringUtf8() + ) + } + } diff --git a/hazelcast-importer/src/test/kotlin/io/zeebe/zeeqs/HazelcastImporterDecisionTest.kt b/hazelcast-importer/src/test/kotlin/io/zeebe/zeeqs/HazelcastImporterDecisionTest.kt new file mode 100644 index 00000000..a535f8c3 --- /dev/null +++ b/hazelcast-importer/src/test/kotlin/io/zeebe/zeeqs/HazelcastImporterDecisionTest.kt @@ -0,0 +1,111 @@ +package io.zeebe.zeeqs + +import io.camunda.zeebe.client.ZeebeClient +import io.zeebe.containers.ZeebeContainer +import io.zeebe.zeeqs.data.repository.DecisionRepository +import io.zeebe.zeeqs.data.repository.DecisionRequirementsRepository +import io.zeebe.zeeqs.importer.hazelcast.HazelcastImporter +import io.zeebe.zeeqs.importer.hazelcast.HazelcastProperties +import org.assertj.core.api.Assertions.assertThat +import org.awaitility.kotlin.await +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.test.context.SpringBootTest +import org.testcontainers.junit.jupiter.Container +import org.testcontainers.junit.jupiter.Testcontainers +import javax.transaction.Transactional + +@SpringBootTest +@Testcontainers +@Transactional +class HazelcastImporterDecisionTest( + @Autowired val importer: HazelcastImporter, + @Autowired val decisionRepository: DecisionRepository, + @Autowired val decisionRequirementsRepository: DecisionRequirementsRepository +) { + + + private val hazelcastPort = 5701 + + private lateinit var zeebeClient: ZeebeClient + + @Container + var zeebe = ZeebeContainer(ZeebeTestcontainerUtil.ZEEBE_DOCKER_IMAGE) + .withAdditionalExposedPort(hazelcastPort) + + @BeforeEach + fun `start importer`() { + val port = zeebe.getMappedPort(hazelcastPort) + val hazelcastProperties = HazelcastProperties( + "localhost:$port", "PT10S", "zeebe" + ) + importer.start(hazelcastProperties) + } + + @BeforeEach + fun `create Zeebe client`() { + zeebeClient = ZeebeClient.newClientBuilder() + .gatewayAddress(zeebe.externalGatewayAddress) + .usePlaintext() + .build() + } + + @Test + fun `should import decision`() { + // when + zeebeClient.newDeployResourceCommand() + .addResourceFromClasspath("rating.dmn") + .send() + .join() + + // then + await.untilAsserted { assertThat(decisionRepository.findAll()).hasSize(2) } + + val decisionA = decisionRepository.findAll().first { it.decisionId == "decision_a" } + assertThat(decisionA).isNotNull + assertThat(decisionA.key).isPositive() + assertThat(decisionA.decisionId).isEqualTo("decision_a") + assertThat(decisionA.decisionName).isEqualTo("Decision A") + assertThat(decisionA.decisionRequirementsId).isEqualTo("Ratings") + assertThat(decisionA.version).isEqualTo(1) + assertThat(decisionA.decisionRequirementsKey).isPositive() + + val decisionB = decisionRepository.findAll().first { it.decisionId == "decision_b" } + assertThat(decisionB).isNotNull + assertThat(decisionB.key).isPositive() + assertThat(decisionB.decisionId).isEqualTo("decision_b") + assertThat(decisionB.decisionName).isEqualTo("Decision B") + assertThat(decisionB.decisionRequirementsId).isEqualTo("Ratings") + assertThat(decisionB.version).isEqualTo(1) + assertThat(decisionB.decisionRequirementsKey).isPositive() + } + + @Test + fun `should import decision requirements`() { + // when + zeebeClient.newDeployResourceCommand() + .addResourceFromClasspath("rating.dmn") + .send() + .join() + + // then + await.untilAsserted { assertThat(decisionRequirementsRepository.findAll()).hasSize(1) } + + val decisionRequirements = decisionRequirementsRepository.findAll().first() + assertThat(decisionRequirements.key).isPositive() + assertThat(decisionRequirements.decisionRequirementsId).isEqualTo("Ratings") + assertThat(decisionRequirements.decisionRequirementsName).isEqualTo("DRD") + assertThat(decisionRequirements.namespace).isEqualTo("http://camunda.org/schema/1.0/dmn") + assertThat(decisionRequirements.version).isEqualTo(1) + assertThat(decisionRequirements.deployTime).isPositive() + assertThat(decisionRequirements.resourceName).isEqualTo("rating.dmn") + assertThat(decisionRequirements.dmnXML).isNotEmpty() + assertThat(decisionRequirements.checksum).isNotEmpty() + } + + @SpringBootApplication + class TestConfiguration +} + diff --git a/hazelcast-importer/src/test/resources/rating.dmn b/hazelcast-importer/src/test/resources/rating.dmn new file mode 100644 index 00000000..d35cc481 --- /dev/null +++ b/hazelcast-importer/src/test/resources/rating.dmn @@ -0,0 +1,89 @@ + + + + + + + + + + decision_b + + + + + + "high" + + + "A++" + + + + + "mid" + + + "A+" + + + + + "low" + + + "A" + + + + + + + + + x + + + + + + > 10 + + + "high" + + + + + > 5 + + + "mid" + + + + + + + + "low" + + + + + + + + + + + + + + + + + + + +