Skip to content

Commit

Permalink
Merge pull request #58 from abrami/paper
Browse files Browse the repository at this point in the history
Major Updates Kubernetes
  • Loading branch information
abrami authored Nov 19, 2023
2 parents 41310e3 + 4a79fac commit 682146d
Show file tree
Hide file tree
Showing 101 changed files with 5,937 additions and 4,278 deletions.
34 changes: 30 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@
</repository>
</repositories>


<build>
<resources>
<resource>
<filtering>false</filtering>
<directory>src/main/resources</directory>
<excludes>
<exclude>desc/type/**/*</exclude>
<exclude>org/**/*</exclude>
</excludes>
</resource>
<resource>
<filtering>true</filtering>
<directory>src/main/resources</directory>
<includes>
<include>desc/type/**/*</include>
<include>org/**/*</include>
</includes>
</resource>
</resources>
<plugins>

<plugin>
Expand Down Expand Up @@ -114,8 +133,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>16</source>
<target>16</target>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<plugin>
Expand All @@ -142,6 +161,12 @@
</build>
<dependencies>

<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
<version>3.12.11</version>
</dependency>

<dependency>
<groupId>io.kubernetes</groupId>
<artifactId>client-java</artifactId>
Expand Down Expand Up @@ -254,7 +279,8 @@
<dependency>
<groupId>com.github.texttechnologylab.textimager-uima</groupId>
<artifactId>textimager-uima-types</artifactId>
<version>b10db8275163d19d63b9512cc20ca5872e52499b</version>
<version>8efebf1980</version>
<!-- <version>b10db8275163d19d63b9512cc20ca5872e52499b</version>-->
</dependency>
<!-- https://mvnrepository.com/artifact/org.openjdk.nashorn/nashorn-core -->
<dependency>
Expand All @@ -268,7 +294,7 @@
<dependency>
<groupId>com.github.texttechnologylab</groupId>
<artifactId>UIMATypeSystem</artifactId>
<version>021f3cfff6</version>
<version>74a8489af5</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack-core -->
Expand Down
Binary file modified serialization_gercorpa.db
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.texttechnologylab.DockerUnifiedUIMAInterface;

import de.tudarmstadt.ukp.dkpro.core.api.metadata.type.DocumentMetaData;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
Expand All @@ -19,11 +20,13 @@
import org.texttechnologylab.DockerUnifiedUIMAInterface.connection.IDUUIConnectionHandler;
import org.texttechnologylab.DockerUnifiedUIMAInterface.driver.*;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.AsyncCollectionReader;
import org.texttechnologylab.DockerUnifiedUIMAInterface.io.DUUIAsynchronousProcessor;
import org.texttechnologylab.DockerUnifiedUIMAInterface.lua.DUUILuaContext;
import org.texttechnologylab.DockerUnifiedUIMAInterface.monitoring.DUUIMonitor;
import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.DUUIPipelineDocumentPerformance;
import org.texttechnologylab.DockerUnifiedUIMAInterface.pipeline_storage.IDUUIStorageBackend;
import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategy;
import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategyByDelemiter;
import org.texttechnologylab.DockerUnifiedUIMAInterface.segmentation.DUUISegmentationStrategyNone;
import org.xml.sax.SAXException;

Expand Down Expand Up @@ -264,12 +267,26 @@ public void run() {
segmentationStrategy.initialize(_jc);

JCas jCasSegmented = segmentationStrategy.getNextSegment();

while (jCasSegmented != null) {
// Process each cas sequentially
// TODO add parallel variant later

if(segmentationStrategy instanceof DUUISegmentationStrategyByDelemiter){
DUUISegmentationStrategyByDelemiter pStrategie = ((DUUISegmentationStrategyByDelemiter) segmentationStrategy);

if (pStrategie.hasDebug()) {
int iLeft = pStrategie.getSegments();
DocumentMetaData dmd = DocumentMetaData.get(_jc);
System.out.println(dmd.getDocumentId() + " Left: " + iLeft);
}


}
i.getDriver().run(i.getUUID(), jCasSegmented, perf);

segmentationStrategy.merge(jCasSegmented);

jCasSegmented = segmentationStrategy.getNextSegment();
}

Expand All @@ -292,6 +309,119 @@ public void run() {
}
}

