Skip to content

Commit

Permalink
fix(importer): duplicate check for element instances and variables (#285
Browse files Browse the repository at this point in the history
)

* fix the duplicate check for element instances and variables
* use a combined primary key of the partition id and the position
* the position itself is not unique across multiple partitions
  • Loading branch information
nitram509 authored Aug 30, 2021
1 parent a246ecc commit 3123b08
Show file tree
Hide file tree
Showing 16 changed files with 418 additions and 40 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ The application imports the data from Zeebe using the [Hazelcast exporter](https

![how-it-works](docs/how-it-works.png)

## Install
## Upgrading from prior version

See [upgrade instructions](./UPGRADE.md)

## Fresh install

### Docker

Expand Down
36 changes: 36 additions & 0 deletions UPGRADE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

Upgrade documentation for Zeebe Simple Monitor
==================================================

## Upgrading from v2.0.0

Due to some issues with storing variables and element instances, the database structure changed.
Zeebe Simple Monitor will not alter existing database tables automatically, but if you have a PostgreSQL
or other DB running, you need to alter the table structures manually, in order to keep your data.

Of course, if you use an in-memory DB or do not need to keep prior data, then simply drop all tables and sequences,
and let Zeebe Simple Monitor create them again for you (automatic creation works).

### Upgrade procedure

1. stop Zeebe Simple Monitor (v2.0.0)
2. run the SQL script below against your PostgreSQL Database
3. start up Zeebe Simple Monitor (new version)

```sql
-- part 1, element_instance table changes
ALTER TABLE element_instance ADD COLUMN ID varchar(255);
UPDATE element_instance SET id = (partition_id_::varchar || '-' || position_::varchar) where true;
ALTER TABLE element_instance DROP CONSTRAINT element_instance_pkey;
ALTER TABLE element_instance ADD PRIMARY KEY (id);
CREATE INDEX element_instance_processInstanceKeyIndex ON element_instance (process_instance_key_);
-- part 2, variable table changes
ALTER TABLE variable ADD COLUMN ID varchar(255);
ALTER TABLE variable ADD COLUMN PARTITION_ID_ integer DEFAULT 1;
UPDATE variable SET id = (partition_id_::varchar || '-' || position_::varchar) where true;
ALTER TABLE variable DROP CONSTRAINT variable_pkey;
ALTER TABLE variable ADD PRIMARY KEY (id);
CREATE INDEX variable_processInstanceKeyIndex ON variable (process_instance_key_);
```
(This SQL was developed and tested using a recent PostgreSQL instance.
You might need to adopt that if you're using another Database)
29 changes: 26 additions & 3 deletions src/main/java/io/zeebe/monitor/entity/ElementInstanceEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,19 @@
*/
package io.zeebe.monitor.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.*;

@Entity(name = "ELEMENT_INSTANCE")
@Table(indexes = {
// performance reason, because we use it in the ElementInstanceRepository.findByProcessInstanceKey()
@Index(name = "element_instance_processInstanceKeyIndex", columnList = "PROCESS_INSTANCE_KEY_"),
})
public class ElementInstanceEntity {

@Id
@Column(name = "ID")
private String id;

@Column(name = "POSITION_")
private Long position;

Expand Down Expand Up @@ -53,6 +58,24 @@ public class ElementInstanceEntity {
@Column(name = "TIMESTAMP_")
private long timestamp;

public String getId() {
return id;
}

private void setId(final String id) {
// made private, to avoid accidental changes
this.id = id;
}

public final String getGeneratedIdentifier() {
return this.partitionId + "-" + this.position;
}

@PrePersist
private void prePersistDeriveIdField() {
setId(getGeneratedIdentifier());
}

public long getKey() {
return key;
}
Expand Down
41 changes: 37 additions & 4 deletions src/main/java/io/zeebe/monitor/entity/VariableEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,25 @@
*/
package io.zeebe.monitor.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.*;

@Entity(name = "VARIABLE")
@Table(indexes = {
// performance reason, because we use it in the VariableRepository.findByProcessInstanceKey()
@Index(name = "variable_processInstanceKeyIndex", columnList = "PROCESS_INSTANCE_KEY_"),
})
public class VariableEntity {

@Id
@Column(name = "ID")
private String id;

@Column(name = "POSITION_")
private Long position;

@Column(name = "PARTITION_ID_")
private int partitionId;

@Column(name = "NAME_")
private String name;

Expand All @@ -46,6 +53,24 @@ public class VariableEntity {
@Column(name = "TIMESTAMP_")
private long timestamp;

public String getId() {
return id;
}

private void setId(final String id) {
// made private, to avoid accidental changes
this.id = id;
}

public final String getGeneratedIdentifier() {
return this.partitionId + "-" + this.position;
}

@PrePersist
private void prePersistDeriveIdField(){
setId(getGeneratedIdentifier());
}

public String getState() {
return state;
}
Expand Down Expand Up @@ -101,4 +126,12 @@ public Long getPosition() {
public void setPosition(final Long position) {
this.position = position;
}

public int getPartitionId() {
return partitionId;
}

public void setPartitionId(final int partitionId) {
this.partitionId = partitionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.zeebe.monitor.entity.ElementInstanceEntity;
import org.springframework.data.repository.CrudRepository;

public interface ElementInstanceRepository extends CrudRepository<ElementInstanceEntity, Long> {
public interface ElementInstanceRepository extends CrudRepository<ElementInstanceEntity, String> {

Iterable<ElementInstanceEntity> findByProcessInstanceKey(long processInstanceKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import java.util.List;

public interface VariableRepository extends PagingAndSortingRepository<VariableEntity, Long> {
public interface VariableRepository extends PagingAndSortingRepository<VariableEntity, String> {

List<VariableEntity> findByProcessInstanceKey(long processInstanceKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
@Component
public class ProcessAndElementImporter {

@Autowired private ProcessRepository processRepository;
@Autowired private ProcessInstanceRepository processInstanceRepository;
@Autowired private ElementInstanceRepository elementInstanceRepository;
@Autowired
private ProcessRepository processRepository;
@Autowired
private ProcessInstanceRepository processInstanceRepository;
@Autowired
private ElementInstanceRepository elementInstanceRepository;

@Autowired private ZeebeNotificationService notificationService;
@Autowired
private ZeebeNotificationService notificationService;

public void importProcess(final Schema.ProcessRecord record) {
final int partitionId = record.getMetadata().getPartitionId();
Expand All @@ -44,7 +48,6 @@ public void importProcessInstance(final Schema.ProcessInstanceRecord record) {
if (record.getProcessInstanceKey() == record.getMetadata().getKey()) {
addOrUpdateProcessInstance(record);
}

addElementInstance(record);
}

Expand Down Expand Up @@ -97,13 +100,10 @@ private void addOrUpdateProcessInstance(final Schema.ProcessInstanceRecord recor
}

private void addElementInstance(final Schema.ProcessInstanceRecord record) {

final long position = record.getMetadata().getPosition();
if (!elementInstanceRepository.existsById(position)) {

final ElementInstanceEntity entity = new ElementInstanceEntity();
entity.setPosition(position);
entity.setPartitionId(record.getMetadata().getPartitionId());
final ElementInstanceEntity entity = new ElementInstanceEntity();
entity.setPartitionId(record.getMetadata().getPartitionId());
entity.setPosition(record.getMetadata().getPosition());
if (!elementInstanceRepository.existsById(entity.getGeneratedIdentifier())) {
entity.setKey(record.getMetadata().getKey());
entity.setIntent(record.getMetadata().getIntent());
entity.setTimestamp(record.getMetadata().getTimestamp());
Expand All @@ -112,11 +112,8 @@ private void addElementInstance(final Schema.ProcessInstanceRecord record) {
entity.setFlowScopeKey(record.getFlowScopeKey());
entity.setProcessDefinitionKey(record.getProcessDefinitionKey());
entity.setBpmnElementType(record.getBpmnElementType());

elementInstanceRepository.save(entity);

notificationService.sendProcessInstanceUpdated(
record.getProcessInstanceKey(), record.getProcessDefinitionKey());
notificationService.sendProcessInstanceUpdated(record.getProcessInstanceKey(), record.getProcessDefinitionKey());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@
@Component
public class VariableImporter {

@Autowired private VariableRepository variableRepository;
@Autowired
private VariableRepository variableRepository;

public void importVariable(final Schema.VariableRecord record) {

final long position = record.getMetadata().getPosition();
if (!variableRepository.existsById(position)) {

final VariableEntity entity = new VariableEntity();
entity.setPosition(position);
entity.setTimestamp(record.getMetadata().getTimestamp());
entity.setProcessInstanceKey(record.getProcessInstanceKey());
entity.setName(record.getName());
entity.setValue(record.getValue());
entity.setScopeKey(record.getScopeKey());
entity.setState(record.getMetadata().getIntent().toLowerCase());
variableRepository.save(entity);
final VariableEntity newVariable = new VariableEntity();
newVariable.setPosition(record.getMetadata().getPosition());
newVariable.setPartitionId(record.getMetadata().getPartitionId());
if (!variableRepository.existsById(newVariable.getGeneratedIdentifier())) {
newVariable.setTimestamp(record.getMetadata().getTimestamp());
newVariable.setProcessInstanceKey(record.getProcessInstanceKey());
newVariable.setName(record.getName());
newVariable.setValue(record.getValue());
newVariable.setScopeKey(record.getScopeKey());
newVariable.setState(record.getMetadata().getIntent().toLowerCase());
variableRepository.save(newVariable);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.zeebe.monitor.repository;

import io.zeebe.monitor.entity.ElementInstanceEntity;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

public class ElementInstanceRepositoryTest extends ZeebeRepositoryTest {

@Autowired
private ElementInstanceRepository elementInstanceRepository;

@Test
public void JPA_will_automatically_update_the_ID_attribute() {
// given
ElementInstanceEntity elementInstance = createElementInstance();

// when
elementInstanceRepository.save(elementInstance);

// then
assertThat(elementInstance.getId()).isEqualTo("123-456");
}

@Test
public void variable_can_be_retrieved_by_transient_ID() {
// given
ElementInstanceEntity elementInstance = createElementInstance();

// when
elementInstanceRepository.save(elementInstance);

// then
Optional<ElementInstanceEntity> entity = elementInstanceRepository.findById("123-456");
assertThat(entity).isPresent();
}

private ElementInstanceEntity createElementInstance() {
ElementInstanceEntity elementInstance = new ElementInstanceEntity();
elementInstance.setPartitionId(123);
elementInstance.setPosition(456L);
return elementInstance;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package io.zeebe.monitor.repository;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
@EnableJpaRepositories(basePackages = "io.zeebe.monitor.repository")
@EnableTransactionManagement
public class TestContextJpaConfiguration {

@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("jdbc:h2:mem:zeebe-monitor-test;DB_CLOSE_DELAY=-1");
dataSource.setUsername("sa");
dataSource.setPassword("");
return dataSource;
}

@Bean
public LocalContainerEntityManagerFactoryBean entityManagerFactory(@Autowired DataSource dataSource) {
LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
em.setDataSource(dataSource);
em.setPackagesToScan("io.zeebe.monitor.entity");

JpaVendorAdapter vendorAdapter = new HibernateJpaVendorAdapter();
em.setJpaVendorAdapter(vendorAdapter);
em.setJpaProperties(getAdditionalJpaProperties());

return em;
}

@Bean
public PlatformTransactionManager transactionManager(@Autowired LocalContainerEntityManagerFactoryBean entityManagerFactory) {
JpaTransactionManager transactionManager = new JpaTransactionManager();
transactionManager.setEntityManagerFactory(entityManagerFactory.getObject());
return transactionManager;
}

private Properties getAdditionalJpaProperties() {
Properties p = new Properties();
p.setProperty("database-platform", "org.hibernate.dialect.H2Dialect");
p.setProperty("hibernate.hbm2ddl.auto", "update");
return p;
}

}
Loading

0 comments on commit 3123b08

Please sign in to comment.