Skip to content

Commit

Permalink
Merge pull request #1433 from SpineEventEngine/delivery-thread-id
Browse files Browse the repository at this point in the history
[1.x] Make `AbstractWorkRegistry` respect the current worker of a node
  • Loading branch information
yevhenii-nadtochii authored Dec 15, 2021
2 parents aacba13 + 14e49be commit d3181ca
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 129 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.7.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
176 changes: 88 additions & 88 deletions license-report.md

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ all modules and does not describe the project structure per-subproject.

<groupId>io.spine</groupId>
<artifactId>spine-core-java</artifactId>
<version>1.7.7-SNAPSHOT.7</version>
<version>1.7.7-SNAPSHOT.8</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -207,12 +207,12 @@ all modules and does not describe the project structure per-subproject.
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<version>0.8.5</version>
<version>0.8.6</version>
</dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.ant</artifactId>
<version>0.8.5</version>
<version>0.8.6</version>
</dependency>
</dependencies>
</project>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,50 +45,66 @@
* persistence mechanism.
*
* @implNote This class is NOT thread safe. Synchronize the atomic persistence operations
* as well as the methods implemented in this class make an implementation thread safe.
* as well as the methods implemented in this class to make an implementation thread safe.
*/
@SPI
public abstract class AbstractWorkRegistry implements ShardedWorkRegistry {

@Override
public Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
public Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId node) {
checkNotNull(index);
checkNotNull(nodeId);

Optional<ShardSessionRecord> record = find(index);
if (record.isPresent()) {
ShardSessionRecord existingRecord = record.get();
if (hasPickedBy(existingRecord)) {
return Optional.empty();
} else {
ShardSessionRecord updatedRecord = updateNode(existingRecord, nodeId);
return Optional.of(asSession(updatedRecord));
}
} else {
ShardSessionRecord newRecord = createRecord(index, nodeId);
checkNotNull(node);

WorkerId worker = currentWorkerFor(node);
Optional<ShardProcessingSession> result = pickUp(index, worker);
return result;
}

private Optional<ShardProcessingSession> pickUp(ShardIndex index, WorkerId worker) {
Optional<ShardSessionRecord> optionalRecord = find(index);
if (!optionalRecord.isPresent()) {
ShardSessionRecord newRecord = createRecord(index, worker);
return Optional.of(asSession(newRecord));
}

ShardSessionRecord record = optionalRecord.get();
if (hasWorker(record)) {
return Optional.empty();
}

ShardSessionRecord updatedRecord = updateNode(record, worker);
return Optional.of(asSession(updatedRecord));
}

private static boolean hasPickedBy(ShardSessionRecord record) {
return !NodeId.getDefaultInstance().equals(record.getPickedBy());
/**
* Returns an identifier of the current worker that is now going to process the shard.
*
* <p>An example of such an identifier could be ID of the thread which performs processing.
*
* @param node
* the node to which the resulted worker belongs
*/
protected abstract WorkerId currentWorkerFor(NodeId node);

private static boolean hasWorker(ShardSessionRecord record) {
return !WorkerId.getDefaultInstance().equals(record.getWorker());
}

private ShardSessionRecord createRecord(ShardIndex index, NodeId nodeId) {
private ShardSessionRecord createRecord(ShardIndex index, WorkerId worker) {
ShardSessionRecord newRecord = ShardSessionRecord
.newBuilder()
.setIndex(index)
.setPickedBy(nodeId)
.setWorker(worker)
.setWhenLastPicked(currentTime())
.vBuild();
write(newRecord);
return newRecord;
}

private ShardSessionRecord updateNode(ShardSessionRecord record, NodeId nodeId) {
private ShardSessionRecord updateNode(ShardSessionRecord record, WorkerId worker) {
ShardSessionRecord updatedRecord = record
.toBuilder()
.setPickedBy(nodeId)
.setWorker(worker)
.setWhenLastPicked(currentTime())
.build();
write(updatedRecord);
Expand All @@ -100,7 +116,7 @@ public Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
checkNotNull(inactivityPeriod);
ImmutableSet.Builder<ShardIndex> resultBuilder = ImmutableSet.builder();
allRecords().forEachRemaining(record -> {
if (record.hasPickedBy()) {
if (record.hasWorker()) {
Timestamp whenPicked = record.getWhenLastPicked();
Duration elapsed = between(whenPicked, currentTime());

Expand All @@ -115,11 +131,11 @@ public Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
}

/**
* Clears the value of {@code ShardSessionRecord.when_last_picked} and stores the session.
* Clears the value of {@code ShardSessionRecord.worker} and stores the session.
*/
protected void clearNode(ShardSessionRecord session) {
ShardSessionRecord record = session.toBuilder()
.clearPickedBy()
.clearWorker()
.build();
write(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/**
* The session of processing the messages, which reside in a shard.
*
* <p>Starts by {@linkplain ShardedWorkRegistry#pickUp(ShardIndex, NodeId)} picking up}
* <p>Starts by {@linkplain ShardedWorkRegistry#pickUp(ShardIndex, NodeId) picking up}
* the shard to process.
*/
@SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ public interface ShardedWorkRegistry {
*
* @param index
* the index of the shard to pick up for processing
* @param nodeId
* @param node
* the identifier of the node for which to pick the shard
* @return the session of shard processing,
* or {@code Optional.empty()} if the shard is not available
*/
Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId);
Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId node);

/**
* Clears up the recorded {@code NodeId}s from the session records if there was no activity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.ShardedWorkRegistry;
import io.spine.server.delivery.WorkerId;

import java.util.Iterator;
import java.util.Map;
Expand All @@ -56,6 +57,16 @@ public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, No
return super.pickUp(index, nodeId);
}

@Override
protected WorkerId currentWorkerFor(NodeId node) {
WorkerId worker = WorkerId
.newBuilder()
.setNodeId(node)
.setValue(String.valueOf(Thread.currentThread().getId()))
.vBuild();
return worker;
}

@Override
public synchronized Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
return super.releaseExpiredSessions(inactivityPeriod);
Expand Down
33 changes: 24 additions & 9 deletions server/src/main/proto/spine/server/delivery/delivery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import "spine/server/server_environment.proto";
//
// A value type used across the application. To be potentially used in JavaScript, Go, C++ and
// other client and server environments, that are split into shards.
//
message ShardIndex {

// The zero-based index of the shard.
Expand All @@ -60,17 +59,18 @@ message ShardSessionRecord {
// The index of a shard processed in this session.
ShardIndex index = 1 [(required) = true];

// The identifier of an application node, which picked up the index and processes it.
//
// Unset until a node picks the session.
//
NodeId picked_by = 2;

// When the shard processed within the session was last picked by the node.
//
// This field is unset if no nodes ever picked the session.
//
google.protobuf.Timestamp when_last_picked = 3;

// An identifier of an application worker, which picked up the shard and processes it.
//
// Unset until a worker picks the shard.
WorkerId worker = 4;

reserved 2;
reserved "picked_by";
}

//A stage of the `Delivery` process running for some particular `ShardIndex`.
Expand All @@ -87,8 +87,23 @@ message DeliveryStage {
//
// Represented in each of the bounded contexts as an event reactor,
// as it's impossible to have several process managers of the same state across bounded contexts.
//
message ShardMaintenance {

ShardIndex id = 1;
}

// An identifier of a worker which processes a shard.
//
// This value is unique across the application. It is used to indicate who is currently processing
// which shard. A single node can contain several workers (typically represented by threads)
// processing different shards.
message WorkerId {

// A node to which this worker belongs.
NodeId nodeId = 1 [(required) = true];

// Worker's identifier.
//
// An example of such an identifier could be ID of the thread which processes a shard.
string value = 2 [(required) = true];
}
2 changes: 1 addition & 1 deletion version.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
/**
* Version of this library.
*/
val coreJava = "1.7.7-SNAPSHOT.7"
val coreJava = "1.7.7-SNAPSHOT.8"

/**
* Versions of the Spine libraries that `core-java` depends on.
Expand Down

0 comments on commit d3181ca

Please sign in to comment.