Skip to content

Commit

Permalink
[MNG-8031] Backport concurrent friendly transport listener (#1471)
Browse files Browse the repository at this point in the history
Backport of simplex transfer listener.

---

https://issues.apache.org/jira/browse/MNG-8031
  • Loading branch information
cstamas committed Apr 20, 2024
1 parent 16f8127 commit 18b8893
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,7 @@
import org.apache.maven.cli.logging.Slf4jConfigurationFactory;
import org.apache.maven.cli.logging.Slf4jLoggerManager;
import org.apache.maven.cli.logging.Slf4jStdoutLogger;
import org.apache.maven.cli.transfer.ConsoleMavenTransferListener;
import org.apache.maven.cli.transfer.QuietMavenTransferListener;
import org.apache.maven.cli.transfer.Slf4jMavenTransferListener;
import org.apache.maven.cli.transfer.*;
import org.apache.maven.eventspy.internal.EventSpyDispatcher;
import org.apache.maven.exception.DefaultExceptionHandler;
import org.apache.maven.exception.ExceptionHandler;
Expand Down Expand Up @@ -1627,7 +1625,7 @@ static class IllegalUseOfUndefinedProperty extends IllegalArgumentException {
//

protected TransferListener getConsoleTransferListener(boolean printResourceNames) {
return new ConsoleMavenTransferListener(System.out, printResourceNames);
return new SimplexTransferListener(new ConsoleMavenTransferListener(System.out, printResourceNames));
}

protected TransferListener getBatchTransferListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,21 @@ public ConsoleMavenTransferListener(PrintStream out, boolean printResourceNames)
}

@Override
public synchronized void transferInitiated(TransferEvent event) {
public void transferInitiated(TransferEvent event) {
overridePreviousTransfer(event);

super.transferInitiated(event);
}

@Override
public synchronized void transferCorrupted(TransferEvent event) throws TransferCancelledException {
public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
overridePreviousTransfer(event);

super.transferCorrupted(event);
}

@Override
public synchronized void transferProgressed(TransferEvent event) throws TransferCancelledException {
public void transferProgressed(TransferEvent event) throws TransferCancelledException {
TransferResource resource = event.getResource();
transfers.put(resource, event.getTransferredBytes());

Expand Down Expand Up @@ -120,15 +120,15 @@ private void pad(StringBuilder buffer, int spaces) {
}

@Override
public synchronized void transferSucceeded(TransferEvent event) {
public void transferSucceeded(TransferEvent event) {
transfers.remove(event.getResource());
overridePreviousTransfer(event);

super.transferSucceeded(event);
}

@Override
public synchronized void transferFailed(TransferEvent event) {
public void transferFailed(TransferEvent event) {
transfers.remove(event.getResource());
overridePreviousTransfer(event);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.cli.transfer;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

import org.eclipse.aether.transfer.AbstractTransferListener;
import org.eclipse.aether.transfer.TransferCancelledException;
import org.eclipse.aether.transfer.TransferEvent;
import org.eclipse.aether.transfer.TransferListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
* A {@link TransferListener} implementation that wraps another delegate {@link TransferListener} but makes it run
* on single thread, keeping the listener logic simple. This listener also blocks on last transfer event to allow
* output to perform possible cleanup. It spawns a daemon thread to consume queued events that may fall in even
* concurrently.
*
* @since 3.9.7
*/
public final class SimplexTransferListener extends AbstractTransferListener {
private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
private static final int QUEUE_SIZE = 1024;
private static final int BATCH_MAX_SIZE = 500;
private final TransferListener delegate;
private final int batchMaxSize;
private final boolean blockOnLastEvent;
private final ArrayBlockingQueue<Exchange> eventQueue;

/**
* Constructor that makes passed in delegate run on single thread, and will block on last event.
*/
public SimplexTransferListener(TransferListener delegate) {
this(delegate, QUEUE_SIZE, BATCH_MAX_SIZE, true);
}

/**
* Constructor that may alter behaviour of this listener.
*
* @param delegate The delegate that should run on single thread.
* @param queueSize The event queue size (default {@code 1024}).
* @param batchMaxSize The maximum batch size delegate should receive (default {@code 500}).
* @param blockOnLastEvent Should this listener block on last transfer end (completed or corrupted) block? (default {@code true}).
*/
public SimplexTransferListener(
TransferListener delegate, int queueSize, int batchMaxSize, boolean blockOnLastEvent) {
this.delegate = requireNonNull(delegate);
if (queueSize < 1 || batchMaxSize < 1) {
throw new IllegalArgumentException("Queue and batch sizes must be greater than 1");
}
this.batchMaxSize = batchMaxSize;
this.blockOnLastEvent = blockOnLastEvent;

this.eventQueue = new ArrayBlockingQueue<>(queueSize);
Thread updater = new Thread(this::feedConsumer);
updater.setDaemon(true);
updater.start();
}

public TransferListener getDelegate() {
return delegate;
}

private void feedConsumer() {
final ArrayList<Exchange> batch = new ArrayList<>(batchMaxSize);
try {
while (true) {
batch.clear();
if (eventQueue.drainTo(batch, BATCH_MAX_SIZE) == 0) {
batch.add(eventQueue.take());
}
demux(batch);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private void demux(List<Exchange> exchanges) {
for (Exchange exchange : exchanges) {
exchange.process(transferEvent -> {
TransferEvent.EventType type = transferEvent.getType();
try {
switch (type) {
case INITIATED:
delegate.transferInitiated(transferEvent);
break;
case STARTED:
delegate.transferStarted(transferEvent);
break;
case PROGRESSED:
delegate.transferProgressed(transferEvent);
break;
case CORRUPTED:
delegate.transferCorrupted(transferEvent);
break;
case SUCCEEDED:
delegate.transferSucceeded(transferEvent);
break;
case FAILED:
delegate.transferFailed(transferEvent);
break;
default:
LOGGER.warn("Invalid TransferEvent.EventType={}; ignoring it", type);
}
} catch (TransferCancelledException e) {
ongoing.put(transferEvent.getResource().getFile(), Boolean.FALSE);
}
});
}
}

private void put(TransferEvent event, boolean last) {
try {
Exchange exchange;
if (blockOnLastEvent && last) {
exchange = new BlockingExchange(event);
} else {
exchange = new Exchange(event);
}
eventQueue.put(exchange);
exchange.waitForProcessed();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

private final ConcurrentHashMap<File, Boolean> ongoing = new ConcurrentHashMap<>();

@Override
public void transferInitiated(TransferEvent event) {
ongoing.putIfAbsent(event.getResource().getFile(), Boolean.TRUE);
put(event, false);
}

@Override
public void transferStarted(TransferEvent event) throws TransferCancelledException {
if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
throw new TransferCancelledException();
}
put(event, false);
}

@Override
public void transferProgressed(TransferEvent event) throws TransferCancelledException {
if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
throw new TransferCancelledException();
}
put(event, false);
}

@Override
public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
if (ongoing.get(event.getResource().getFile()) == Boolean.FALSE) {
throw new TransferCancelledException();
}
put(event, false);
}

@Override
public void transferSucceeded(TransferEvent event) {
ongoing.remove(event.getResource().getFile());
put(event, ongoing.isEmpty());
}

@Override
public void transferFailed(TransferEvent event) {
ongoing.remove(event.getResource().getFile());
put(event, ongoing.isEmpty());
}

private static class Exchange {
private final TransferEvent event;

private Exchange(TransferEvent event) {
this.event = event;
}

public void process(Consumer<TransferEvent> consumer) {
consumer.accept(event);
}

public void waitForProcessed() throws InterruptedException {
// nothing, is async
}
}

private static class BlockingExchange extends Exchange {
private final CountDownLatch latch = new CountDownLatch(1);

private BlockingExchange(TransferEvent event) {
super(event);
}

@Override
public void process(Consumer<TransferEvent> consumer) {
super.process(consumer);
latch.countDown();
}

@Override
public void waitForProcessed() throws InterruptedException {
latch.await();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.cli.transfer;

import java.io.File;

import org.eclipse.aether.DefaultRepositorySystemSession;
import org.eclipse.aether.transfer.TransferCancelledException;
import org.eclipse.aether.transfer.TransferEvent;
import org.eclipse.aether.transfer.TransferListener;
import org.eclipse.aether.transfer.TransferResource;
import org.junit.Test;

import static org.junit.Assert.fail;

public class SimplexTransferListenerTest {
@Test
public void cancellation() throws InterruptedException {
TransferListener delegate = new TransferListener() {
@Override
public void transferInitiated(TransferEvent event) throws TransferCancelledException {
throw new TransferCancelledException();
}

@Override
public void transferStarted(TransferEvent event) throws TransferCancelledException {
throw new TransferCancelledException();
}

@Override
public void transferProgressed(TransferEvent event) throws TransferCancelledException {
throw new TransferCancelledException();
}

@Override
public void transferCorrupted(TransferEvent event) throws TransferCancelledException {
throw new TransferCancelledException();
}

@Override
public void transferSucceeded(TransferEvent event) {}

@Override
public void transferFailed(TransferEvent event) {}
};

SimplexTransferListener listener = new SimplexTransferListener(delegate);

TransferResource resource =
new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null);
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession();

// for technical reasons we cannot throw here, even if delegate does cancel transfer
listener.transferInitiated(new TransferEvent.Builder(session, resource)
.setType(TransferEvent.EventType.INITIATED)
.build());

Thread.sleep(500); // to make sure queue is processed, cancellation applied

// subsequent call will cancel
try {
listener.transferStarted(new TransferEvent.Builder(session, resource)
.resetType(TransferEvent.EventType.STARTED)
.build());
fail("should throw");
} catch (TransferCancelledException e) {
// good
}
}
}

0 comments on commit 18b8893

Please sign in to comment.