Skip to content

Commit

Permalink
Merge branch 'master' into master-config-se
Browse files Browse the repository at this point in the history
  • Loading branch information
barchetta authored Sep 10, 2019
2 parents 3338f4f + 804e5a5 commit 6401f8b
Show file tree
Hide file tree
Showing 192 changed files with 12,223 additions and 1,898 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
<requiredProperty key="artifactId"/>
<requiredProperty key="version"/>
<requiredProperty key="package"/>
<requiredProperty key="mainClass">
<defaultValue>no</defaultValue>
</requiredProperty>
<requiredProperty key="applicationName">
<defaultValue>ExampleApplication</defaultValue>
<validationRegex>([$_a-zA-Z][$_a-zA-Z0-9]*)</validationRegex>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,15 @@ public static Optional<ThreadPool> asThreadPool(ExecutorService executor) {
* @param maxPoolSize The maximum number of threads to allow in the pool
* @param growthThreshold The queue size above which pool growth should be considered if the pool is not fixed size.
* @param growthRate The percentage of task submissions that should result in adding threads, expressed as a value
* from 0 to 100. A rate of 0 means that the pool will never grow; for all other values the rate applies only when
* all of the following are true:
* from 0 to 100. For non-zero values the rate is applied when all of the following are true:
* <ul>
* <li>the pool size is below the maximum, and</li>
* <li>there are no idle threads, and</li>
* <li>the number of tasks in the queue exceeds the {@code growthThreshold}</li>
* </ul>
* For example, a rate of 20 means that while these conditions are met one thread will be added for every 5 submitted tasks.
* <p></p>For example, a rate of 20 means that while these conditions are met one thread will be added for every 5 submitted tasks.
* <p>A rate of 0 selects the default {@link ThreadPoolExecutor} growth behavior: a thread is added only when a submitted
* task is rejected because the queue is full.
* @param keepAliveTime When the number of threads is greater than the core, this is the maximum time that excess idle
* threads will wait for new tasks before terminating.
* @param keepAliveTimeUnits The units for {@code keepAliveTime}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,8 @@ public Builder prestart(boolean prestart) {
/**
* Load all properties for this thread pool from configuration.
* <p>
* <table>
* <caption style="text-align: left; font-weight: bold; font-size:16px; margin-bottom: 12px;">
* Optional Configuration Parameters</caption>
* <table class="config">
* <caption>Optional Configuration Parameters</caption>
* <tr>
* <th>key</th>
* <th>default value</th>
Expand Down Expand Up @@ -336,9 +335,8 @@ public Builder prestart(boolean prestart) {
* </tr>
* </table>
* <p>
* <table>
* <caption style="text-align: left; font-weight: bold; font-size:16px; margin-bottom: 12px;">Experimental Configuration
* Parameters (<em>subject to change</em>)</caption>
* <table class="config">
* <caption>Experimental Configuration Parameters (<em>subject to change</em>)</caption>
* <tr>
* <th>key</th>
* <th>default value</th>
Expand All @@ -350,17 +348,19 @@ public Builder prestart(boolean prestart) {
* <td>The queue size above which pool growth will be considered if the pool is not fixed size.</td>
* </tr>
* <tr>
* <td style="vertical-align: top;">growth-rate</td>
* <td style="vertical-align: top;">5</td>
* <td>The percentage of task submissions that should result in adding threads, expressed as a value from 1 to 100.
* This rate applies only when all of the following are true:
* <td>growth-rate</td>
* <td>5</td>
* <td>The percentage of task submissions that should result in adding a thread, expressed as a value from 0 to 100.
* For non-zero values the rate is applied when all of the following are true:
* <ul>
* <li>the pool size is below the maximum, and</li>
* <li>there are no idle threads, and</li>
* <li>the number of tasks in the queue exceeds the {@code growthThreshold}</li>
* </ul>
* For example, a rate of 20 means that while these conditions are met one thread will be added for every 5 submitted
* tasks.</td>
* <p>For example, a rate of 20 means that while these conditions are met one thread will be added for every 5 submitted
* tasks.
* <p>A rate of 0 selects the default {@link ThreadPoolExecutor} growth behavior: a thread is added only when a
* submitted task is rejected because the queue is full.</td>
* </tr>
* </table>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@
* limitations under the License.
*/
/**
* Support fo context propagation across executor boundaries.
* Support for context propagation across executor boundaries.
*/
package io.helidon.common.context;
44 changes: 41 additions & 3 deletions common/reactive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,46 @@
<spotbugs.exclude>etc/spotbugs/exclude.xml</spotbugs.exclude>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
<configuration>
<includes>
<include>**/*TckTest.java</include>
</includes>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-testng</artifactId>
<version>${version.lib.surefire.testng}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common-mapper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
Expand All @@ -48,9 +87,8 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.helidon.common</groupId>
<artifactId>helidon-common</artifactId>
<version>${project.version}</version>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import io.helidon.common.reactive.Flow.Processor;
import io.helidon.common.reactive.Flow.Publisher;
import io.helidon.common.reactive.Flow.Subscriber;
import io.helidon.common.reactive.Flow.Subscription;

/**
* A generic processor used for the implementation of {@link Multi} and {@link Single}.
*
* @param <T> subscribed type (input)
* @param <U> published type (output)
*/
abstract class BaseProcessor<T, U> implements Processor<T, U>, Subscription {

private Subscription subscription;
private final SingleSubscriberHolder<U> subscriber;
private final RequestedCounter requested;
private final AtomicBoolean ready;
private final AtomicBoolean subscribed;
private volatile boolean done;
private Throwable error;

BaseProcessor() {
requested = new RequestedCounter();
ready = new AtomicBoolean();
subscribed = new AtomicBoolean();
subscriber = new SingleSubscriberHolder<>();
}

@Override
public final void request(long n) {
requested.increment(n, ex -> onError(ex));
tryRequest(subscription);
if (done) {
tryComplete();
}
}

@Override
public final void cancel() {
subscriber.cancel();
}

@Override
public final void onSubscribe(Subscription s) {
if (subscription == null) {
this.subscription = s;
tryRequest(s);
}
}

@Override
public final void onNext(T item) {
if (!subscriber.isClosed()) {
try {
hookOnNext(item);
} catch (Throwable ex) {
onError(ex);
}
}
}

@Override
public final void onError(Throwable ex) {
done = true;
if (error == null) {
error = ex;
}
tryComplete();
}

@Override
public final void onComplete() {
done = true;
tryComplete();
}

@Override
public void subscribe(Subscriber<? super U> s) {
if (subscriber.register(s)) {
ready.set(true);
s.onSubscribe(this);
if (done) {
tryComplete();
}
}
}

/**
* Submit an item to the subscriber.
* @param item item to be submitted
*/
protected void submit(U item) {
if (requested.tryDecrement()) {
try {
subscriber.get().onNext(item);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
onError(ex);
} catch (ExecutionException ex) {
onError(ex);
} catch (Throwable ex) {
onError(ex);
}
} else {
onError(new IllegalStateException("Not enough request to submit item"));
}
}

/**
* Hook for {@link Subscriber#onNext(java.lang.Object)}.
* @param item next item
*/
protected void hookOnNext(T item) {
}

/**
* Hook for {@link Subscriber#onError(java.lang.Throwable)}.
* @param error error received
*/
protected void hookOnError(Throwable error) {
}

/**
* Hook for {@link Subscriber#onComplete()}.
*/
protected void hookOnComplete() {
}

/**
* Subscribe the subscriber after the given delegate publisher.
* @param delegate delegate publisher
*/
protected final void doSubscribe(Publisher<U> delegate) {
if (subscribed.compareAndSet(false, true)) {
delegate.subscribe(new Subscriber<U>() {
@Override
public void onSubscribe(Subscription subscription) {
tryRequest(subscription);
}

@Override
public void onNext(U item) {
submit(item);
}

@Override
public void onError(Throwable ex) {
BaseProcessor.this.onError(ex);
}

@Override
public void onComplete() {
BaseProcessor.this.onComplete();
}
});
}
}

private void completeOnError(Subscriber<? super U> sub, Throwable ex) {
hookOnError(ex);
sub.onError(ex);
}

private void tryComplete() {
if (ready.get() && !subscriber.isClosed()) {
if (error != null) {
subscriber.close(sub -> completeOnError(sub, error));
} else {
try {
hookOnComplete();
} catch (Throwable ex) {
subscriber.close(sub -> completeOnError(sub, ex));
return;
}
subscriber.close(Subscriber::onComplete);
}
}
}

private void tryRequest(Subscription s) {
if (s != null && !subscriber.isClosed()) {
long n = requested.get();
if (n > 0) {
s.request(n);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

/**
* A collector accumulates the items provided when {@link #collect(java.lang.Object)} is invoked and makes them available in a
* single container object with {@link #value()}.
*
* @param <T> collected items type (input)
* @param <U> result container type (output)
*/
public interface Collector<T, U> {

/**
* Collect the given item.
*
* @param item item to collect
*/
void collect(T item);

/**
* Get the collected items container.
*
* @return U
*/
U value();
}
Loading

0 comments on commit 6401f8b

Please sign in to comment.