Skip to content

Commit

Permalink
Dependencies graph + MergedErrorStreamId (#13)
Browse files Browse the repository at this point in the history
* First implementation of stream ids dependency tree
* Added InstrumentationService to fetch the dependencies on the pool
* Added MergedErrorStreamFactory and MergedErrorStreamId
* Minor code fixes and documentation

Co-authored-by: Anita Stanisz <astanisz1@gmail.com>
Co-authored-by: Aleksandra Mnich <olamnich@gmail.com>
  • Loading branch information
3 people authored Jun 26, 2019
1 parent 5c67ecd commit d187368
Show file tree
Hide file tree
Showing 23 changed files with 589 additions and 214 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
language: java
dist: trusty
jdk:
- oraclejdk8
- oraclejdk9
sudo: false
before_install:
- chmod +x ./gradlew
script:
Expand Down
295 changes: 148 additions & 147 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,148 +1,149 @@
buildscript {
project.ext.CERN_VM = System.getProperty('CERN_TECHNET_VM') ?: System.getenv('CERN_TECHNET_VM') ?: false
project.ext.TRAVIS_CI = System.getProperty('TRAVIS') ?: System.getenv('TRAVIS') ?: false
project.ext.DEPLOYMENT = System.getProperty('deployment') ?: false
project.ext.VCS_TAG = System.getProperty('TRAVIS_TAG') ?: System.getenv('TRAVIS_TAG')
project.ext.POM = [
groupId : 'org.streamingpool',
artifactId : 'streamingpool-core',
description: 'This project is an high level abstraction over Reactive Streams libraries that is currently used inside CERN.',
developers : [[
id : 'streamingpool-dev',
name : 'Streamingpool Developers',
email: 'streamingpool-dev@cern.ch'
]]
]
project.ext.INFO = [
repo : 'https://github.com/streamingpool/streamingpool-core.git',
url : 'http://www.streamingpool.org/',
github : 'https://github.com/streamingpool/streamingpool-core',
githubIssues: 'https://github.com/streamingpool/streamingpool-core/issues'
]
project.ext.BINTRAY = [
repo : 'streamingpool-repos',
name : 'org.streamingpool:streamingpool-core',
organization: 'streamingpool',
userName : 'streamingpool-dev',
apiToken : System.getenv('BINTRAY_API_TOKEN')
]
repositories {
if (CERN_VM) {
maven { url 'http://artifactory.cern.ch/gradle-plugins' }
maven { url 'http://artifactory.cern.ch/ds-jcenter' }
maven { url 'http://artifactory.cern.ch/development' }
} else {
mavenCentral()
jcenter()
}
}
dependencies {
classpath 'com.netflix.nebula:nebula-publishing-plugin:5.1.0'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.8.4'
}
}

apply plugin: 'java'
apply plugin: 'jacoco'
apply plugin: 'idea'
apply plugin: 'eclipse'

repositories {
if (CERN_VM) {
maven { url 'http://artifactory.cern.ch/ds-jcenter' }
maven { url 'http://artifactory.cern.ch/development' }
} else {
mavenCentral()
jcenter()
}
}

if (DEPLOYMENT) {
apply plugin: 'maven'
apply plugin: 'maven-publish'
apply plugin: 'nebula.maven-publish'
apply plugin: 'com.jfrog.bintray'

println 'Applying deployment scripts'
apply from: './scripts/bintray-deploy.gradle'
}

group 'org.streamingpool'

sourceCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile 'org.reactivestreams:reactive-streams:1.0.0'
compile 'io.reactivex.rxjava2:rxjava:2.1.16'

compile 'org.springframework:spring-core:5.1.2.RELEASE'
compile 'org.springframework:spring-context:5.1.2.RELEASE'
compile 'org.springframework:spring-test:5.1.2.RELEASE' // Testing utils in /src/java for the moment

compile 'org.slf4j:slf4j-api:1.7.25'

compile 'com.google.guava:guava:27.0.1-jre'

compile 'junit:junit:4.12' // Testing utils in /src/java for the moment
compile 'org.mockito:mockito-core:2.23.0' // Testing utils in /src/java for the moment
compile 'org.objenesis:objenesis:3.0.1'
testCompile 'org.assertj:assertj-core:3.8.+'
testCompile 'pl.pragmatists:JUnitParams:1.1.0'
testCompile 'com.openpojo:openpojo:0.8.10'
}

