Skip to content

Commit

Permalink
TIKA-3790 -- actually implement tika server client via pipes (not yet…
Browse files Browse the repository at this point in the history
… async)
  • Loading branch information
tballison committed Jun 13, 2022
1 parent 18ce798 commit 7877f9b
Show file tree
Hide file tree
Showing 12 changed files with 332 additions and 171 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Release 2.4.1 - ???

* Implement bulk upload in the OpenSearch emitter (TIKA-3791).

* Implement tika-server client via pipes mode (TIKA-3790).

* Custom embedded parsers and EmbeddedDocumentHandlers
can now add metadata to the container file's
metadata (TIKA-3789).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedExceptio
throw new InterruptedException("thread interrupt");
}
PipesResult result = readResults(t, start);
LOG.info("finished reading result ");
if (LOG.isDebugEnabled()) {
long elapsed = System.currentTimeMillis() - readStart;
LOG.debug("finished reading result in {} ms", elapsed);
}

if (LOG.isTraceEnabled()) {
LOG.trace("pipesClientId={}: timer -- read result: {} ms",
Expand Down
10 changes: 8 additions & 2 deletions tika-core/src/main/java/org/apache/tika/pipes/PipesConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import java.nio.file.Path;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.tika.exception.TikaConfigException;

public class PipesConfig extends PipesConfigBase {

// Logger is keyed to this class so that its messages are attributed to (and can be
// filtered by) PipesConfig rather than PipesClient.
private static final Logger LOG = LoggerFactory.getLogger(PipesConfig.class);

private long maxWaitForClientMillis = 60000;

public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigException {
Expand All @@ -34,8 +39,9 @@ public static PipesConfig load(Path tikaConfig) throws IOException, TikaConfigEx
Set<String> settings = pipesConfig.configure("pipes", is);
}
if (pipesConfig.getTikaConfig() == null) {
throw new TikaConfigException("must specify at least a <tikaConfig> element in the " +
"<params> of <pipes>");
LOG.debug("A separate tikaConfig was not specified in the <pipes/> element in the " +
"config file; will use {} for pipes", tikaConfig);
pipesConfig.setTikaConfig(tikaConfig);
}
return pipesConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,4 @@ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOEx
return FileVisitResult.CONTINUE;
}
}


}
5 changes: 5 additions & 0 deletions tika-server/tika-server-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>tika-serialization</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>tika-httpclient-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tika.server.client;

import org.apache.tika.client.HttpClientFactory;
import org.apache.tika.exception.TikaConfigException;

/**
 * Low-level, package-private HTTP client variant whose endpoint name is
 * {@code "async"}. Apart from the endpoint name, everything is inherited from
 * {@link TikaPipesHttpClient}.
 *
 * <p>NOTE(review): the only constructor is private and this class declares no
 * factory method, so nothing visible here can instantiate it -- confirm that
 * this is intentional.
 */
class TikaAsyncHttpClient extends TikaPipesHttpClient {

    /** Endpoint name reported by {@link #getEndpoint()}. */
    private static final String END_POINT = "async";

    /**
     * Passes both arguments straight through to the superclass constructor.
     *
     * @param baseUrl           forwarded unchanged to {@link TikaPipesHttpClient}
     * @param httpClientFactory forwarded unchanged to {@link TikaPipesHttpClient}
     * @throws TikaConfigException if the superclass constructor throws it
     */
    private TikaAsyncHttpClient(String baseUrl, HttpClientFactory httpClientFactory)
            throws TikaConfigException {
        super(baseUrl, httpClientFactory);
    }

