Skip to content

Commit

Permalink
UserTransaction should fire context lifecycle events
Browse files Browse the repository at this point in the history
- such as @initialized(TransactionScoped.class)
- resolves #28709
  • Loading branch information
mkouba committed Oct 25, 2022
1 parent 84f7bea commit 699afc8
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
import io.quarkus.deployment.logging.LogCleanupFilterBuildItem;
import io.quarkus.gizmo.ClassCreator;
import io.quarkus.narayana.jta.runtime.CDIDelegatingTransactionManager;
import io.quarkus.narayana.jta.runtime.NarayanaJtaProducers;
import io.quarkus.narayana.jta.runtime.NarayanaJtaRecorder;
import io.quarkus.narayana.jta.runtime.TransactionManagerConfiguration;
Expand Down Expand Up @@ -85,7 +84,6 @@ public void build(NarayanaJtaRecorder recorder,
recorder.handleShutdown(shutdownContextBuildItem, transactions);
feature.produce(new FeatureBuildItem(Feature.NARAYANA_JTA));
additionalBeans.produce(new AdditionalBeanBuildItem(NarayanaJtaProducers.class));
additionalBeans.produce(new AdditionalBeanBuildItem(CDIDelegatingTransactionManager.class));
additionalBeans.produce(AdditionalBeanBuildItem.unremovableOf("io.quarkus.narayana.jta.RequestScopedTransaction"));

runtimeInit.produce(new RuntimeInitializedClassBuildItem(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import javax.enterprise.inject.Produces;
import javax.inject.Singleton;
import javax.transaction.TransactionSynchronizationRegistry;
import javax.transaction.UserTransaction;

import org.jboss.tm.JBossXATerminator;
import org.jboss.tm.XAResourceRecoveryRegistry;
Expand All @@ -13,7 +14,6 @@
import com.arjuna.ats.internal.jbossatx.jta.jca.XATerminator;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionSynchronizationRegistryImple;
import com.arjuna.ats.jbossatx.jta.RecoveryManagerService;
import com.arjuna.ats.jta.UserTransaction;

import io.quarkus.arc.Unremovable;

Expand All @@ -28,8 +28,15 @@ public UserTransactionRegistry userTransactionRegistry() {

@Produces
@ApplicationScoped
public javax.transaction.UserTransaction userTransaction() {
return UserTransaction.userTransaction();
public UserTransaction userTransaction() {
return new NotifyingUserTransaction(com.arjuna.ats.jta.UserTransaction.userTransaction());
}

@Produces
@Unremovable
@Singleton
public javax.transaction.TransactionManager transactionManager() {
return new NotifyingTransactionManager();
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
Expand All @@ -20,51 +18,22 @@

import org.jboss.logging.Logger;

import io.quarkus.arc.Unremovable;
import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple;

/**
* A delegating transaction manager which receives an instance of Narayana transaction manager
* and delegates all calls to it.
* On top of it the implementation adds the CDI events processing for {@link TransactionScoped}.
*/
@Singleton
@Unremovable // used by Arc for transactional observers
public class CDIDelegatingTransactionManager implements TransactionManager, Serializable {

private static final Logger log = Logger.getLogger(CDIDelegatingTransactionManager.class);
public class NotifyingTransactionManager extends TransactionScopedNotifier implements TransactionManager, Serializable {

private static final long serialVersionUID = 1598L;

private final transient com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple delegate;

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Transaction}s when the {@linkplain TransactionScoped transaction scope} is initialized.
*/
@Inject
@Initialized(TransactionScoped.class)
Event<Transaction> transactionScopeInitialized;
private static final Logger LOG = Logger.getLogger(NotifyingTransactionManager.class);

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Object}s before the {@linkplain TransactionScoped transaction scope} is destroyed.
*/
@Inject
@BeforeDestroyed(TransactionScoped.class)
Event<Object> transactionScopeBeforeDestroyed;
private transient com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple delegate;

/**
* An {@link Event} that can {@linkplain Event#fire(Object) fire}
* {@link Object}s when the {@linkplain TransactionScoped transaction scope} is destroyed.
*/
@Inject
@Destroyed(TransactionScoped.class)
Event<Object> transactionScopeDestroyed;

/**
* Delegating transaction manager call to com.arjuna.ats.jta.{@link com.arjuna.ats.jta.TransactionManager}
*/
public CDIDelegatingTransactionManager() {
NotifyingTransactionManager() {
delegate = (com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple) com.arjuna.ats.jta.TransactionManager
.transactionManager();
}
Expand All @@ -80,9 +49,7 @@ public CDIDelegatingTransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
delegate.begin();
if (this.transactionScopeInitialized != null) {
this.transactionScopeInitialized.fire(this.getTransaction());
}
initialized(TransactionImple.getTransaction().toString());
}

/**
Expand All @@ -97,16 +64,12 @@ public void begin() throws NotSupportedException, SystemException {
@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException,
IllegalStateException, SystemException {
if (this.transactionScopeBeforeDestroyed != null) {
this.transactionScopeBeforeDestroyed.fire(this.getTransaction());
}

String id = TransactionImple.getTransaction().toString();
beforeDestroyed(id);
try {
delegate.commit();
} finally {
if (this.transactionScopeDestroyed != null) {
this.transactionScopeDestroyed.fire(this.toString());
}
destroyed(id);
}
}

Expand All @@ -121,21 +84,17 @@ public void commit() throws RollbackException, HeuristicMixedException, Heuristi
*/
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
String id = TransactionImple.getTransaction().toString();
try {
if (this.transactionScopeBeforeDestroyed != null) {
this.transactionScopeBeforeDestroyed.fire(this.getTransaction());
}
beforeDestroyed(id);
} catch (Throwable t) {
log.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
LOG.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
}

try {
delegate.rollback();
} finally {
//we don't need a catch block here, if this one fails we just let the exception propagate
if (this.transactionScopeDestroyed != null) {
this.transactionScopeDestroyed.fire(this.toString());
}
destroyed(id);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package io.quarkus.narayana.jta.runtime;

import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.jboss.logging.Logger;

import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImple;

public class NotifyingUserTransaction extends TransactionScopedNotifier implements UserTransaction {

private static final Logger LOG = Logger.getLogger(NotifyingUserTransaction.class);

private final UserTransaction delegate;

public NotifyingUserTransaction(UserTransaction delegate) {
this.delegate = delegate;
}

@Override
public void begin() throws NotSupportedException, SystemException {
delegate.begin();
initialized(TransactionImple.getTransaction().toString());
}

@Override
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException,
IllegalStateException, SystemException {
String id = TransactionImple.getTransaction().toString();
beforeDestroyed(id);
try {
delegate.commit();
} finally {
destroyed(id);
}
}

@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
String id = TransactionImple.getTransaction().toString();
try {
beforeDestroyed(id);
} catch (Throwable t) {
LOG.error("Failed to fire @BeforeDestroyed(TransactionScoped.class)", t);
}
try {
delegate.rollback();
} finally {
destroyed(id);
}
}

@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
delegate.setRollbackOnly();
}

@Override
public int getStatus() throws SystemException {
return delegate.getStatus();
}

@Override
public void setTransactionTimeout(int seconds) throws SystemException {
delegate.setTransactionTimeout(seconds);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.quarkus.narayana.jta.runtime;

import java.util.Objects;

import javax.enterprise.context.BeforeDestroyed;
import javax.enterprise.context.Destroyed;
import javax.enterprise.context.Initialized;
import javax.enterprise.event.Event;
import javax.transaction.TransactionScoped;

import io.quarkus.arc.Arc;

public abstract class TransactionScopedNotifier {

private transient Event<TransactionId> initialized;
private transient Event<TransactionId> beforeDestroyed;
private transient Event<TransactionId> destroyed;

void initialized(String transactionId) {
if (initialized == null) {
initialized = Arc.container().beanManager().getEvent()
.select(TransactionId.class, Initialized.Literal.of(TransactionScoped.class));
}
initialized.fire(new TransactionId(transactionId));
}

void beforeDestroyed(String transactionId) {
if (beforeDestroyed == null) {
beforeDestroyed = Arc.container().beanManager().getEvent()
.select(TransactionId.class, BeforeDestroyed.Literal.of(TransactionScoped.class));
}
beforeDestroyed.fire(new TransactionId(transactionId));
}

void destroyed(String transactionId) {
if (destroyed == null) {
destroyed = Arc.container().beanManager().getEvent()
.select(TransactionId.class, Destroyed.Literal.of(TransactionScoped.class));
}
destroyed.fire(new TransactionId(transactionId));
}

// we use this wrapper because if we fire an event with string payload then any "@Observes String payload" would be notified
public static final class TransactionId {

private final String value;

public TransactionId(String value) {
this.value = value;
}

@Override
public String toString() {
return value;
}

@Override
public int hashCode() {
return Objects.hash(value);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
TransactionId other = (TransactionId) obj;
return Objects.equals(value, other.value);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.arjuna.ats.jta.logging.jtaLogger;

import io.quarkus.arc.runtime.InterceptorBindings;
import io.quarkus.narayana.jta.runtime.CDIDelegatingTransactionManager;
import io.quarkus.narayana.jta.runtime.NotifyingTransactionManager;
import io.quarkus.narayana.jta.runtime.TransactionConfiguration;
import io.quarkus.transaction.annotations.Rollback;
import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -110,7 +110,7 @@ protected Object invokeInOurTx(InvocationContext ic, TransactionManager tm, Runn

int timeoutConfiguredForMethod = getTransactionTimeoutFromAnnotation(ic);

int currentTmTimeout = ((CDIDelegatingTransactionManager) transactionManager).getTransactionTimeout();
int currentTmTimeout = ((NotifyingTransactionManager) transactionManager).getTransactionTimeout();

if (timeoutConfiguredForMethod > 0) {
tm.setTransactionTimeout(timeoutConfiguredForMethod);
Expand Down
Loading

0 comments on commit 699afc8

Please sign in to comment.