sourceSets {
main {
java {
srcDir 'src/java'
}
}
test {
java {
srcDir 'src/test'
}
}
}

javadoc { options.encoding = "UTF-8" }

task wrapper(type: Wrapper) { gradleVersion = '4.8.1' }

if(!project.tasks.findByName("javadocJar")) {
task javadocJar(type: Jar) {
classifier = 'javadoc'
from javadoc
}
}

if(!project.tasks.findByName("sourcesJar")) {
task sourcesJar(type: Jar) {
classifier = 'sources'
from sourceSets.main.allSource
}
}

artifacts { archives javadocJar, sourcesJar }

jacocoTestReport {
reports {
xml.enabled true
xml.destination new File("${buildDir}/reports/jacoco/report.xml")
html.enabled true
csv.enabled false
}
}

eclipse {
classpath {
downloadJavadoc = true
downloadSources = true
}
}

idea {
module {
downloadJavadoc = true
downloadSources = true
}
buildscript {
project.ext.CERN_VM = System.getProperty('CERN_TECHNET_VM') ?: System.getenv('CERN_TECHNET_VM') ?: false
project.ext.TRAVIS_CI = System.getProperty('TRAVIS') ?: System.getenv('TRAVIS') ?: false
project.ext.DEPLOYMENT = System.getProperty('deployment') ?: false
project.ext.VCS_TAG = System.getProperty('TRAVIS_TAG') ?: System.getenv('TRAVIS_TAG')
project.ext.POM = [
groupId : 'org.streamingpool',
artifactId : 'streamingpool-core',
description: 'This project is an high level abstraction over Reactive Streams libraries that is currently used inside CERN.',
developers : [[
id : 'streamingpool-dev',
name : 'Streamingpool Developers',
email: 'streamingpool-dev@cern.ch'
]]
]
project.ext.INFO = [
repo : 'https://github.com/streamingpool/streamingpool-core.git',
url : 'http://www.streamingpool.org/',
github : 'https://github.com/streamingpool/streamingpool-core',
githubIssues: 'https://github.com/streamingpool/streamingpool-core/issues'
]
project.ext.BINTRAY = [
repo : 'streamingpool-repos',
name : 'org.streamingpool:streamingpool-core',
organization: 'streamingpool',
userName : 'streamingpool-dev',
apiToken : System.getenv('BINTRAY_API_TOKEN')
]
repositories {
if (CERN_VM) {
maven { url 'http://artifactory.cern.ch/gradle-plugins' }
maven { url 'http://artifactory.cern.ch/ds-jcenter' }
maven { url 'http://artifactory.cern.ch/development' }
} else {
mavenCentral()
jcenter()
}
}
dependencies {
classpath 'com.netflix.nebula:nebula-publishing-plugin:5.1.0'
classpath 'com.jfrog.bintray.gradle:gradle-bintray-plugin:1.8.4'
}
}

apply plugin: 'java'
apply plugin: 'jacoco'
apply plugin: 'idea'
apply plugin: 'eclipse'

repositories {
if (CERN_VM) {
maven { url 'http://artifactory.cern.ch/ds-jcenter' }
maven { url 'http://artifactory.cern.ch/development' }
} else {
mavenCentral()
jcenter()
}
}

if (DEPLOYMENT) {
apply plugin: 'maven'
apply plugin: 'maven-publish'
apply plugin: 'nebula.maven-publish'
apply plugin: 'com.jfrog.bintray'

println 'Applying deployment scripts'
apply from: './scripts/bintray-deploy.gradle'
}

group 'org.streamingpool'

sourceCompatibility = JavaVersion.VERSION_1_8

dependencies {
compile 'org.reactivestreams:reactive-streams:1.0.0'
compile 'io.reactivex.rxjava2:rxjava:2.1.16'

compile 'org.springframework:spring-core:5.1.2.RELEASE'
compile 'org.springframework:spring-context:5.1.2.RELEASE'
compile 'org.springframework:spring-test:5.1.2.RELEASE' // Testing utils in /src/java for the moment

compile 'org.slf4j:slf4j-api:1.7.25'

compile 'com.google.guava:guava:27.0.1-jre'

compile 'junit:junit:4.12' // Testing utils in /src/java for the moment
compile 'org.mockito:mockito-core:2.23.0' // Testing utils in /src/java for the moment
compile 'org.objenesis:objenesis:3.0.1'
testCompile 'org.assertj:assertj-core:3.8.+'
testCompile 'pl.pragmatists:JUnitParams:1.1.0'
testCompile 'com.openpojo:openpojo:0.8.10'
testCompile 'org.slf4j:slf4j-simple:1.7.25'
}

sourceSets {
main {
java {
srcDir 'src/java'
}
}
test {
java {
srcDir 'src/test'
}
}
}

javadoc { options.encoding = "UTF-8" }

task wrapper(type: Wrapper) { gradleVersion = '4.8.1' }

if(!project.tasks.findByName("javadocJar")) {
task javadocJar(type: Jar) {
classifier = 'javadoc'
from javadoc
}
}

if(!project.tasks.findByName("sourcesJar")) {
task sourcesJar(type: Jar) {
classifier = 'sources'
from sourceSets.main.allSource
}
}

artifacts { archives javadocJar, sourcesJar }

jacocoTestReport {
reports {
xml.enabled true
xml.destination new File("${buildDir}/reports/jacoco/report.xml")
html.enabled true
csv.enabled false
}
}

eclipse {
classpath {
downloadJavadoc = true
downloadSources = true
}
}

idea {
module {
downloadJavadoc = true
downloadSources = true
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.streamingpool.core.service.TypedStreamFactory;
import org.streamingpool.core.service.InstrumentationService;
import org.streamingpool.core.service.streamfactory.*;

/**
Expand All @@ -37,6 +38,11 @@
@Import({ StreamCreatorFactoryConfiguration.class })
public class DefaultStreamFactories {

@Bean
public MergedErrorStreamFactory mergedErrorStreamFactory(InstrumentationService instrumentationService) {
return new MergedErrorStreamFactory(instrumentationService);
}

@Bean
public CompositionStreamFactory compositionStreamFactory() {
return new CompositionStreamFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,12 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.streamingpool.core.service.InstrumentationService;
import org.streamingpool.core.service.impl.InstrumentationServiceImpl;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.TypedStreamFactory;
import org.streamingpool.core.service.impl.LocalPool;
import org.streamingpool.core.service.impl.PoolContent;

/**
* The spring configuration which shall be used in any application that will have the spring pool embedded. It provides
Expand All @@ -57,12 +60,19 @@ public class EmbeddedPoolConfiguration {
@Autowired(required = false)
private List<StreamFactory> streamFactories;

@Autowired
private PoolConfiguration poolConfiguration;
@Bean
public PoolContent poolContent() {
return new PoolContent();
}

@Bean
public InstrumentationService instrumentationService(PoolContent content) {
return new InstrumentationServiceImpl(content);
}

@Bean
public LocalPool pool() {
return new LocalPool(emptyIfNull(streamFactories), poolConfiguration);
public LocalPool pool(PoolConfiguration poolConfiguration, PoolContent content) {
return new LocalPool(emptyIfNull(streamFactories), poolConfiguration, content);
}

}
17 changes: 17 additions & 0 deletions src/java/org/streamingpool/core/domain/StreamDependencyTree.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.streamingpool.core.domain;

import org.streamingpool.core.service.StreamId;

import java.util.Set;

/**
* Data structure that holds the dependencies between {@link StreamId}s
*/
public interface StreamDependencyTree {

/**
* Get all the ancestors of the given {@link StreamId}. NOTE: this INCLUDES the source {@link StreamId}!
*/
Set<StreamId<?>> getAncestorsFrom(StreamId<?> source);

}
Loading

0 comments on commit d187368

Please sign in to comment.