Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
manan164 committed Aug 2, 2023
2 parents e6ef05f + d0a5883 commit 3ae3bc2
Show file tree
Hide file tree
Showing 54 changed files with 958 additions and 192 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ lib/
build/
*/build/

# asdf version file
.tool-versions

1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ Changes to configurations:
| workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true |
| workflow.events.default.queue.type | conductor.default-event-queue.type | sqs |
| workflow.status.listener.type | conductor.workflow-status-listener.type | stub |
| - | conductor.task-status-listener.type | stub |
| workflow.decider.locking.server | conductor.workflow-execution-lock.type | noop_lock |
| | | |
| workflow.default.event.queue.enabled | conductor.event-queues.default.enabled | true |
Expand Down
34 changes: 23 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
![Conductor](docs/docs/img/logo.png)

# Conductor
Conductor is a platform created by Netflix to orchestrate workflows that span across microservices.

[![Github release](https://img.shields.io/github/v/release/Netflix/conductor.svg)](https://GitHub.com/Netflix/conductor/releases)
[![CI](https://github.com/Netflix/conductor/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/Netflix/conductor/actions/workflows/ci.yml)
[![License](https://img.shields.io/github/license/Netflix/conductor.svg)](http://www.apache.org/licenses/LICENSE-2.0)
[![NetflixOSS Lifecycle](https://img.shields.io/osslifecycle/Netflix/conductor.svg)]()

## Workflow Creation in Code
Conductor supports creating workflows using JSON and Code.
SDK support for creating workflows using code is available in multiple languages and can be found at https://github.com/conductor-sdk
[![GitHub stars](https://img.shields.io/github/stars/Netflix/conductor.svg?style=social&label=Star&maxAge=2592000)](https://GitHub.com/Netflix/conductor/stargazers/)
[![GitHub forks](https://img.shields.io/github/forks/Netflix/conductor.svg?style=social&label=Fork&maxAge=2592000)](https://GitHub.com/Netflix/conductor/network/)


Conductor is a platform created by Netflix to orchestrate workflows that span across microservices.
Conductor is maintained by Media Workflow Infrastructure team at Netflix.

For more information, see [Main Documentation Site](https://conductor.netflix.com/)

## Documentation
[Main Documentation Site](https://conductor.netflix.com/)

## Releases
The latest version is [![Github release](https://img.shields.io/github/v/release/Netflix/conductor.svg)](https://GitHub.com/Netflix/conductor/releases)

[2.31.8](https://github.com/Netflix/conductor/releases/tag/v2.31.8) is the **final** release of `2.31` branch. As of Feb 2022, `1.x` & `2.x` versions are no longer supported.

## Resources
#### [Slack Community](https://join.slack.com/t/orkes-conductor/shared_invite/zt-xyxqyseb-YZ3hwwAgHJH97bsrYRnSZg)
We have an active [community](https://join.slack.com/t/orkes-conductor/shared_invite/zt-xyxqyseb-YZ3hwwAgHJH97bsrYRnSZg) of Conductor users and contributors on the channel.
#### [Documentation Site](https://conductor.netflix.com/)
[Documentation](https://conductor.netflix.com/) and tutorial on how to use Conductor

## Workflow Creation in Code
Conductor supports creating workflows using JSON and Code.
SDK support for creating workflows using code is available in multiple languages and can be found at https://github.com/conductor-sdk

## Community Contributions
The modules contributed by the community are housed at [conductor-community](https://github.com/Netflix/conductor-community). Compatible versions of the community modules are released simultaneously with releases of the main modules.

Expand All @@ -34,7 +45,6 @@ The easiest way to get started is with Docker containers. Please follow the inst
### From Source:
Conductor Server is a [Spring Boot](https://spring.io/projects/spring-boot) project and follows all applicable conventions. See instructions [here](http://conductor.netflix.com/gettingstarted/source.html).


## Published Artifacts
Binaries are available from [Netflix OSS Maven](https://artifacts.netflix.net/netflixoss/com/netflix/conductor/) repository, or the [Maven Central Repository](https://search.maven.org/search?q=g:com.netflix.conductor).

Expand All @@ -49,6 +59,7 @@ Binaries are available from [Netflix OSS Maven](https://artifacts.netflix.net/ne
| conductor-ui | node.js based UI for Conductor |
| conductor-client | Java client for Conductor that includes helpers for running worker tasks |
| conductor-client-spring | Client starter kit for Spring |
| conductor-java-sdk | SDK for writing workflows in code |
| conductor-server | Spring Boot Web Application |
| conductor-redis-lock | Workflow execution lock implementation using Redis |
| conductor-awss3-storage | External payload storage implementation using AWS S3 |
Expand All @@ -62,16 +73,17 @@ Binaries are available from [Netflix OSS Maven](https://artifacts.netflix.net/ne

## Database Requirements

* The default persistence used is [Dynomite](https://github.com/Netflix/dynomite)
* For queues, we are relying on [dyno-queues](https://github.com/Netflix/dyno-queues)
* The default persistence used is Redis
* The indexing backend is [Elasticsearch](https://www.elastic.co/) (6.x)

## Other Requirements
* JDK 11+
* UI requires Node 14 to build. Earlier Node versions may work but is untested.

## Get Support
Conductor is maintained by Media Workflow Infrastructure team at Netflix. Use Github issue tracking for filing issues and the [Discussion Forum](https://github.com/Netflix/conductor/discussions) for any other questions, ideas or support requests.
There are several ways to get in touch with us:
* [Slack Community](https://join.slack.com/t/orkes-conductor/shared_invite/zt-xyxqyseb-YZ3hwwAgHJH97bsrYRnSZg)
* [GitHub Discussion Forum](https://github.com/Netflix/conductor/discussions)

## Contributions
Whether it is a small documentation correction, bug fix or a new feature, contributions are highly appreciated. We just ask you to follow standard OSS guidelines. The [Discussion Forum](https://github.com/Netflix/conductor/discussions) is a good place to ask questions, discuss new features and explore ideas. Please check with us before spending too much time, only to find out later that someone else is already working on a similar feature.
Expand Down
10 changes: 6 additions & 4 deletions WHOSUSING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ We would like to keep track of whose using Conductor. Please send a pull request
* [Deutsche Telekom Digital Labs](https://dtdl.in) [[@jas34](https://github.com/jas34)] [[@deoramanas](https://github.com/deoramanas)]
* [VMware](www.vmware.com) [[@taojwmware](https://github.com/taojwmware)] [[@venkag](https://github.com/venkag)]
* [JP Morgan Chase](www.chase.com) [[@maheshyaddanapudi](https://github.com/maheshyaddanapudi)]
* [Orkes ](www.orkes.io)[[@CherishSantoshi](https://github.com/CherishSantoshi)]
* [313X](https://313x.com.br)[[@dalmoveras](https://github.com/dalmoveras)]
* [Supercharge](https://supercharge.io)[[@team-supercharge](https://github.com/team-supercharge)]
* [GE Healthcare](https://www.gehealthcare.com/) [[@flavioschuindt](https://github.com/flavioschuindt)]
* [Orkes ](www.orkes.io)[[@CherishSantoshi](https://github.com/CherishSantoshi)]
* [313X](https://313x.com.br)[[@dalmoveras](https://github.com/dalmoveras)]
* [Supercharge](https://supercharge.io)[[@team-supercharge](https://github.com/team-supercharge)]
* [GE Healthcare](https://www.gehealthcare.com/) [[@flavioschuindt](https://github.com/flavioschuindt)]
* [ReliaQuest](https://www.reliaquest.com/) [[@rq-dbrady](https://github.com/rq-dbrady)] [[@alexmay48](https://github.com/alexmay48)]
* [Clari](https://www.clari.com/) [[@TeamJOF](https://github.com/clari)]
4 changes: 2 additions & 2 deletions awss3-storage/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down Expand Up @@ -273,7 +273,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,26 @@
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.s3.storage.S3PayloadStorage;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

@Configuration
@EnableConfigurationProperties(S3Properties.class)
@ConditionalOnProperty(name = "conductor.external-payload-storage.type", havingValue = "s3")
public class S3Configuration {

@Bean
public ExternalPayloadStorage s3ExternalPayloadStorage(
IDGenerator idGenerator, S3Properties properties) {
return new S3PayloadStorage(idGenerator, properties);
IDGenerator idGenerator, S3Properties properties, AmazonS3 s3Client) {
return new S3PayloadStorage(idGenerator, properties, s3Client);
}

@ConditionalOnProperty(
name = "conductor.external-payload-storage.s3.use_default_client",
havingValue = "true",
matchIfMissing = true)
@Bean
public AmazonS3 amazonS3(S3Properties properties) {
return AmazonS3ClientBuilder.standard().withRegion(properties.getRegion()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.amazonaws.HttpMethod;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.*;

/**
Expand All @@ -52,12 +51,11 @@ public class S3PayloadStorage implements ExternalPayloadStorage {
private final String bucketName;
private final long expirationSec;

public S3PayloadStorage(IDGenerator idGenerator, S3Properties properties) {
public S3PayloadStorage(IDGenerator idGenerator, S3Properties properties, AmazonS3 s3Client) {
this.idGenerator = idGenerator;
this.s3Client = s3Client;
bucketName = properties.getBucketName();
expirationSec = properties.getSignedUrlExpirationDuration().getSeconds();
String region = properties.getRegion();
s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();
}

/**
Expand Down
4 changes: 2 additions & 2 deletions awssqs-event-queue/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down Expand Up @@ -294,7 +294,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down
6 changes: 3 additions & 3 deletions cassandra-persistence/dependencies.lock
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down Expand Up @@ -196,7 +196,7 @@
"locked": "3.10.2"
},
"com.google.protobuf:protobuf-java": {
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.netflix.conductor:conductor-common": {
"project": true
Expand Down Expand Up @@ -291,7 +291,7 @@
"com.netflix.conductor:conductor-common",
"com.netflix.conductor:conductor-core"
],
"locked": "3.13.0"
"locked": "3.21.7"
},
"com.jayway.jsonpath:json-path": {
"firstLevelTransitive": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public List<WorkflowDef> getAllWorkflowDefs() {
return cassandraMetadataDAO.getAllWorkflowDefs();
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
return cassandraMetadataDAO.getAllWorkflowDefsLatestVersions();
}

private List<TaskDef> refreshTaskDefsCache() {
try {
Cache taskDefsCache = cacheManager.getCache(TASK_DEF_CACHE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -60,8 +63,10 @@ public class CassandraMetadataDAO extends CassandraBaseDAO implements MetadataDA
private final PreparedStatement insertTaskDefStatement;

private final PreparedStatement selectWorkflowDefStatement;

private final PreparedStatement selectAllWorkflowDefVersionsByNameStatement;
private final PreparedStatement selectAllWorkflowDefsStatement;
private final PreparedStatement selectAllWorkflowDefsLatestVersionsStatement;
private final PreparedStatement selectTaskDefStatement;
private final PreparedStatement selectAllTaskDefsStatement;

Expand Down Expand Up @@ -97,6 +102,9 @@ public CassandraMetadataDAO(
this.selectAllWorkflowDefsStatement =
session.prepare(statements.getSelectAllWorkflowDefsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectAllWorkflowDefsLatestVersionsStatement =
session.prepare(statements.getSelectAllWorkflowDefsLatestVersionsStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
this.selectTaskDefStatement =
session.prepare(statements.getSelectTaskDefStatement())
.setConsistencyLevel(properties.getReadConsistencyLevel());
Expand Down Expand Up @@ -289,6 +297,48 @@ public List<WorkflowDef> getAllWorkflowDefs() {
}
}

@Override
public List<WorkflowDef> getAllWorkflowDefsLatestVersions() {
try {
ResultSet resultSet =
session.execute(
selectAllWorkflowDefsLatestVersionsStatement.bind(
WORKFLOW_DEF_INDEX_KEY));
List<Row> rows = resultSet.all();
if (rows.size() == 0) {
LOGGER.info("No workflow definitions were found.");
return Collections.EMPTY_LIST;
}
Map<String, PriorityQueue<WorkflowDef>> allWorkflowDefs = new HashMap<>();

for (Row row : rows) {
String defNameVersion = row.getString(WORKFLOW_DEF_NAME_VERSION_KEY);
var nameVersion = getWorkflowNameAndVersion(defNameVersion);
WorkflowDef def =
getWorkflowDef(nameVersion.getLeft(), nameVersion.getRight()).orElse(null);
if (def == null) {
continue;
}
if (allWorkflowDefs.get(def.getName()) == null) {
allWorkflowDefs.put(
def.getName(),
new PriorityQueue<>(
(WorkflowDef w1, WorkflowDef w2) ->
Integer.compare(w2.getVersion(), w1.getVersion())));
}
allWorkflowDefs.get(def.getName()).add(def);
}
return allWorkflowDefs.values().stream()
.map(PriorityQueue::poll)
.collect(Collectors.toList());
} catch (DriverException e) {
Monitors.error(CLASS_NAME, "getAllWorkflowDefsLatestVersions");
String errorMsg = "Error retrieving all workflow defs latest versions";
LOGGER.error(errorMsg, e);
throw new TransientException(errorMsg, e);
}
}

private TaskDef getTaskDefFromDB(String name) {
try {
ResultSet resultSet = session.execute(selectTaskDefStatement.bind(name));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,14 @@ public String getSelectAllWorkflowDefsStatement() {
.getQueryString();
}

public String getSelectAllWorkflowDefsLatestVersionsStatement() {
return QueryBuilder.select()
.all()
.from(keyspace, TABLE_WORKFLOW_DEFS_INDEX)
.where(eq(WORKFLOW_DEF_INDEX_KEY, bindMarker()))
.getQueryString();
}

/**
* @return cql query statement to fetch a task definition by name from the "task_definitions"
* table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,36 @@ class CassandraMetadataDAOSpec extends CassandraSpec {

}

def "Get All WorkflowDef"() {
when:
metadataDAO.removeWorkflowDef("workflow_def_1", 1)
WorkflowDef workflowDef = new WorkflowDef()
workflowDef.setName("workflow_def_1")
workflowDef.setVersion(1)
workflowDef.setOwnerEmail("test@junit.com")
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_2")
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)

workflowDef.setName("workflow_def_3")
workflowDef.setVersion(1)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(2)
metadataDAO.createWorkflowDef(workflowDef)
workflowDef.setVersion(3)
metadataDAO.createWorkflowDef(workflowDef)

then: // fetch the workflow definition
def allDefsLatestVersions = metadataDAO.getAllWorkflowDefsLatestVersions()
Map<String, WorkflowDef> allDefsMap = allDefsLatestVersions.collectEntries {wfDef -> [wfDef.getName(), wfDef]}
allDefsMap.get("workflow_def_1").getVersion() == 1
allDefsMap.get("workflow_def_2").getVersion() == 2
allDefsMap.get("workflow_def_3").getVersion() == 3
}

def "parse index string"() {
expect:
def pair = metadataDAO.getWorkflowNameAndVersion(nameVersionStr)
Expand Down
Loading

0 comments on commit 3ae3bc2

Please sign in to comment.