/**
 * Worker thread that repeatedly pulls documents from a {@link DUUIAsynchronousProcessor}
 * into its own reusable {@link JCas} and runs each document through the instantiated
 * pipeline until the shared shutdown flag is set.
 *
 * Thread-safety: each worker owns exactly one JCas (_jc); coordination with the composer
 * happens only through the shared AtomicBoolean/AtomicInteger and the processor.
 */
class DUUIWorkerAsyncProcessor extends Thread {
    Vector<DUUIComposer.PipelinePart> _flow;   // pipeline components, executed in order per document
    AtomicInteger _threadsAlive;               // shared counter of currently running worker threads
    AtomicBoolean _shutdown;                   // shared shutdown signal, set by the composer
    IDUUIStorageBackend _backend;              // optional metrics backend; may be null (then no metrics are recorded)
    JCas _jc;                                  // CAS owned by this worker, refilled for every document
    String _runKey;                            // run name under which performance data is stored
    DUUIAsynchronousProcessor _processor;      // document source shared by all workers

    /**
     * @param engineFlow pipeline parts to run on every document
     * @param jc         CAS instance dedicated to this worker
     * @param shutdown   shared flag; once true the worker exits after its current poll
     * @param error      NOTE: despite its name this is the alive-thread counter, stored as _threadsAlive
     * @param backend    storage backend for per-document metrics, or null
     * @param runKey     run name for performance tracking
     * @param processor  source that fills the CAS with the next document
     */
    DUUIWorkerAsyncProcessor(Vector<DUUIComposer.PipelinePart> engineFlow, JCas jc, AtomicBoolean shutdown, AtomicInteger error,
                             IDUUIStorageBackend backend, String runKey, DUUIAsynchronousProcessor processor) {
        super();
        _flow = engineFlow;
        _jc = jc;
        _shutdown = shutdown;
        _threadsAlive = error;
        _backend = backend;
        _runKey = runKey;
        _processor = processor;
    }