    /**
     * @return the constant endpoint name, {@code "async"}
     */
    String getEndpoint() {
        return END_POINT;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,28 @@
import java.util.List;
import java.util.Random;

import org.apache.tika.config.TikaConfig;
import org.apache.tika.client.HttpClientFactory;
import org.apache.tika.exception.TikaConfigException;
import org.apache.tika.exception.TikaException;
import org.apache.tika.metadata.serialization.JsonFetchEmitTuple;
import org.apache.tika.metadata.serialization.JsonFetchEmitTupleList;
import org.apache.tika.pipes.FetchEmitTuple;

public class TikaClient {

private final Random random = new Random();
private final List<TikaHttpClient> clients;
private final List<TikaPipesHttpClient> clients;


private TikaClient(List<TikaHttpClient> clients) {

private TikaClient(List<TikaPipesHttpClient> clients) {
this.clients = clients;
}

public static TikaClient get(TikaConfig tikaConfig, List<String> tikaServers)
throws TikaClientConfigException {
public static TikaClient get(HttpClientFactory httpClientFactory, List<String> tikaServers)
throws TikaConfigException {
List clients = new ArrayList<>();
for (String url : tikaServers) {
clients.add(TikaHttpClient.get(url));
//client factory is not thread safe, create a copy per client
clients.add(new TikaPipesHttpClient(url, httpClientFactory.copy()));
}
return new TikaClient(clients);
}
Expand All @@ -54,21 +54,13 @@ public static TikaClient get(TikaConfig tikaConfig, List<String> tikaServers)
}*/

public TikaEmitterResult parse(FetchEmitTuple fetchEmit) throws IOException, TikaException {
TikaHttpClient client = getHttpClient();
TikaPipesHttpClient client = getHttpClient();
StringWriter writer = new StringWriter();
JsonFetchEmitTuple.toJson(fetchEmit, writer);
return client.postJson(writer.toString());
}

public TikaEmitterResult parseAsync(List<FetchEmitTuple> tuples)
throws IOException, TikaException {
StringWriter writer = new StringWriter();
JsonFetchEmitTupleList.toJson(tuples, writer);
TikaHttpClient client = getHttpClient();
return client.postJsonAsync(writer.toString());
}

private TikaHttpClient getHttpClient() {
private TikaPipesHttpClient getHttpClient() {
if (clients.size() == 1) {
return clients.get(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -37,7 +34,6 @@
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

import org.apache.tika.config.TikaConfig;
import org.apache.tika.exception.TikaException;
import org.apache.tika.pipes.FetchEmitTuple;
import org.apache.tika.pipes.pipesiterator.PipesIterator;
Expand All @@ -47,49 +43,50 @@ public class TikaClientCLI {
private static final Logger LOGGER = LoggerFactory.getLogger(TikaClientCLI.class);
private static final int QUEUE_SIZE = 10000;

private final long maxWaitMs = 300000;

public static void main(String[] args) throws Exception {
//TODO -- add an actual commandline,
Path tikaConfigPath = Paths.get(args[0]);
int numThreads = Integer.parseInt(args[1]);
List<String> tikaServerUrls = Arrays.asList(args[2].split(","));
TikaClientCLI cli = new TikaClientCLI();
cli.execute(tikaConfigPath, tikaServerUrls, numThreads);
cli.execute(tikaConfigPath);
}

private void execute(Path tikaConfigPath, List<String> tikaServerUrls, int numThreads)
private void execute(Path tikaConfigPath)
throws TikaException, IOException, SAXException {
TikaConfig config = new TikaConfig(tikaConfigPath);

ExecutorService executorService = Executors.newFixedThreadPool(numThreads + 1);
ExecutorCompletionService<Integer> completionService =
new ExecutorCompletionService<>(executorService);
final PipesIterator pipesIterator =
PipesIterator.build(tikaConfigPath);
final ArrayBlockingQueue<FetchEmitTuple> queue =
new ArrayBlockingQueue<>(QUEUE_SIZE);

completionService.submit(new PipesIteratorWrapper(pipesIterator, queue, numThreads));
if (tikaServerUrls.size() == numThreads) {
logDiffSizes(tikaServerUrls.size(), numThreads);
for (int i = 0; i < numThreads; i++) {
TikaServerClientConfig clientConfig = TikaServerClientConfig.build(tikaConfigPath);

ExecutorService executorService =
Executors.newFixedThreadPool(clientConfig.getNumThreads() + 1);

ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

final PipesIterator pipesIterator = PipesIterator.build(tikaConfigPath);

final ArrayBlockingQueue<FetchEmitTuple> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

completionService.submit(new PipesIteratorWrapper(pipesIterator, queue));

if (clientConfig.getTikaEndpoints().size() == clientConfig.getNumThreads()) {
logDiffSizes(clientConfig.getTikaEndpoints().size(), clientConfig.getNumThreads());
for (int i = 0; i < clientConfig.getNumThreads(); i++) {
TikaClient client =
TikaClient.get(config, Collections.singletonList(tikaServerUrls.get(i)));
completionService.submit(new FetchWorker(queue, client));
TikaClient.get(clientConfig.getHttpClientFactory(),
Collections.singletonList(clientConfig.getTikaEndpoints().get(i)));
completionService.submit(new FetchWorker(queue, client,
clientConfig.getMaxWaitMillis()));
}
} else {
for (int i = 0; i < numThreads; i++) {
TikaClient client = TikaClient.get(config, tikaServerUrls);
completionService.submit(new FetchWorker(queue, client));
for (int i = 0; i < clientConfig.getNumThreads(); i++) {
TikaClient client = TikaClient.get(clientConfig.getHttpClientFactory(),
clientConfig.getTikaEndpoints());
completionService.submit(new FetchWorker(queue, client,
clientConfig.getMaxWaitMillis()));
}
}

int finished = 0;
while (finished < numThreads + 1) {
while (finished < clientConfig.getNumThreads() + 1) {
Future<Integer> future = null;
try {
future = completionService.poll(maxWaitMs, TimeUnit.MILLISECONDS);
future = completionService.poll(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
//stop the world
LOGGER.error("", e);
Expand All @@ -101,7 +98,7 @@ private void execute(Path tikaConfigPath, List<String> tikaServerUrls, int numTh
future.get();
} catch (InterruptedException | ExecutionException e) {
//stop the world
LOGGER.error("", e);
LOGGER.error("critical main loop failure", e);
throw new RuntimeException(e);
}
}
Expand All @@ -113,62 +110,30 @@ private void logDiffSizes(int servers, int numThreads) {
"Each client will randomly select a server from this list", servers, numThreads);
}

/**
 * Worker that drains {@link FetchEmitTuple}s from a shared queue into a local
 * batch and hands the batch to {@link #send(List)}.
 *
 * <p>NOTE(review): {@code send} is currently an empty method and the
 * {@code client} field is never read in this class, so no tuple is actually
 * transmitted by this worker.
 */
private class AsyncFetchWorker implements Callable<Integer> {
    private final ArrayBlockingQueue<FetchEmitTuple> queue;
    private final TikaClient client;

    public AsyncFetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
        this.queue = queue;
        this.client = client;
    }

    /**
     * Polls the queue until the completion semaphore arrives or a poll times out.
     *
     * @return {@code 1} once {@link PipesIterator#COMPLETED_SEMAPHORE} has been read
     * @throws TimeoutException if a single poll yields nothing within {@code maxWaitMs}
     * @throws Exception        declared by {@link Callable#call()}
     */
    @Override
    public Integer call() throws Exception {
        final List<FetchEmitTuple> pending = new ArrayList<>();
        for (;;) {
            final FetchEmitTuple next = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
            if (next == null) {
                // nothing arrived in time: hand over what we have, then fail
                send(pending);
                throw new TimeoutException("exceeded maxWaitMs");
            } else if (next == PipesIterator.COMPLETED_SEMAPHORE) {
                // identity comparison against the sentinel: hand over the remainder and stop
                send(pending);
                return 1;
            }
            // the batch is handed over only once it already holds more than 20 tuples
            if (pending.size() > 20) {
                LOGGER.debug("about to send: {}", pending.size());
                send(pending);
                pending.clear();
            }
            pending.add(next);
        }
    }

    /**
     * Hand-over point for a batch of tuples; the body is empty.
     *
     * @param localCache the tuples accumulated so far
     */
    private void send(List<FetchEmitTuple> localCache) {
        // no-op
    }
}

private class FetchWorker implements Callable<Integer> {
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final TikaClient client;

public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue, TikaClient client) {
private final long maxWaitMs;

public FetchWorker(ArrayBlockingQueue<FetchEmitTuple> queue,
TikaClient client, long maxWaitMs) {
this.queue = queue;
this.client = client;
this.maxWaitMs = maxWaitMs;
}

@Override
public Integer call() throws Exception {

while (true) {

FetchEmitTuple t = queue.poll(maxWaitMs, TimeUnit.MILLISECONDS);
if (t == null) {
throw new TimeoutException("exceeded maxWaitMs");
}
if (t == PipesIterator.COMPLETED_SEMAPHORE) {
//potentially blocks forever
queue.put(PipesIterator.COMPLETED_SEMAPHORE);
return 1;
}
try {
Expand All @@ -184,15 +149,10 @@ public Integer call() throws Exception {
private static class PipesIteratorWrapper implements Callable<Integer> {
private final PipesIterator pipesIterator;
private final ArrayBlockingQueue<FetchEmitTuple> queue;
private final int numThreads;

public PipesIteratorWrapper(PipesIterator pipesIterator,
ArrayBlockingQueue<FetchEmitTuple> queue,
int numThreads) {
ArrayBlockingQueue<FetchEmitTuple> queue) {
this.pipesIterator = pipesIterator;
this.queue = queue;
this.numThreads = numThreads;

}

@Override
Expand All @@ -201,9 +161,8 @@ public Integer call() throws Exception {
//potentially blocks forever
queue.put(t);
}
for (int i = 0; i < numThreads; i ++) {
queue.put(PipesIterator.COMPLETED_SEMAPHORE);
}
//potentially blocks forever
queue.put(PipesIterator.COMPLETED_SEMAPHORE);
return 1;
}
}
Expand Down
Loading

0 comments on commit 7877f9b

Please sign in to comment.