Skip to content

Commit

Permalink
Lock entities before calling an async operation
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Feb 18, 2022
1 parent 1f62003 commit 4830726
Show file tree
Hide file tree
Showing 32 changed files with 735 additions and 542 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public void initialize(ServiceExtensionContext context) {

@Override
public void start() {
consumerNegotiationManager.start(store);
providerNegotiationManager.start(store);
consumerNegotiationManager.start();
providerNegotiationManager.start();
}

@Override
Expand Down Expand Up @@ -140,6 +140,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandRunner(commandRunner)
.observable(observable)
.telemetry(telemetry)
.store(store)
.build();

providerNegotiationManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -151,6 +152,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandRunner(commandRunner)
.observable(observable)
.telemetry(telemetry)
.store(store)
.build();

context.registerService(ConsumerContractNegotiationManager.class, consumerNegotiationManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ public class ConsumerContractNegotiationManagerImpl implements ConsumerContractN

private ConsumerContractNegotiationManagerImpl() { }

public void start(ContractNegotiationStore store) {
negotiationStore = store;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -383,6 +382,7 @@ private BiConsumer<Object, Throwable> onCounterOfferSent(String negotiationId, S
monitor.debug(message, throwable);
}
};

}

/**
Expand Down Expand Up @@ -424,6 +424,7 @@ private boolean processConsumerApproving(ContractNegotiation negotiation) {
.correlationId(negotiation.getId())
.build();

negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, request, negotiation::getId)
.whenComplete(onAgreementSent(negotiation.getId(), agreement.getId()));
Expand Down Expand Up @@ -474,9 +475,11 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiationStore.save(negotiation);
// TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, negotiation::getId)
.whenComplete(onRejectionSent(negotiation.getId()));

return false;
}

Expand Down Expand Up @@ -617,6 +620,11 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ConsumerContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ private ProviderContractNegotiationManagerImpl() {

//TODO validate previous offers against hash?

public void start(ContractNegotiationStore store) {
negotiationStore = store;
public void start() {
active.set(true);
executor = Executors.newSingleThreadExecutor();
executor.submit(this::run);
Expand Down Expand Up @@ -282,6 +281,7 @@ private void run() {
long providerOffering = processNegotiationsInState(PROVIDER_OFFERING, this::processProviderOffering);
long declining = processNegotiationsInState(DECLINING, this::processDeclining);
long confirming = processNegotiationsInState(CONFIRMING, this::processConfirming);

long commandsProcessed = onCommands().doProcess(this::processCommand);

var totalProcessed = providerOffering + declining + confirming + commandsProcessed;
Expand Down Expand Up @@ -341,6 +341,8 @@ private boolean processProviderOffering(ContractNegotiation negotiation) {
.correlationId(negotiation.getCorrelationId())
.build();

negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, contractOfferRequest, () -> null)
.whenComplete(onCounterOfferSent(negotiation.getId(), currentOffer.getId()));
Expand Down Expand Up @@ -390,6 +392,8 @@ private boolean processDeclining(ContractNegotiation negotiation) {
.rejectionReason(negotiation.getErrorDetail())
.build();

negotiationStore.save(negotiation);

//TODO protocol-independent response type?
dispatcherRegistry.send(Object.class, rejection, () -> null)
.whenComplete(onRejectionSent(negotiation.getId()));
Expand Down Expand Up @@ -563,6 +567,11 @@ public Builder observable(ContractNegotiationObservable observable) {
return this;
}

public Builder store(ContractNegotiationStore store) {
manager.negotiationStore = store;
return this;
}

public ProviderContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Expand All @@ -571,6 +580,8 @@ public ProviderContractNegotiationManagerImpl build() {
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");
Objects.requireNonNull(manager.telemetry, "telemetry");
Objects.requireNonNull(manager.negotiationStore, "store");

manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);

return manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@
/**
* Setup for the contract negotiation integration test.
*/
public abstract class AbstractContractNegotiationIntegrationTest {
public abstract class AbstractContractNegotiationManagerIntegrationTest {

protected ProviderContractNegotiationManagerImpl providerManager;
protected ConsumerContractNegotiationManagerImpl consumerManager;

protected ContractNegotiationObservable providerObservable = new ContractNegotiationObservableImpl();
protected ContractNegotiationObservable consumerObservable = new ContractNegotiationObservableImpl();

protected InMemoryContractNegotiationStore providerStore;
protected InMemoryContractNegotiationStore consumerStore;
protected InMemoryContractNegotiationStore providerStore = new InMemoryContractNegotiationStore();
protected InMemoryContractNegotiationStore consumerStore = new InMemoryContractNegotiationStore();

protected ContractValidationService validationService;

Expand Down Expand Up @@ -92,7 +92,6 @@ void setUp() {
// Create CommandRunner mock
CommandRunner<ContractNegotiationCommand> runner = (CommandRunner<ContractNegotiationCommand>) mock(CommandRunner.class);

// Create the provider contract negotiation manager
providerManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
.dispatcherRegistry(new FakeProviderDispatcherRegistry())
.monitor(monitor)
Expand All @@ -101,10 +100,9 @@ void setUp() {
.commandQueue(queue)
.commandRunner(runner)
.observable(providerObservable)
.store(providerStore)
.build();
providerStore = new InMemoryContractNegotiationStore();

// Create the consumer contract negotiation manager
consumerManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance()
.dispatcherRegistry(new FakeConsumerDispatcherRegistry())
.monitor(monitor)
Expand All @@ -113,9 +111,9 @@ void setUp() {
.commandQueue(queue)
.commandRunner(runner)
.observable(consumerObservable)
.store(consumerStore)
.build();
consumerStore = new InMemoryContractNegotiationStore();


countDownLatch = new CountDownLatch(2);
}

Expand Down
Loading

0 comments on commit 4830726

Please sign in to comment.