    @Override
    public void run() {
        int num = _threadsAlive.addAndGet(1);
        while (true) {
            // Poll for the next document, measuring how long this worker waited for input.
            long waitTimeStart = System.nanoTime();
            long waitTimeEnd = 0;
            while (true) {
                if (_shutdown.get()) {
                    // Composer signalled shutdown: deregister and terminate this worker.
                    _threadsAlive.getAndDecrement();
                    return;
                }
                try {
                    if (!_processor.getNextCAS(_jc)) {
                        //Give the main IO Thread time to finish work
                        Thread.sleep(300);
                    } else {
                        break;
                    }

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // waitTimeEnd is never assigned inside the poll loop above, so this guard
            // always fires; kept as-is to preserve the original timing behavior.
            if (waitTimeEnd == 0) waitTimeEnd = System.nanoTime();

            //System.out.printf("[Composer] Thread %d still alive and doing work\n",num);

            boolean trackErrorDocs = false;
            if (_backend != null) {
                trackErrorDocs = _backend.shouldTrackErrorDocs();
            }

            // Per-document performance record: wait time plus per-component timings added by the drivers.
            DUUIPipelineDocumentPerformance perf = new DUUIPipelineDocumentPerformance(_runKey,
                    waitTimeEnd - waitTimeStart,
                    _jc,
                    trackErrorDocs);
            for (DUUIComposer.PipelinePart i : _flow) {
                try {
                    // Segment document for each item in the pipeline separately
                    // TODO support "complete pipeline" segmentation to only segment once
                    // TODO thread safety needed for here?
                    DUUISegmentationStrategy segmentationStrategy = i.getSegmentationStrategy();
                    if (segmentationStrategy instanceof DUUISegmentationStrategyNone) {
                        // No segmentation: run the whole CAS through this component.
                        i.getDriver().run(i.getUUID(), _jc, perf);
                    } else {
                        segmentationStrategy.initialize(_jc);

                        JCas jCasSegmented = segmentationStrategy.getNextSegment();

                        while (jCasSegmented != null) {
                            // Process each cas sequentially
                            // TODO add parallel variant later

                            if (segmentationStrategy instanceof DUUISegmentationStrategyByDelemiter) {
                                DUUISegmentationStrategyByDelemiter pStrategie = ((DUUISegmentationStrategyByDelemiter) segmentationStrategy);

                                // Debug aid: print how many segments remain for this document.
                                if (pStrategie.hasDebug()) {
                                    int iLeft = pStrategie.getSegments();
                                    DocumentMetaData dmd = DocumentMetaData.get(_jc);
                                    System.out.println(dmd.getDocumentId() + " Left: " + iLeft);
                                }


                            }
                            i.getDriver().run(i.getUUID(), jCasSegmented, perf);

                            // Merge the processed segment back into the full document CAS.
                            segmentationStrategy.merge(jCasSegmented);

                            jCasSegmented = segmentationStrategy.getNextSegment();
                        }

                        segmentationStrategy.finalize(_jc);
                    }

                } catch (Exception e) {
                    //Ignore errors at the moment
                    //e.printStackTrace();
                    // Non-IO errors abort the remaining pipeline parts for this document and
                    // move on to the next document; IOExceptions are silently ignored and the
                    // pipeline continues with the next component.
                    if(!(e instanceof IOException)) {
                        System.err.println(e.getMessage());
                        System.out.println("Thread continues work with next document!");
                        break;
                    }
                }
            }

            if (_backend != null) {
                _backend.addMetricsForDocument(perf);
            }
        }
    }
}



public class DUUIComposer {
Expand Down Expand Up @@ -483,7 +613,7 @@ public DUUISegmentationStrategy getSegmentationStrategy() {
}

// Always return a copy to allow for multiple processes/threads
System.out.println("Cloning segmentation strategy: " + segmentationStrategy.getClass().getName());
// System.out.println("Cloning segmentation strategy: " + segmentationStrategy.getClass().getName());
return SerializationUtils.clone(segmentationStrategy);
}
}
Expand All @@ -493,6 +623,75 @@ public DUUIComposer resetPipeline() {
return this;
}

/**
 * Runs the instantiated pipeline over all documents supplied by an asynchronous processor.
 * Allocates a pool of CAS instances, starts {@code _workers} worker threads that pull
 * documents from {@code collectionReader}, waits for processing to finish, then joins all
 * workers and finalizes the run in the storage backend (if configured).
 *
 * @param collectionReader asynchronous source that fills worker CASes with documents
 * @param name             run name used for storage/metrics tracking
 * @throws Exception if pipeline instantiation, document processing or shutdown fails;
 *                   the pipeline is shut down before the exception is rethrown
 */
public void run(DUUIAsynchronousProcessor collectionReader, String name) throws Exception {
    ConcurrentLinkedQueue<JCas> emptyCasDocuments = new ConcurrentLinkedQueue<>();
    AtomicInteger aliveThreads = new AtomicInteger(0);
    _shutdownAtomic.set(false);

    System.out.printf("[Composer] Running in asynchronous mode, %d threads at most!\n", _workers);

    try {
        if (_storage != null) {
            _storage.addNewRun(name, this);
        }
        TypeSystemDescription desc = instantiate_pipeline();
        if (_cas_poolsize == null) {
            // Slightly over-provision the CAS pool so workers rarely starve for a free CAS.
            _cas_poolsize = (int) Math.ceil(_workers * 1.5);
            System.out.printf("[Composer] Calculated CAS poolsize of %d!\n", _cas_poolsize);
        } else if (_cas_poolsize < _workers) {
            System.err.println("[Composer] WARNING: Pool size is smaller than the available threads, this is likely a bottleneck.");
        }

        // Pre-allocate the CAS pool; each worker takes one CAS and reuses it per document.
        for (int i = 0; i < _cas_poolsize; i++) {
            emptyCasDocuments.add(JCasFactory.createJCas(desc));
        }

        Thread[] arr = new Thread[_workers];
        for (int i = 0; i < _workers; i++) {
            System.out.printf("[Composer] Starting worker thread [%d/%d]\n", i + 1, _workers);
            arr[i] = new DUUIWorkerAsyncProcessor(_instantiatedPipeline, emptyCasDocuments.poll(), _shutdownAtomic, aliveThreads, _storage, name, collectionReader);
            arr[i].start();
        }
        Instant starttime = Instant.now();

        // NOTE(review): this loop only terminates once _shutdownAtomic is set to true from
        // elsewhere (nothing in this method sets it before the loop exits — confirm an
        // external shutdown call is responsible). Previously this was an empty busy-wait
        // burning 100% of a core; sleep briefly instead of spinning.
        while (!_shutdownAtomic.get()) {
            Thread.sleep(100);
        }

        // Wait until the documents have finished processing.
        int waitCount = 0;
        while (emptyCasDocuments.size() != _cas_poolsize && !collectionReader.isFinish()) {
            // Only log every 500th iteration to avoid flooding the console.
            if (++waitCount % 500 == 0) {
                System.out.println("[Composer] Waiting for threads to finish document processing...");
            }
            Thread.sleep(1000L * _workers); // TODO tune: possibly too coarse relative to the thread count

        }
        System.out.println("[Composer] All documents have been processed. Signaling threads to shut down now...");
        _shutdownAtomic.set(true);

        for (int i = 0; i < arr.length; i++) {
            System.out.printf("[Composer] Waiting for thread [%d/%d] to shut down\n", i + 1, arr.length);
            arr[i].join();
            System.out.printf("[Composer] Thread %d returned.\n", i);
        }
        if (_storage != null) {
            _storage.finalizeRun(name, starttime, Instant.now());
        }
        System.out.println("[Composer] All threads returned.");
        shutdown_pipeline();
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("[Composer] Something went wrong, shutting down remaining components...");
        shutdown_pipeline();
        throw e;
    }
}

public void run(AsyncCollectionReader collectionReader, String name) throws Exception {
ConcurrentLinkedQueue<JCas> emptyCasDocuments = new ConcurrentLinkedQueue<>();
ConcurrentLinkedQueue<JCas> loadedCasDocuments = new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -548,10 +747,15 @@ public void run(AsyncCollectionReader collectionReader, String name) throws Exce
if(breakit) break;
}

AtomicInteger waitCount = new AtomicInteger();
waitCount.set(0);
// Wartet, bis die Dokumente fertig verarbeitet wurden.
while(emptyCasDocuments.size() != _cas_poolsize && !collectionReader.isEmpty()) {
System.out.println("[Composer] Waiting for threads to finish document processing...");
if (waitCount.incrementAndGet() % 500 == 0) {
System.out.println("[Composer] Waiting for threads to finish document processing...");
}
Thread.sleep(1000*_workers); // to fast or in relation with threads?

}
System.out.println("[Composer] All documents have been processed. Signaling threads to shut down now...");
_shutdownAtomic.set(true);
Expand Down Expand Up @@ -620,9 +824,12 @@ private void run_async(CollectionReader collectionReader, String name) throws Ex
collectionReader.getNext(jc.getCas());
loadedCasDocuments.add(jc);
}

