From 18b8893c6db9e45128193eea58cebaef8e4667f9 Mon Sep 17 00:00:00 2001 From: Tamas Cservenak Date: Sat, 20 Apr 2024 22:32:32 +0200 Subject: [PATCH] [MNG-8031] Backport concurrent friendly transport listener (#1471) Backport of simplex transfer listener. --- https://issues.apache.org/jira/browse/MNG-8031 --- .../java/org/apache/maven/cli/MavenCli.java | 6 +- .../ConsoleMavenTransferListener.java | 10 +- .../cli/transfer/SimplexTransferListener.java | 231 ++++++++++++++++++ .../transfer/SimplexTransferListenerTest.java | 86 +++++++ 4 files changed, 324 insertions(+), 9 deletions(-) create mode 100644 maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java create mode 100644 maven-embedder/src/test/java/org/apache/maven/cli/transfer/SimplexTransferListenerTest.java diff --git a/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java b/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java index 5932741a5106..5c92fca10cbe 100644 --- a/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java +++ b/maven-embedder/src/main/java/org/apache/maven/cli/MavenCli.java @@ -69,9 +69,7 @@ import org.apache.maven.cli.logging.Slf4jConfigurationFactory; import org.apache.maven.cli.logging.Slf4jLoggerManager; import org.apache.maven.cli.logging.Slf4jStdoutLogger; -import org.apache.maven.cli.transfer.ConsoleMavenTransferListener; -import org.apache.maven.cli.transfer.QuietMavenTransferListener; -import org.apache.maven.cli.transfer.Slf4jMavenTransferListener; +import org.apache.maven.cli.transfer.*; import org.apache.maven.eventspy.internal.EventSpyDispatcher; import org.apache.maven.exception.DefaultExceptionHandler; import org.apache.maven.exception.ExceptionHandler; @@ -1627,7 +1625,7 @@ static class IllegalUseOfUndefinedProperty extends IllegalArgumentException { // protected TransferListener getConsoleTransferListener(boolean printResourceNames) { - return new ConsoleMavenTransferListener(System.out, printResourceNames); + return new SimplexTransferListener(new ConsoleMavenTransferListener(System.out, printResourceNames)); } protected TransferListener getBatchTransferListener() { diff --git a/maven-embedder/src/main/java/org/apache/maven/cli/transfer/ConsoleMavenTransferListener.java b/maven-embedder/src/main/java/org/apache/maven/cli/transfer/ConsoleMavenTransferListener.java index 783db8117a2f..93144b63b407 100644 --- a/maven-embedder/src/main/java/org/apache/maven/cli/transfer/ConsoleMavenTransferListener.java +++ b/maven-embedder/src/main/java/org/apache/maven/cli/transfer/ConsoleMavenTransferListener.java @@ -49,21 +49,21 @@ public ConsoleMavenTransferListener(PrintStream out, boolean printResourceNames) } @Override - public synchronized void transferInitiated(TransferEvent event) { + public void transferInitiated(TransferEvent event) { overridePreviousTransfer(event); super.transferInitiated(event); } @Override - public synchronized void transferCorrupted(TransferEvent event) throws TransferCancelledException { + public void transferCorrupted(TransferEvent event) throws TransferCancelledException { overridePreviousTransfer(event); super.transferCorrupted(event); } @Override - public synchronized void transferProgressed(TransferEvent event) throws TransferCancelledException { + public void transferProgressed(TransferEvent event) throws TransferCancelledException { TransferResource resource = event.getResource(); transfers.put(resource, event.getTransferredBytes()); @@ -120,7 +120,7 @@ private void pad(StringBuilder buffer, int spaces) { } @Override - public synchronized void transferSucceeded(TransferEvent event) { + public void transferSucceeded(TransferEvent event) { transfers.remove(event.getResource()); overridePreviousTransfer(event); @@ -128,7 +128,7 @@ public synchronized void transferSucceeded(TransferEvent event) { } @Override - public synchronized void transferFailed(TransferEvent event) { + public void transferFailed(TransferEvent event) { transfers.remove(event.getResource()); overridePreviousTransfer(event); diff --git a/maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java b/maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java new file mode 100644 index 000000000000..c556e16b70a0 --- /dev/null +++ b/maven-embedder/src/main/java/org/apache/maven/cli/transfer/SimplexTransferListener.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.maven.cli.transfer; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +import org.eclipse.aether.transfer.AbstractTransferListener; +import org.eclipse.aether.transfer.TransferCancelledException; +import org.eclipse.aether.transfer.TransferEvent; +import org.eclipse.aether.transfer.TransferListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * A {@link TransferListener} implementation that wraps another delegate {@link TransferListener} but makes it run + * on single thread, keeping the listener logic simple. This listener also blocks on last transfer event to allow + * output to perform possible cleanup. It spawns a daemon thread to consume queued events that may fall in even + * concurrently. + * + * @since 3.9.7 + */ +public final class SimplexTransferListener extends AbstractTransferListener { + private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class); + private static final int QUEUE_SIZE = 1024; + private static final int BATCH_MAX_SIZE = 500; + private final TransferListener delegate; + private final int batchMaxSize; + private final boolean blockOnLastEvent; + private final ArrayBlockingQueue eventQueue; + + /** + * Constructor that makes passed in delegate run on single thread, and will block on last event. + */ + public SimplexTransferListener(TransferListener delegate) { + this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true); + } + + /** + * Constructor that may alter behaviour of this listener. + * + * @param delegate The delegate that should run on single thread. + * @param queueSize The event queue size (default {@code 1024}). + * @param batchMaxSize The maximum batch size delegate should receive (default {@code 500}). + * @param blockOnLastEvent Should this listener block on last transfer end (completed or corrupted) block? (default {@code true}). + */ + public SimplexTransferListener( + TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) { + this.delegate = requireNonNull(delegate); + if (queueSize < 1 || batchMaxSize < 1) { + throw new IllegalArgumentException("Queue and batch sizes must be greater than 1"); + } + this.batchMaxSize = batchMaxSize; + this.blockOnLastEvent = blockOnLastEvent; + + this.eventQueue = new ArrayBlockingQueue<>(queueSize); + Thread updater = new Thread(this::feedConsumer); + updater.setDaemon(true); + updater.start(); + } + + public TransferListener getDelegate() { + return delegate; + } + + private void feedConsumer() { + final ArrayList batch = new ArrayList<>(batchMaxSize); + try { + while (true) { + batch.clear(); + if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) { + batch.add(eventQueue.take()); + } + demux(batch); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void demux(List exchanges) { + for (Exchange exchange : exchanges) { + exchange.process(transferEvent -> { + TransferEvent.EventType type = transferEvent.getType(); + try { + switch (type) { + case INITIATED: + delegate.transferInitiated(transferEvent); + break; + case STARTED: + delegate.transferStarted(transferEvent); + break; + case PROGRESSED: + delegate.transferProgressed(transferEvent); + break; + case CORRUPTED: + delegate.transferCorrupted(transferEvent); + break; + case SUCCEEDED: + delegate.transferSucceeded(transferEvent); + break; + case FAILED: + delegate.transferFailed(transferEvent); + break; + default: + LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type); + } + } catch (TransferCancelledException e) { + ongoing.put(transferEvent.getResource().getFile(), Boolean.FALSE); + } + }); + } + } + + private void put(TransferEvent event, boolean last) { + try { + Exchange exchange; + if (blockOnLastEvent && last) { + exchange = new BlockingExchange(event); + } else { + exchange = new Exchange(event); + } + eventQueue.put(exchange); + exchange.waitForProcessed(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private final ConcurrentHashMap ongoing = new ConcurrentHashMap<>(); + + @Override + public void transferInitiated(TransferEvent event) { + ongoing.putIfAbsent(event.getResource().getFile(), Boolean.TRUE); + put(event, false); + } + + @Override + public void transferStarted(TransferEvent event) throws TransferCancelledException { + if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) { + throw new TransferCancelledException(); + } + put(event, false); + } + + @Override + public void transferProgressed(TransferEvent event) throws TransferCancelledException { + if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) { + throw new TransferCancelledException(); + } + put(event, false); + } + + @Override + public void transferCorrupted(TransferEvent event) throws TransferCancelledException { + if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) { + throw new TransferCancelledException(); + } + put(event, false); + } + + @Override + public void transferSucceeded(TransferEvent event) { + ongoing.remove(event.getResource().getFile()); + put(event, ongoing.isEmpty()); + } + + @Override + public void transferFailed(TransferEvent event) { + ongoing.remove(event.getResource().getFile()); + put(event, ongoing.isEmpty()); + } + + private static class Exchange { + private final TransferEvent event; + + private Exchange(TransferEvent event) { + this.event = event; + } + + public void process(Consumer consumer) { + consumer.accept(event); + } + + public void waitForProcessed() throws InterruptedException { + // nothing, is async + } + } + + private static class BlockingExchange extends Exchange { + private final CountDownLatch latch = new CountDownLatch(1); + + private BlockingExchange(TransferEvent event) { + super(event); + } + + @Override + public void process(Consumer consumer) { + super.process(consumer); + latch.countDown(); + } + + @Override + public void waitForProcessed() throws InterruptedException { + latch.await(); + } + } +} diff --git a/maven-embedder/src/test/java/org/apache/maven/cli/transfer/SimplexTransferListenerTest.java b/maven-embedder/src/test/java/org/apache/maven/cli/transfer/SimplexTransferListenerTest.java new file mode 100644 index 000000000000..382c6092de0d --- /dev/null +++ b/maven-embedder/src/test/java/org/apache/maven/cli/transfer/SimplexTransferListenerTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.maven.cli.transfer; + +import java.io.File; + +import org.eclipse.aether.DefaultRepositorySystemSession; +import org.eclipse.aether.transfer.TransferCancelledException; +import org.eclipse.aether.transfer.TransferEvent; +import org.eclipse.aether.transfer.TransferListener; +import org.eclipse.aether.transfer.TransferResource; +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class SimplexTransferListenerTest { + @Test + public void cancellation() throws InterruptedException { + TransferListener delegate = new TransferListener() { + @Override + public void transferInitiated(TransferEvent event) throws TransferCancelledException { + throw new TransferCancelledException(); + } + + @Override + public void transferStarted(TransferEvent event) throws TransferCancelledException { + throw new TransferCancelledException(); + } + + @Override + public void transferProgressed(TransferEvent event) throws TransferCancelledException { + throw new TransferCancelledException(); + } + + @Override + public void transferCorrupted(TransferEvent event) throws TransferCancelledException { + throw new TransferCancelledException(); + } + + @Override + public void transferSucceeded(TransferEvent event) {} + + @Override + public void transferFailed(TransferEvent event) {} + }; + + SimplexTransferListener listener = new SimplexTransferListener(delegate); + + TransferResource resource = + new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null); + DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(); + + // for technical reasons we cannot throw here, even if delegate does cancel transfer + listener.transferInitiated(new TransferEvent.Builder(session, resource) + .setType(TransferEvent.EventType.INITIATED) + .build()); + + Thread.sleep(500); // to make sure queue is processed, cancellation applied + + // subsequent call will cancel + try { + listener.transferStarted(new TransferEvent.Builder(session, resource) + .resetType(TransferEvent.EventType.STARTED) + .build()); + fail("should throw"); + } catch (TransferCancelledException e) { + // good + } + } +}