Async consumers for multiprocessing #2

Merged
merged 4 commits into from
Dec 8, 2021
52 changes: 52 additions & 0 deletions README.md
Expand Up @@ -37,20 +37,25 @@ error.maxRedeliveries=4
This defines how many times to retry a message before failing completely.

There are also common ActiveMQ properties to setup the connection.

```
# ActiveMQ options
jms.brokerUrl=tcp://localhost:61616
```

This defines the url to the ActiveMQ broker.

```
jms.username=
jms.password=
```
This defines the login credentials (if required)

```
jms.connections=10
```
This defines the pool of connections to the ActiveMQ instance.

```
jms.concurrent-consumers=1
```
Expand All @@ -66,19 +71,24 @@ It's properties are:
# Fcrepo indexer options
fcrepo.indexer.enabled=true
```

This defines whether the Fedora indexer is enabled or not.

```
fcrepo.indexer.node=queue:islandora-indexing-fcrepo-content
fcrepo.indexer.delete=queue:islandora-indexing-fcrepo-delete
fcrepo.indexer.media=queue:islandora-indexing-fcrepo-media
fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external
```

These define the various queues to listen on for the indexing/deletion
messages. The part after `queue:` should match your Islandora instance "Actions".

```
fcrepo.indexer.milliner.baseUrl=http://localhost:8000/milliner
```
This defines the location of your Milliner microservice.

```
fcrepo.indexer.concurrent-consumers=1
fcrepo.indexer.max-concurrent-consumers=1
Expand All @@ -87,6 +97,12 @@ These define the default number of concurrent consumers and maximum number of co
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.

```
fcrepo.indexer.async-consumer=true
```

When set to `true`, this property lets the consumers process messages concurrently; otherwise, a consumer waits until the previous message has been processed before executing the next one.

### islandora-indexing-triplestore

This service indexes the Drupal node into the configured triplestore
Expand All @@ -97,25 +113,39 @@ It's properties are:
# Triplestore indexer options
triplestore.indexer.enabled=false
```

This defines whether the Triplestore indexer is enabled or not.

```
triplestore.index.stream=queue:islandora-indexing-triplestore-index
triplestore.delete.stream=queue:islandora-indexing-triplestore-delete
```

These define the various queues to listen on for the indexing/deletion
messages. The part after `queue:` should match your Islandora instance "Actions".

```
triplestore.baseUrl=http://localhost:8080/bigdata/namespace/kb/sparql
```

This defines the location of your triplestore's SPARQL update endpoint.

```
triplestore.indexer.concurrent-consumers=1
triplestore.indexer.max-concurrent-consumers=1
```

These define the default number of concurrent consumers and maximum number of concurrent
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.


```
triplestore.indexer.async-consumer=true
```

When set to `true`, this property lets the consumers process messages concurrently; otherwise, a consumer waits until the previous message has been processed before executing the next one.

### islandora-connector-derivative

This service is used to configure an external microservice. This service will deploy multiple copies of its routes
Expand All @@ -127,25 +157,40 @@ a comma separated list. Each item in the list defines a new route and must also
```
derivative.<item>.enabled=true
```

This defines if the `item` service is enabled.

```
derivative.<item>.in.stream=queue:islandora-item-connector.index
```

This is the input queue for the derivative microservice.
The part after `queue:` should match your Islandora instance "Actions".

```
derivative.<item>.service.url=http://example.org/derivative/convert
```

This is the microservice URL to process the request.

```
derivative.<item>.concurrent-consumers=1
derivative.<item>.max-concurrent-consumers=1
```

These define the default number of concurrent consumers and maximum number of concurrent
consumers working off your ActiveMQ instance.
A value of `-1` means no setting is applied.


```
derivative.<item>.async-consumer=true
```

When set to `true`, this property lets the consumers process messages concurrently; otherwise, a consumer waits until the previous message has been processed before executing the next one.

For example, with two services defined (houdini and crayfits) my configuration would have

```
derivative.systems.installed=houdini,fits

Owner

This is a total nit-pick but could you add

derivative.houdini.async-consumer=true

below as that one has different values for concurrent-consumers and max-concurrent-consumers.

Otherwise, I'm good with this.

Author

Heh... could do: Didn't really want to touch the example stuff where it says "[your] configuration would have [these things]"...

Expand All @@ -154,23 +199,27 @@ derivative.houdini.in.stream=queue:islandora-connector-houdini
derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert
derivative.houdini.concurrent-consumers=1
derivative.houdini.max-concurrent-consumers=4
derivative.houdini.async-consumer=true

derivative.fits.enabled=true
derivative.fits.in.stream=queue:islandora-connector-fits
derivative.fits.service.url=http://127.0.0.1:8000/crayfits
derivative.fits.concurrent-consumers=2
derivative.fits.max-concurrent-consumers=2
derivative.fits.async-consumer=false
```

### Customizing HTTP client timeouts

You can alter the HTTP client from the defaults for its request, connection and socket timeouts.
To do this you want to enable the request configurer.

```shell
request.configurer.enabled=true
```

Then set the next 3 timeouts (measured in milliseconds) to the desired timeout.

```shell
request.timeout=-1
connection.timeout=-1
Expand All @@ -182,6 +231,7 @@ The default for all three is `-1` which indicates no timeout.
## Deploying/Running

You can see the options by passing the `-h|--help` flag

```shell
> java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -h
Usage: alpaca [-hV] [-c=<configurationFilePath>]
Expand All @@ -192,6 +242,7 @@ Usage: alpaca [-hV] [-c=<configurationFilePath>]
```

Using the `-V|--version` flag will just return the current version of the application.

```shell
> java -jar islandora-alpaca-app/build/libs/islandora-alpaca-app-2.0.0-all.jar -v
2.0.0
Expand All @@ -212,6 +263,7 @@ Logging is done to the console, and defaults to the INFO level. To get more verb
can use the Java property `islandora.alpaca.log`

e.g.

```shell
java -Dislandora.alpaca.log=DEBUG -jar islandora-alpaca-app-2.0.0-all.jar -c /opt/my.properties
```
Expand Down
6 changes: 6 additions & 0 deletions example.properties
Expand Up @@ -21,6 +21,7 @@ fcrepo.indexer.external=queue:islandora-indexing-fcrepo-file-external
fcrepo.indexer.milliner.baseUrl=http://127.0.0.1:8000/milliner/
fcrepo.indexer.concurrent-consumers=-1
fcrepo.indexer.max-concurrent-consumers=-1
fcrepo.indexer.async-consumer=false

# Triplestore indexer options
triplestore.indexer.enabled=true
Expand All @@ -29,6 +30,7 @@ triplestore.index.stream=queue:islandora-indexing-triplestore-index
triplestore.delete.stream=queue:islandora-indexing-triplestore-delete
triplestore.indexer.concurrent-consumers=-1
triplestore.indexer.max-concurrent-consumers=-1
triplestore.indexer.async-consumer=false

# Derivative services
derivative.systems.installed=fits,homarus,houdini,ocr
Expand All @@ -38,21 +40,25 @@ derivative.fits.in.stream=queue:islandora-connector-fits
derivative.fits.service.url=http://localhost:8000/crayfits
derivative.fits.concurrent-consumers=-1
derivative.fits.max-concurrent-consumers=-1
derivative.fits.async-consumer=false

derivative.homarus.enabled=true
derivative.homarus.in.stream=queue:islandora-connector-homarus
derivative.homarus.service.url=http://127.0.0.1:8000/homarus/convert
derivative.homarus.concurrent-consumers=-1
derivative.homarus.max-concurrent-consumers=-1
derivative.homarus.async-consumer=false

derivative.houdini.enabled=true
derivative.houdini.in.stream=queue:islandora-connector-houdini
derivative.houdini.service.url=http://127.0.0.1:8000/houdini/convert
derivative.houdini.concurrent-consumers=-1
derivative.houdini.max-concurrent-consumers=-1
derivative.houdini.async-consumer=false

derivative.ocr.enabled=true
derivative.ocr.in.stream=queue:islandora-connector-ocr
derivative.ocr.service.url=http://localhost:8000/hypercube
derivative.ocr.concurrent-consumers=-1
derivative.ocr.max-concurrent-consumers=-1
derivative.ocr.async-consumer=false
Expand Up @@ -54,17 +54,21 @@ public Integer call() throws Exception {
System.setProperty(ALPACA_CONFIG_PROPERTY, configurationFilePath.toFile().getAbsolutePath());
}
final var appContext = new AnnotationConfigApplicationContext("ca.islandora.alpaca");
-        appContext.start();
-        LOGGER.info("Alpaca started.");
+        try {
+            appContext.start();
+            LOGGER.info("Alpaca started.");

-        while (appContext.isRunning()) {
-            try {
-                Thread.sleep(1000);
-            } catch (final InterruptedException e) {
-                throw new RuntimeException("This should never happen");
+            while (appContext.isRunning()) {
+                try {
+                    Thread.sleep(1000);
+                } catch (final InterruptedException e) {
+                    throw new RuntimeException("This should never happen");
+                }
             }
+            return 0;
+        } finally {
+            appContext.close();
         }
-        return 0;
}

/**
Expand Down
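The hunk above wraps startup and the keep-alive loop in `try`/`finally` so the context is closed however the method exits. A minimal sketch of that pattern follows; `ShutdownSketch` and `FakeContext` are hypothetical stand-ins (not part of Alpaca or Spring), used only so the example runs without the application context.

```java
// Sketch only: FakeContext is a hypothetical stand-in for the real application context.
final class ShutdownSketch {

    static final class FakeContext {
        private final boolean failOnStart;
        private int remainingTicks = 2;
        boolean closed = false;

        FakeContext(final boolean failOnStart) {
            this.failOnStart = failOnStart;
        }

        void start() {
            if (failOnStart) {
                throw new IllegalStateException("startup failed");
            }
        }

        // Reports "running" for two polls, then stops.
        boolean isRunning() {
            return remainingTicks-- > 0;
        }

        void close() {
            closed = true;
        }
    }

    static int run(final FakeContext context) {
        try {
            context.start();
            while (context.isRunning()) {
                try {
                    Thread.sleep(10);
                } catch (final InterruptedException e) {
                    // Same handling as the diff above.
                    throw new RuntimeException("This should never happen");
                }
            }
            return 0;
        } finally {
            // Runs on normal return and when start() or the loop throws.
            context.close();
        }
    }

    public static void main(final String[] args) {
        final FakeContext context = new FakeContext(false);
        final int exitCode = run(context);
        System.out.println("exit code " + exitCode + ", closed=" + context.closed);
    }
}
```

In this sketch the context also ends up closed when `start()` throws, which is the case the original code (without `finally`) did not cover.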
Expand Up @@ -50,6 +50,7 @@ public class DerivativeOptions extends PropertyConfig {
private static final String DERIVATIVE_OUTPUT_PROPERTY = "service.url";
private static final String DERIVATIVE_CONCURRENT_PROPERTY = "concurrent-consumers";
private static final String DERIVATIVE_MAX_CONCURRENT_PROPERTY = "max-concurrent-consumers";
private static final String DERIVATIVE_ASYNC_CONSUMER = "async-consumer";

@Autowired
private Environment environment;
Expand Down Expand Up @@ -104,8 +105,11 @@ private void startDerivativeService(final String serviceName) throws Exception {
Integer.class, -1);
final int maxConcurrentConsumers = environment.getProperty(maxConcurrentConsumerProperty(serviceName),
Integer.class, -1);
final boolean asyncConsumer = environment.getProperty(asyncConsumerProperty(serviceName),
Boolean.class, false);
// Add concurrent/max-concurrent
-        final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers);
+        final String finalInput = addJmsOptions(addBrokerName(input), concurrentConsumers, maxConcurrentConsumers,
+                asyncConsumer);
// Add connectionClose and other http options.
final String finalOutput = addHttpOptions(output);
camelContext.addRoutes(new DerivativeConnector(serviceName, finalInput, finalOutput, this));
Expand Down Expand Up @@ -180,4 +184,13 @@ private String maxConcurrentConsumerProperty(final String systemName) {
return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_MAX_CONCURRENT_PROPERTY;
}

/**
* Return the expected async-consumer property.
* @param systemName the derivative system name
* @return the property
*/
private String asyncConsumerProperty(final String systemName) {
return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_ASYNC_CONSUMER;
}

}
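As an illustration of how the per-service key is built and defaulted, here is a minimal sketch. It uses `java.util.Properties` in place of the Spring `Environment` used in the real class, and it assumes `DERIVATIVE_PREFIX` is `"derivative"` (inferred from the README property names, not shown in this diff). The class name `AsyncConsumerPropertySketch` and the `isAsync` helper are hypothetical.

```java
import java.util.Properties;

// Sketch only: resolves derivative.<item>.async-consumer with a default of false.
final class AsyncConsumerPropertySketch {

    // Assumed value; the constant's definition is not visible in the diff.
    private static final String DERIVATIVE_PREFIX = "derivative";
    private static final String DERIVATIVE_ASYNC_CONSUMER = "async-consumer";

    static String asyncConsumerProperty(final String systemName) {
        return DERIVATIVE_PREFIX + "." + systemName + "." + DERIVATIVE_ASYNC_CONSUMER;
    }

    // Hypothetical helper: a missing property falls back to false, as in the diff.
    static boolean isAsync(final Properties props, final String systemName) {
        return Boolean.parseBoolean(props.getProperty(asyncConsumerProperty(systemName), "false"));
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("derivative.houdini.async-consumer", "true");
        System.out.println(asyncConsumerProperty("houdini") + " -> " + isAsync(props, "houdini"));
        System.out.println(asyncConsumerProperty("fits") + " -> " + isAsync(props, "fits"));
    }
}
```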
Expand Up @@ -43,6 +43,7 @@ public class FcrepoIndexerOptions extends PropertyConfig {
private static final String FCREPO_BASE_URI_HEADER_PROPERTY = "fcrepo.indexer.fedoraHeader";
private static final String FCREPO_INDEXER_CONCURRENT = "fcrepo.indexer.concurrent-consumers";
private static final String FCREPO_INDEXER_MAX_CONCURRENT = "fcrepo.indexer.max-concurrent-consumers";
private static final String FCREPO_INDEXER_ASYNC_CONSUMER = "fcrepo.indexer.async-consumer";

@Value("${" + FCREPO_INDEXER_NODE_INDEX + ":}")
private String fcrepoNodeIndex;
Expand All @@ -68,6 +69,9 @@ public class FcrepoIndexerOptions extends PropertyConfig {
@Value("${" + FCREPO_INDEXER_MAX_CONCURRENT + ":-1}")
private int fcrepoMaxConcurrentConsumers;

@Value("${" + FCREPO_INDEXER_ASYNC_CONSUMER + ":false}")
private boolean fcrepoAsyncConsumers;

/**
* Defines that Fedora indexer is only enabled if the appropriate property is set to "true".
*/
Expand Down Expand Up @@ -113,7 +117,7 @@ public String getExternalIndex() {
* The altered topic/queue string.
*/
private String addConcurrent(final String queueString) {
-        return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers);
+        return addJmsOptions(queueString, fcrepoConcurrentConsumers, fcrepoMaxConcurrentConsumers, fcrepoAsyncConsumers);
}

/**
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class TriplestoreIndexerOptions extends PropertyConfig {
private static final String TRIPLESTORE_DELETE_QUEUE = "triplestore.delete.stream";
private static final String TRIPLESTORE_CONCURRENT = "triplestore.indexer.concurrent-consumers";
private static final String TRIPLESTORE_MAX_CONCURRENT = "triplestore.indexer.max-concurrent-consumers";
private static final String TRIPLESTORE_ASYNC_CONSUMER = "triplestore.indexer.async-consumer";

@Value("${" + TRIPLESTORE_INDEX_QUEUE + ":}")
private String jmsIndexStream;
Expand All @@ -59,6 +60,9 @@ public class TriplestoreIndexerOptions extends PropertyConfig {
@Value("${" + TRIPLESTORE_MAX_CONCURRENT + ":-1}")
private int triplestoreMaxConcurrent;

@Value("${" + TRIPLESTORE_ASYNC_CONSUMER + ":false}")
private boolean triplestoreAsyncConsumer;

/**
* Defines that triplestore indexer is only enabled if the appropriate property is set to "true".
*/
Expand Down Expand Up @@ -99,7 +103,7 @@ public String getTriplestoreBaseUrl() {
* The altered topic/queue string.
*/
private String addConcurrent(final String queueString) {
-        return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent);
+        return addJmsOptions(queueString, triplestoreConcurrent, triplestoreMaxConcurrent, triplestoreAsyncConsumer);
}

/**
Expand Down
Expand Up @@ -60,11 +60,15 @@ public int getMaxRedeliveries() {
* The number of concurrent consumers. -1 means no setting.
* @param maxConcurrentConsumers
* The max number of concurrent consumers. -1 means no setting.
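* @param asyncConsumers
* Whether messages are processed asynchronously. If false, each message is
* fully processed before the consumer picks up the next one from the queue;
* if true, the consumer may pick up the next message while the previous one
* is still being processed, so strict ordering is not guaranteed.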
* @return
* The modified topic/queue string.
*/
public static String addJmsOptions(final String queueString, final int concurrentConsumers,
-            final int maxConcurrentConsumers) {
+            final int maxConcurrentConsumers, final boolean asyncConsumers) {
final StringBuilder builder = new StringBuilder();
if (concurrentConsumers > 0) {
builder.append("concurrentConsumers=");
Expand All @@ -77,6 +81,13 @@ public static String addJmsOptions(final String queueString, final int concurren
builder.append("maxConcurrentConsumers=");
builder.append(maxConcurrentConsumers);
}
if (asyncConsumers) {
if (builder.length() > 0) {
builder.append('&');
}
builder.append("asyncConsumer=")
.append(asyncConsumers);
}
if (builder.length() > 0) {
return queueString + (queueString.contains("?") ? '&' : '?') + builder;
}
Expand Down
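To make the effect of the new parameter concrete, the following is a minimal, self-contained sketch of the option-building logic. Parts of the method are elided in the diff, so the `&` handling between `concurrentConsumers` and `maxConcurrentConsumers` and the final `return queueString;` are assumptions. The class name `JmsOptionsSketch` is hypothetical, and the queue strings omit the broker-name prefix that the real code adds.

```java
// Sketch only: mirrors the logic visible in the diff; elided parts are assumed.
final class JmsOptionsSketch {

    static String addJmsOptions(final String queueString, final int concurrentConsumers,
                                final int maxConcurrentConsumers, final boolean asyncConsumers) {
        final StringBuilder builder = new StringBuilder();
        if (concurrentConsumers > 0) {
            builder.append("concurrentConsumers=").append(concurrentConsumers);
        }
        if (maxConcurrentConsumers > 0) {
            if (builder.length() > 0) {
                builder.append('&');
            }
            builder.append("maxConcurrentConsumers=").append(maxConcurrentConsumers);
        }
        // The option is only appended when true; false leaves the URI untouched.
        if (asyncConsumers) {
            if (builder.length() > 0) {
                builder.append('&');
            }
            builder.append("asyncConsumer=").append(asyncConsumers);
        }
        if (builder.length() > 0) {
            return queueString + (queueString.contains("?") ? '&' : '?') + builder;
        }
        // Assumed fallback: no options set, so the queue string is returned as-is.
        return queueString;
    }

    public static void main(final String[] args) {
        System.out.println(addJmsOptions("queue:islandora-connector-houdini", 1, 4, true));
        System.out.println(addJmsOptions("queue:islandora-connector-fits", 2, 2, false));
        System.out.println(addJmsOptions("queue:islandora-indexing-fcrepo-content", -1, -1, false));
    }
}
```

With the README example values, the houdini queue would become `queue:islandora-connector-houdini?concurrentConsumers=1&maxConcurrentConsumers=4&asyncConsumer=true`, while the fits queue would only gain the two consumer-count options.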