Skip to content

Commit

Permalink
feat: GRPC Java implementation for client layer V2 (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
aneojgurhem authored Jun 19, 2024
2 parents 9fe5064 + a5fd6b1 commit 561d956
Show file tree
Hide file tree
Showing 7 changed files with 547 additions and 0 deletions.
20 changes: 20 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,23 @@ jobs:
if-no-files-found: error
path: packages/cpp/tools/packaging/*.${{ matrix.type }}
name: libarmonik.${{ matrix.type }}

build-java-packages:
name: Build Java
runs-on: ubuntu-latest
defaults:
run:
working-directory: packages/java
steps:
- name: Checkout
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4
with:
fetch-depth: 0
- name: Set up java 17
uses: actions/setup-java@v4
with:
distribution: oracle
java-version: 17
cache: maven
- name: Build the package
run: mvn clean install -DskipTests
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ dist
!packages/python
!packages/angular
!packages/web
!packages/java
packages/web/LICENSE

!.docs/content/**
Expand All @@ -390,3 +391,14 @@ packages/web/LICENSE
**/.idea

gen

# Maven
target/

# Compiled class file
*.class

# IDE-specific files
*.iml
*.ipr
*.iws
137 changes: 137 additions & 0 deletions packages/java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.aneo</groupId>
<artifactId>armonik-java</artifactId>
<description>GRPC java binding for the Armonik orchestrator API</description>
<version>0.1.0</version>

<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>
<proto.path>../../Protos/V1</proto.path>
<sl4j.version>2.0.12</sl4j.version>
<logback.version>1.5.5</logback.version>
</properties>

<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>1.62.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.6.5</version>
</dependency>


<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.62.2</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.62.2</version>
</dependency>
<dependency>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>

<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>


<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${sl4j.version}</version>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>



<!-- Mockito dependencies -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.11.0</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>5.11.0</version>
<scope>test</scope>
</dependency>


</dependencies>


<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protoSourceRoot>${proto.path}</protoSourceRoot>
<protocArtifact>
com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>
io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
185 changes: 185 additions & 0 deletions packages/java/src/main/java/armonik/client/event/EventClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package armonik.client.event;

import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_NEW_RESULT;
import static armonik.api.grpc.v1.events.EventsCommon.EventsEnum.EVENTS_ENUM_RESULT_STATUS_UPDATE;
import static armonik.api.grpc.v1.results.ResultsFields.ResultRawEnumField.RESULT_RAW_ENUM_FIELD_RESULT_ID;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import armonik.api.grpc.v1.FiltersCommon;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionRequest;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse;
import armonik.api.grpc.v1.events.EventsGrpc;
import armonik.api.grpc.v1.events.EventsGrpc.EventsBlockingStub;
import armonik.api.grpc.v1.results.ResultsFields;
import armonik.api.grpc.v1.results.ResultsFilters;
import armonik.client.event.util.records.EventSubscriptionResponseRecord;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;

/**
* EventClient is a client for interacting with event-related functionalities.
* It communicates with a gRPC server using a blocking stub to retrieve events.
*/
public class EventClient {
/** The blocking and nonblocking stub for communicating with the gRPC server. */
private final EventsBlockingStub eventsBlockingStub;
private final EventsGrpc.EventsStub eventsStub;

/**
* Constructs a new EventClient with the specified managed channel.
*
* @param managedChannel the managed channel used for communication with the
* server
*/
public EventClient(ManagedChannel managedChannel) {
eventsBlockingStub = EventsGrpc.newBlockingStub(managedChannel);
eventsStub = EventsGrpc.newStub(managedChannel);
}

/**
* Retrieves a list of event subscription response records for the given session
* ID and result IDs.
*
* @param sessionId the session ID for which events are requested
* @param resultIds the list of result IDs for which events are requested
* @return a list of EventSubscriptionResponseRecord objects representing the
* events
*/
public List<EventSubscriptionResponseRecord> getEvents(String sessionId, List<String> resultIds) {
EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds);
return mapToRecord(sessionId, request, resultIds);
}

/**
* Maps the received event subscription response to
* EventSubscriptionResponseRecord objects.
*
* @param sessionId the session ID for which events are being mapped
* @param request the event subscription request
* @return a list of EventSubscriptionResponseRecord objects representing the
* events
*/
private List<EventSubscriptionResponseRecord> mapToRecord(String sessionId, EventSubscriptionRequest request,
List<String> resultIds) {
List<EventSubscriptionResponseRecord> responseRecords = new ArrayList<>();
Iterator<EventSubscriptionResponse> events = eventsBlockingStub.getEvents(request);
Set<String> resultsExpected = new HashSet<>(resultIds);

while (events.hasNext()) {
var esr = events.next();
resultsExpected.remove(esr.getNewResult().getResultId());
responseRecords
.add(new EventSubscriptionResponseRecord(sessionId,
esr.getTaskStatusUpdate(),
esr.getResultStatusUpdate(),
esr.getResultOwnerUpdate(),
esr.getNewTask(),
esr.getNewResult()));
if (resultsExpected.isEmpty()) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("Thread was interrupted while sleeping");
}
break;
}
}
return responseRecords;
}

