Skip to content

Commit

Permalink
Lock entities before calling an async operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Feb 14, 2022
1 parent b92945c commit 30b131a
Show file tree
Hide file tree
Showing 25 changed files with 779 additions and 414 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class ContractServiceExtension implements ServiceExtension {
private RemoteMessageDispatcherRegistry dispatcherRegistry;
@Inject
private CommandHandlerRegistry commandHandlerRegistry;
@Inject
private ContractNegotiationStore store;

@Override
public String name() {
Expand All @@ -85,10 +87,8 @@ public void initialize(ServiceExtensionContext context) {

@Override
public void start() {
// Start negotiation managers.
var negotiationStore = context.getService(ContractNegotiationStore.class);
consumerNegotiationManager.start(negotiationStore);
providerNegotiationManager.start(negotiationStore);
consumerNegotiationManager.start();
providerNegotiationManager.start();
}

@Override
Expand Down Expand Up @@ -139,6 +139,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.store(store)
.build();

providerNegotiationManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -149,6 +150,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.store(store)
.build();

context.registerService(ConsumerContractNegotiationManager.class, consumerNegotiationManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

import static java.lang.String.format;
import static org.eclipse.dataspaceconnector.contract.common.ContractId.DEFINITION_PART;
Expand Down Expand Up @@ -78,13 +77,10 @@ public class ConsumerContractNegotiationManagerImpl implements ConsumerContractN
private CommandRunner<ContractNegotiationCommand> commandRunner;
private CommandProcessor<ContractNegotiationCommand> commandProcessor;
private Monitor monitor;
private Predicate<Boolean> isProcessed = it -> it;

public ConsumerContractNegotiationManagerImpl() {
}
private ConsumerContractNegotiationManagerImpl() { }

public void start(ContractNegotiationStore store) {
negotiationStore = store;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -300,6 +296,7 @@ private CompletableFuture<Object> sendOffer(ContractOffer offer, ContractNegotia
private boolean processInitial(ContractNegotiation negotiation) {
var offer = negotiation.getLastContractOffer();
negotiation.transitionRequesting();
negotiation.lock();
negotiationStore.save(negotiation);

sendOffer(offer, negotiation, ContractOfferRequest.Type.INITIAL)
Expand Down Expand Up @@ -341,21 +338,24 @@ private BiConsumer<Object, Throwable> onOfferSent(String id, ContractOffer offer
* @return true if processed, false elsewhere
*/
private boolean processConsumerOffering(ContractNegotiation negotiation) {
negotiation.lock();
negotiationStore.save(negotiation);
var offer = negotiation.getLastContractOffer();
sendOffer(offer, negotiation, ContractOfferRequest.Type.COUNTER_OFFER)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionOffered();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerOffered(negotiation));
contractNegotiation.transitionOffered();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerOffered(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionOffering();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerOffering(negotiation));
contractNegotiation.transitionOffering();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerOffering(contractNegotiation));
String message = format("[Consumer] Failed to send contract offer with id %s. ContractNegotiation %s stays in state %s.",
offer.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
offer.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -401,21 +401,24 @@ private Boolean processConsumerApproving(ContractNegotiation negotiation) {
.correlationId(negotiation.getId())
.build();

negotiation.lock();
negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, request, negotiation::getId)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionApproved();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerApproved(negotiation));
contractNegotiation.transitionApproved();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerApproved(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionApproving();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerApproving(negotiation));
contractNegotiation.transitionApproving();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerApproving(contractNegotiation));
String message = format("[Consumer] Failed to send contract agreement with id %s. ContractNegotiation %s stays in state %s.",
agreement.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
agreement.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand All @@ -438,21 +441,24 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiation.lock();
negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, negotiation::getId)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionDeclined();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declined(negotiation));
contractNegotiation.transitionDeclined();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declined(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionDeclining();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declining(negotiation));
contractNegotiation.transitionDeclining();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declining(contractNegotiation));
String message = format("[Consumer] Failed to send contract rejection. ContractNegotiation %s stays in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -564,16 +570,21 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ConsumerContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry");
Objects.requireNonNull(manager.commandQueue, "commandQueue");
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");

manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);

return manager;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public class ProviderContractNegotiationManagerImpl implements ProviderContractN
private CommandRunner<ContractNegotiationCommand> commandRunner;
private CommandProcessor<ContractNegotiationCommand> commandProcessor;
private Monitor monitor;
private Predicate<Boolean> isProcessed = it -> it;

private ProviderContractNegotiationManagerImpl() {
}
Expand All @@ -84,8 +83,7 @@ private ProviderContractNegotiationManagerImpl() {

//TODO validate previous offers against hash?

public void start(ContractNegotiationStore negotiationStore) {
this.negotiationStore = negotiationStore;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -273,9 +271,9 @@ private void run() {
while (active.get()) {
try {
long providerOffering = onNegotiationsInState(PROVIDER_OFFERING).doProcess(this::processProviderOffering);
long declining = onNegotiationsInState(DECLINING).doProcess(this::processDeclining);
long confirming = onNegotiationsInState(CONFIRMING).doProcess(this::processConfirming);

long declining = onNegotiationsInState(DECLINING).doProcess(this::processDeclining);

long commandsProcessed = onCommands().doProcess(this::processCommand);

var totalProcessed = providerOffering + declining + confirming + commandsProcessed;
Expand Down Expand Up @@ -333,21 +331,25 @@ private boolean processProviderOffering(ContractNegotiation negotiation) {
.correlationId(negotiation.getCorrelationId())
.build();

negotiation.lock();
negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, contractOfferRequest, () -> null)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionOffered();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.providerOffered(negotiation));
contractNegotiation.transitionOffered();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.providerOffered(contractNegotiation));
monitor.debug(String.format("[Provider] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionOffering();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.providerOffering(negotiation));
contractNegotiation.transitionOffering();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.providerOffering(contractNegotiation));
String message = format("[Provider] Failed to send contract offer with id %s. ContractNegotiation %s stays in state %s.",
currentOffer.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
currentOffer.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand All @@ -370,21 +372,25 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiation.lock();
negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, () -> null)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionDeclined();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declined(negotiation));
contractNegotiation.transitionDeclined();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declined(contractNegotiation));
monitor.debug(String.format("[Provider] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionDeclining();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declining(negotiation));
contractNegotiation.transitionDeclining();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declining(contractNegotiation));
String message = format("[Provider] Failed to send contract rejection. ContractNegotiation %s stays in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -438,6 +444,7 @@ private Boolean processConfirming(ContractNegotiation negotiation) {

//TODO protocol-independent response type?
negotiation.transitionConfirmingSent();
negotiation.lock();
negotiationStore.save(negotiation);
dispatcherRegistry.send(Object.class, request, () -> null)
.whenComplete(onAgreementSent(negotiation.getId(), agreement));
Expand Down Expand Up @@ -525,16 +532,22 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ProviderContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry");
Objects.requireNonNull(manager.commandQueue, "commandQueue");
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");

Objects.requireNonNull(manager.negotiationStore, "store");

manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);

return manager;
}
}
Expand Down
Loading

0 comments on commit 30b131a

Please sign in to comment.