Skip to content

Commit

Permalink
Merge pull request #440 from mswatosh/flow
Browse files Browse the repository at this point in the history
TCK updates for Flow
  • Loading branch information
mswatosh authored Feb 28, 2024
2 parents d71ae5b + 10d6ce5 commit 3780ecc
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,4 +224,24 @@ public void testCopyWithContextCapture() throws Throwable {
.withTestName(testname);
runTest(requestURL);
}

@Assertion(id = "GIT:368",
strategy = "A ContextService contextualizes a Flow.Subscriber, which is subscribed to an unmanaged Flow.Producer."
+ "The Flow.Subscriber methods are run with the thread context of the thread which contextualizes the Flow.Subscriber"
+ " per the configuration of the ContextServiceDefinition.")
public void testContextualFlowSubscriber() throws Throwable {
URLBuilder requestURL = URLBuilder.get().withBaseURL(contextURL).withPaths("ContextServiceDefinitionServlet")
.withTestName(testname);
runTest(requestURL);
}

@Assertion(id = "GIT:368",
strategy = "A ContextService contextualizes a Flow.Processor, which is subscribed to an unmanaged Flow.Producer."
+ "The Flow.Processor methods are run with the thread context of the thread which contextualizes the Flow.Processor"
+ " per the configuration of the ContextServiceDefinition.")
public void testContextualFlowProcessor() throws Throwable {
URLBuilder requestURL = URLBuilder.get().withBaseURL(contextURL).withPaths("ContextServiceDefinitionServlet")
.withTestName(testname);
runTest(requestURL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,24 @@ public void testCopyWithContextCapture() throws Throwable {
.withTestName(testname);
runTest(requestURL);
}

@Assertion(id = "GIT:368",
strategy = "A ContextService contextualizes a Flow.Subscriber, which is subscribed to an unmanaged Flow.Producer."
+ "The Flow.Subscriber methods are run with the thread context of the thread which contextualizes the Flow.Subscriber"
+ " per the configuration of the ContextServiceDefinition.")
public void testContextualFlowSubscriber() throws Throwable {
URLBuilder requestURL = URLBuilder.get().withBaseURL(contextURL).withPaths("ContextServiceDefinitionServlet")
.withTestName(testname);
runTest(requestURL);
}

@Assertion(id = "GIT:368",
strategy = "A ContextService contextualizes a Flow.Processor, which is subscribed to an unmanaged Flow.Producer."
+ "The Flow.Processor methods are run with the thread context of the thread which contextualizes the Flow.Processor"
+ " per the configuration of the ContextServiceDefinition.")
public void testContextualFlowProcessor() throws Throwable {
URLBuilder requestURL = URLBuilder.get().withBaseURL(contextURL).withPaths("ContextServiceDefinitionServlet")
.withTestName(testname);
runTest(requestURL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -450,6 +455,183 @@ public void testContextualSupplier() throws Throwable {
}
}

/**
* A ContextService contextualizes a Flow.Subscriber, which is subscribed to an unmanaged Flow.Producer.
* The Flow.Subscriber methods are run with the thread context of the thread which contextualizes the Flow.Subscriber
* per the configuration of the ContextServiceDefinition.
*
* publisher.subscribe triggers onSubscribe
* publisher.offer triggers onNext
* publisher.close (implicit from AutoClosable) triggers onComplete
* publisher.closeExceptionally triggers onError
*/
public void testContextualFlowSubscriber() throws Throwable {
ContextService contextService = InitialContext.doLookup("java:app/concurrent/ContextA");
CountDownLatch publisherClosed = new CountDownLatch(1);

Subscriber<String> subscriber = new Subscriber<String>() {

private Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
subscription.request(1);
}

@Override
public void onNext(final String item) {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
publisherClosed.countDown();
}

@Override
public void onComplete() {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Subscriber "
+ "per java:app/concurrent/ContextA configuration.");
publisherClosed.countDown();
}

};


try (SubmissionPublisher<String> publisher =
new SubmissionPublisher<String>(unmanagedThreads, 2, null)) {
StringContext.set("testContextualFlowSubscriber-1");
IntContext.set(61);

publisher.subscribe(contextService.contextualSubscriber(subscriber));

publisher.offer("", MAX_WAIT_SECONDS, TimeUnit.SECONDS, null);
} finally {
StringContext.set(null);
IntContext.set(0);
}


try (SubmissionPublisher<String> publisher =
new SubmissionPublisher<String>(unmanagedThreads, 2, null)) {

StringContext.set("testContextualFlowSubscriber-1");
IntContext.set(61);

publisher.subscribe(contextService.contextualSubscriber(subscriber));

publisher.closeExceptionally(new Exception("Test onError"));

} finally {
StringContext.set(null);
IntContext.set(0);
}

assertTrue(publisherClosed.await(MAX_WAIT_SECONDS, TimeUnit.SECONDS));
}

/**
* A ContextService contextualizes a Flow.Processor, which is subscribed to an unmanaged Flow.Producer.
* The Flow.Subscriber methods are run with the thread context of the thread which contextualizes the Flow.Processor
* per the configuration of the ContextServiceDefinition.
*
* processor.subscribe triggers subscribe
* publisher.subscribe triggers onSubscribe
* publisher.offer triggers onNext
* publisher.close (implicit from AutoClosable) triggers onComplete
* publisher.closeExceptionally triggers onError
*/
public void testContextualFlowProcessor() throws Throwable {
ContextService contextService = InitialContext.doLookup("java:app/concurrent/ContextA");
CountDownLatch publisherClosed = new CountDownLatch(2);

Subscriber<String> emptySubscriber = new Subscriber<String>() {

private Subscription subscription;

@Override
public void onSubscribe(final Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}

@Override
public void onNext(final String item) {
subscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
publisherClosed.countDown();
}

@Override
public void onComplete() {
publisherClosed.countDown();
}

};

try (SubmissionPublisher<String> publisher =
new SubmissionPublisher<String>(unmanagedThreads, 2, null)) {
StringContext.set("testContextualFlowSubscriber-1");
IntContext.set(61);

Processor<String, String> processor = contextService.contextualProcessor(new ProcessorImpl());

processor.subscribe(emptySubscriber);
publisher.subscribe(processor);

publisher.offer("", MAX_WAIT_SECONDS, TimeUnit.SECONDS, null);

} finally {
StringContext.set(null);
IntContext.set(0);
}

try (SubmissionPublisher<String> publisher =
new SubmissionPublisher<String>(unmanagedThreads, 2, null)) {
StringContext.set("testContextualFlowSubscriber-1");
IntContext.set(61);

Processor<String, String> processor = contextService.contextualProcessor(new ProcessorImpl());

processor.subscribe(emptySubscriber);
publisher.subscribe(processor);

publisher.closeExceptionally(new Exception("Test onError"));

} finally {
StringContext.set(null);
IntContext.set(0);
}

assertTrue(publisherClosed.await(MAX_WAIT_SECONDS, TimeUnit.SECONDS));
}

/**
* ContextService can create a contextualized copy of an unmanaged
* CompletableFuture.
Expand Down Expand Up @@ -501,4 +683,73 @@ public void testCopyWithContextCapture() throws Throwable {
IntContext.set(0);
}
}

/**
* Uses the JDKs SubmissionPublisher to avoid writing a publisher implementation for the processor to
* extend. Overrides subscribe, the only Flow.Publisher api method inherited by Flow.Processor.
*/
class ProcessorImpl extends SubmissionPublisher<String> implements Processor<String, String> {

private Subscription processorSubscription;
private Subscriber<? super String> processorSubscriber;

@Override
public void onSubscribe(final Subscription subscription) {

processorSubscription = subscription;
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
processorSubscription.request(1);
}

@Override
public void onNext(final String item) {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
processorSubscription.request(1);
}

@Override
public void onError(final Throwable throwable) {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
processorSubscriber.onError(throwable);
}

@Override
public void onComplete() {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
processorSubscriber.onComplete();
}

@Override
public void subscribe(final Subscriber<? super String> subscriber) {
assertEquals(IntContext.get(), Integer.valueOf(61),
"Third-party context type IntContext must be propagated to async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
assertEquals(StringContext.get(), "",
"Third-party context type StringContext must be cleared from async contextual Flow.Processor "
+ "per java:app/concurrent/ContextA configuration.");
processorSubscriber = subscriber;

}

}
}

0 comments on commit 3780ecc

Please sign in to comment.