AtomicInteger waitCount = new AtomicInteger();
waitCount.set(0);
while(emptyCasDocuments.size() != _cas_poolsize) {
System.out.println("[Composer] Waiting for threads to finish document processing...");
if (waitCount.getAndIncrement() % 500 == 0) {
System.out.println("[Composer] Waiting for threads to finish document processing...");
}
Thread.sleep(1000);
}
System.out.println("[Composer] All documents have been processed. Signaling threads to shut down now...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@
import com.github.dockerjava.api.model.*;
import com.github.dockerjava.core.DockerClientBuilder;
import com.github.dockerjava.core.command.LogContainerResultCallback;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.google.common.collect.ImmutableList;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.InvalidParameterException;
Expand Down Expand Up @@ -154,16 +152,19 @@ public class DUUIDockerInterface {
* @throws IOException
*/
public DUUIDockerInterface() throws IOException {
URI dockerClientURI;
if (System.getProperty("os.name").toLowerCase().contains("windows")) {
dockerClientURI = URI.create("npipe:////./pipe/docker_engine");
} else {
dockerClientURI = URI.create("tcp://localhost:2375");
}

_docker = DockerClientBuilder.getInstance()
.withDockerHttpClient(new ApacheDockerHttpClient.Builder()
.dockerHost(dockerClientURI).build()).build();
// GREAT TODO!!!
// URI dockerClientURI;
// if (System.getProperty("os.name").toLowerCase().contains("windows")) {
// dockerClientURI = URI.create("npipe:////./pipe/docker_engine");
// } else {
// dockerClientURI = URI.create("tcp://localhost:2375");
// }
//
// _docker = DockerClientBuilder.getInstance()
// .withDockerHttpClient(new ApacheDockerHttpClient.Builder()
// .dockerHost(dockerClientURI).build()).build();
_docker = DockerClientBuilder.getInstance().build();


// DockerClientConfig config = DefaultDockerClientConfig.createDefaultConfigBuilder().withDockerHost("tcp://localhost:2375").build();
Expand Down
Loading

0 comments on commit 682146d

Please sign in to comment.