Skip to content

Commit

Permalink
Lock entities before calling an async operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Feb 17, 2022
1 parent 54aade6 commit ffef89b
Show file tree
Hide file tree
Showing 28 changed files with 734 additions and 509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public class ContractServiceExtension implements ServiceExtension {

private static final long DEFAULT_ITERATION_WAIT = 5000; // millis
private Monitor monitor;
private ServiceExtensionContext context;
private ConsumerContractNegotiationManagerImpl consumerNegotiationManager;
private ProviderContractNegotiationManagerImpl providerNegotiationManager;
@Inject
Expand All @@ -68,6 +67,8 @@ public class ContractServiceExtension implements ServiceExtension {
private RemoteMessageDispatcherRegistry dispatcherRegistry;
@Inject
private CommandHandlerRegistry commandHandlerRegistry;
@Inject
private ContractNegotiationStore store;

@Override
public String name() {
Expand All @@ -77,18 +78,15 @@ public String name() {
@Override
public void initialize(ServiceExtensionContext context) {
monitor = context.getMonitor();
this.context = context;

registerTypes(context);
registerServices(context);
}

@Override
public void start() {
// Start negotiation managers.
var negotiationStore = context.getService(ContractNegotiationStore.class);
consumerNegotiationManager.start(negotiationStore);
providerNegotiationManager.start(negotiationStore);
consumerNegotiationManager.start();
providerNegotiationManager.start();
}

@Override
Expand Down Expand Up @@ -141,6 +139,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandRunner(commandRunner)
.observable(observable)
.telemetry(telemetry)
.store(store)
.build();

providerNegotiationManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -152,6 +151,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandRunner(commandRunner)
.observable(observable)
.telemetry(telemetry)
.store(store)
.build();

context.registerService(ConsumerContractNegotiationManager.class, consumerNegotiationManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,9 @@ public class ConsumerContractNegotiationManagerImpl implements ConsumerContractN
private Telemetry telemetry;
private Monitor monitor;

public ConsumerContractNegotiationManagerImpl() {
}
private ConsumerContractNegotiationManagerImpl() { }

public void start(ContractNegotiationStore store) {
negotiationStore = store;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -350,21 +348,23 @@ private BiConsumer<Object, Throwable> onOfferSent(String id, ContractOffer offer
*/
@WithSpan
private boolean processConsumerOffering(ContractNegotiation negotiation) {
negotiationStore.save(negotiation);
var offer = negotiation.getLastContractOffer();
sendOffer(offer, negotiation, ContractOfferRequest.Type.COUNTER_OFFER)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionOffered();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerOffered(negotiation));
contractNegotiation.transitionOffered();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerOffered(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionOffering();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerOffering(negotiation));
contractNegotiation.transitionOffering();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerOffering(contractNegotiation));
String message = format("[Consumer] Failed to send contract offer with id %s. ContractNegotiation %s stays in state %s.",
offer.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
offer.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -410,21 +410,23 @@ private boolean processConsumerApproving(ContractNegotiation negotiation) {
.correlationId(negotiation.getId())
.build();

negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, request, negotiation::getId)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionApproved();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerApproved(negotiation));
contractNegotiation.transitionApproved();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerApproved(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionApproving();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.consumerApproving(negotiation));
contractNegotiation.transitionApproving();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.consumerApproving(contractNegotiation));
String message = format("[Consumer] Failed to send contract agreement with id %s. ContractNegotiation %s stays in state %s.",
agreement.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
agreement.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand All @@ -448,21 +450,23 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, negotiation::getId)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionDeclined();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declined(negotiation));
contractNegotiation.transitionDeclined();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declined(contractNegotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionDeclining();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declining(negotiation));
contractNegotiation.transitionDeclining();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declining(contractNegotiation));
String message = format("[Consumer] Failed to send contract rejection. ContractNegotiation %s stays in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -580,6 +584,11 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ConsumerContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ private ProviderContractNegotiationManagerImpl() {

//TODO validate previous offers against hash?

public void start(ContractNegotiationStore negotiationStore) {
this.negotiationStore = negotiationStore;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -281,6 +280,7 @@ private void run() {
long providerOffering = processNegotiationsInState(PROVIDER_OFFERING, this::processProviderOffering);
long declining = processNegotiationsInState(DECLINING, this::processDeclining);
long confirming = processNegotiationsInState(CONFIRMING, this::processConfirming);

long commandsProcessed = onCommands().doProcess(this::processCommand);

var totalProcessed = providerOffering + declining + confirming + commandsProcessed;
Expand Down Expand Up @@ -340,21 +340,24 @@ private boolean processProviderOffering(ContractNegotiation negotiation) {
.correlationId(negotiation.getCorrelationId())
.build();

negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, contractOfferRequest, () -> null)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionOffered();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.providerOffered(negotiation));
contractNegotiation.transitionOffered();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.providerOffered(contractNegotiation));
monitor.debug(String.format("[Provider] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionOffering();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.providerOffering(negotiation));
contractNegotiation.transitionOffering();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.providerOffering(contractNegotiation));
String message = format("[Provider] Failed to send contract offer with id %s. ContractNegotiation %s stays in state %s.",
currentOffer.getId(), negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
currentOffer.getId(), contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand All @@ -378,21 +381,24 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, () -> null)
.whenComplete((response, throwable) -> {
var contractNegotiation = negotiationStore.find(negotiation.getId());
if (throwable == null) {
negotiation.transitionDeclined();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declined(negotiation));
contractNegotiation.transitionDeclined();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declined(contractNegotiation));
monitor.debug(String.format("[Provider] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState())));
} else {
negotiation.transitionDeclining();
negotiationStore.save(negotiation);
observable.invokeForEach(l -> l.declining(negotiation));
contractNegotiation.transitionDeclining();
negotiationStore.save(contractNegotiation);
observable.invokeForEach(l -> l.declining(contractNegotiation));
String message = format("[Provider] Failed to send contract rejection. ContractNegotiation %s stays in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState()));
contractNegotiation.getId(), ContractNegotiationStates.from(contractNegotiation.getState()));
monitor.debug(message, throwable);
}
});
Expand Down Expand Up @@ -539,6 +545,11 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ProviderContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Expand All @@ -547,6 +558,8 @@ public ProviderContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");
Objects.requireNonNull(manager.telemetry, "telemetry");
Objects.requireNonNull(manager.negotiationStore, "store");

manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);

return manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@
/**
* Setup for the contract negotiation integration test.
*/
public abstract class AbstractContractNegotiationIntegrationTest {
public abstract class AbstractContractNegotiationManagerIntegrationTest {

protected ProviderContractNegotiationManagerImpl providerManager;
protected ConsumerContractNegotiationManagerImpl consumerManager;

protected ContractNegotiationObservable providerObservable = new ContractNegotiationObservableImpl();
protected ContractNegotiationObservable consumerObservable = new ContractNegotiationObservableImpl();

protected InMemoryContractNegotiationStore providerStore;
protected InMemoryContractNegotiationStore consumerStore;
protected InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore();
protected InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore();

protected ContractValidationService validationService;

Expand Down Expand Up @@ -92,7 +92,6 @@ void setUp() {
// Create CommandRunner mock
CommandRunner<ContractNegotiationCommand> runner = (CommandRunner<ContractNegotiationCommand>) mock(CommandRunner.class);

// Create the provider contract negotiation manager
providerManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
.dispatcherRegistry(new FakeProviderDispatcherRegistry())
.monitor(monitor)
Expand All @@ -101,10 +100,9 @@ void setUp() {
.commandQueue(queue)
.commandRunner(runner)
.observable(providerObservable)
.store(providerStore)
.build();
providerStore = new InMemoryContractNegotiationStore();

// Create the consumer contract negotiation manager
consumerManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance()
.dispatcherRegistry(new FakeConsumerDispatcherRegistry())
.monitor(monitor)
Expand All @@ -113,9 +111,9 @@ void setUp() {
.commandQueue(queue)
.commandRunner(runner)
.observable(consumerObservable)
.store(consumerStore)
.build();
consumerStore = new InMemoryContractNegotiationStore();


countDownLatch = new CountDownLatch(2);
}

Expand Down
Loading

0 comments on commit ffef89b

Please sign in to comment.