/**
* Retrieves a list of event subscription response records for the given session
* asynchrone
* ID and result IDs.
*
* @param sessionId the session ID for which events are requested
* @param resultIds the list of result IDs for which events are requested
* @return a list of EventSubscriptionResponseRecord objects representing the
* events
* @throws InterruptedException
*/
public List<EventSubscriptionResponseRecord> getEventResponseRecords(String sessionId, List<String> resultIds)
throws InterruptedException {

EventSubscriptionRequest request = CreateEventSubscriptionRequest(sessionId, resultIds);
List<EventSubscriptionResponseRecord> responseRecords = new ArrayList<>();
CountDownLatch finishLatch = new CountDownLatch(1);

StreamObserver<EventSubscriptionResponse> responseObserver = new StreamObserver<EventSubscriptionResponse>() {

@Override
public void onNext(EventSubscriptionResponse esr) {
responseRecords.add(new EventSubscriptionResponseRecord(
sessionId,
esr.getTaskStatusUpdate(),
esr.getResultStatusUpdate(),
esr.getResultOwnerUpdate(),
esr.getNewTask(),
esr.getNewResult()));
}

@Override
public void onError(Throwable t) {
t.printStackTrace();
finishLatch.countDown();
}

@Override
public void onCompleted() {
System.out.println("Stream completed");
finishLatch.countDown();
}
};

eventsStub.getEvents(request, responseObserver);

// Wait for the response observer to finish
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
System.out.println("Request not completed within the timeout.");
}

return responseRecords;
}

/**
* Creates an event subscription request with the specified session ID and
* result IDs.
*
* @param sessionId the session ID for which event subscription is requested
* @param resultIds the list of result IDs to filter events
* @return an EventSubscriptionRequest object configured with the provided
* session ID and result IDs
*/
public static EventSubscriptionRequest CreateEventSubscriptionRequest(String sessionId, List<String> resultIds) {
FiltersCommon.FilterString filterString = FiltersCommon.FilterString.newBuilder()
.setOperator(FiltersCommon.FilterStringOperator.FILTER_STRING_OPERATOR_EQUAL)
.build();

ResultsFields.ResultField.Builder resultField = ResultsFields.ResultField.newBuilder()
.setResultRawField(ResultsFields.ResultRawField.newBuilder().setField(RESULT_RAW_ENUM_FIELD_RESULT_ID));

ResultsFilters.FilterField.Builder filterFieldBuilder = ResultsFilters.FilterField.newBuilder()
.setField(resultField)
.setFilterString(filterString);

ResultsFilters.Filters.Builder resultFiltersBuilder = ResultsFilters.Filters.newBuilder();
for (String resultId : resultIds) {
filterFieldBuilder.setFilterString(FiltersCommon.FilterString.newBuilder().setValue(resultId).build());
resultFiltersBuilder.addOr(ResultsFilters.FiltersAnd.newBuilder().addAnd(filterFieldBuilder).build());
}

return EventSubscriptionRequest.newBuilder()
.setResultsFilters(resultFiltersBuilder.build())
.addReturnedEvents(EVENTS_ENUM_RESULT_STATUS_UPDATE)
.addReturnedEvents(EVENTS_ENUM_NEW_RESULT)
.setSessionId(sessionId)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package armonik.client.event.util.records;

import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewResult;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.NewTask;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultOwnerUpdate;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.ResultStatusUpdate;
import armonik.api.grpc.v1.events.EventsCommon.EventSubscriptionResponse.TaskStatusUpdate;

/**
* EventSubscriptionResponseRecord represents a record containing subscription
* response details for an event.
* It encapsulates various attributes related to event subscription, such as
* session ID, task status update,
* result status update, result owner update, new task, and new result.
*/
public record EventSubscriptionResponseRecord(String sessionId,
TaskStatusUpdate taskStatusUpdate,
ResultStatusUpdate resultStatusUpdate,
ResultOwnerUpdate resultOwnerUpdate,
NewTask newTask,
NewResult newResult) {
}
Loading

0 comments on commit 561d956

Please sign in to comment.