Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release connection on reactive beginTransaction cancellation #1342

Merged
merged 1 commit into from
Dec 1, 2022

Conversation

injectives
Copy link
Contributor

@injectives injectives commented Dec 1, 2022

Cherry-pick: #1341

Each transaction created by the driver requires a network connection. Unfinished transactions may result in connection leaks, meaning that connections acquired from the connection pool are not available for further use.

Subscription cancellation on reactive beginTransaction during transaction creation could result in dangling transaction, leading to the connection leak. This update fixes this issue by ensuring that such transactions are rolled back and their connections are returned to the connection pool.

Each transaction created by the driver requires a network connection. Unfinished transactions may result in connection leaks, meaning that connections acquired from the connection pool are not available for further use.

Subscription cancellation on reactive `beginTransaction` during transaction creation could result in dangling transaction, leading to the connection leak. This update fixes this issue by ensuring that such transactions are rolled back and their connections are returned to the connection pool.
@michael-simons
Copy link
Contributor

So I testet the following code against 4.4.9 first

package org.springframework.data.neo4j.integration.x;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Query;
import org.neo4j.driver.Session;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactive.RxSession;
import org.neo4j.driver.reactive.RxTransaction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.neo4j.config.AbstractReactiveNeo4jConfig;
import org.springframework.data.neo4j.repository.config.EnableReactiveNeo4jRepositories;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

@ExtendWith(SpringExtension.class)
@SpringJUnitConfig(FooTest.Config.class)
public class FooTest {

	@Test
	void f(@Autowired MovieRepository movieRepository, @Autowired Driver driver) throws InterruptedException {
		UUID id = UUID.randomUUID();
		Flux
				.range(1, 5)
				// .doOnCancel(() -> System.out.println("doOnCancel (outer)"))
				.flatMap(
						i -> movieRepository
								.findById(id)
								.switchIfEmpty(Mono.error(new RuntimeException()))
						//.doOnCancel(() -> System.out.println("doOnCancel"))
				)
				//.collectList()
				//.as(TransactionalOperator.create(transactionManager)::transactional)
				.then()
				.as(StepVerifier::create)
				.verifyError();
		System.out.println("---- complete");
		try (Session session = driver.session()) {
			System.out.println(session.run("RETURN 1").single().get(0));
		}
	}


	@Test
	void f(@Autowired Driver driver) throws InterruptedException {
		UUID id = UUID.randomUUID();
		Flux
				.range(1, 5)
				// .doOnCancel(() -> System.out.println("doOnCancel (outer)"))
				.flatMap(
						i -> {
							Query query = new Query("MATCH (p:Product) WHERE p.id = $id RETURN p.title", Collections.singletonMap("id", 0));
							return Flux.usingWhen(
											Mono.fromSupplier(() -> driver.rxSession()),
											session -> Mono.fromSupplier(() -> session.run(query))
													.flatMapMany(result -> Flux.from(result.records()))
													.map(record -> record.get(0).asString()),
											session -> Mono.fromDirect(session.close()))
									.switchIfEmpty(Mono.error(new RuntimeException()));
						}
						//.doOnCancel(() -> System.out.println("doOnCancel"))
				)
				//.collectList()
				//.as(TransactionalOperator.create(transactionManager)::transactional)
				.then()
				.as(StepVerifier::create)
				.verifyError();
		System.out.println("---- complete");
		// Thread.currentThread().join();
		try (Session session = driver.session()) {
			System.out.println(session.run("RETURN 1").single().get(0));
		}
	}

	static class  SessionAndTx {
		RxSession session; RxTransaction tx;


		public SessionAndTx(RxSession session, RxTransaction tx) {
			this.session = session;
			this.tx = tx;
		}
	}

	@Test
	void f2(@Autowired Driver driver) {
		Flux
				.range(1, 5)
				.flatMap(
						i -> {
							Mono<SessionAndTx> f = Mono
									.just(driver.rxSession())
									.flatMap(s -> Mono.fromDirect(s.beginTransaction()).map(tx -> new SessionAndTx(s, tx)));
							return Flux.usingWhen(f,
									h -> Mono.fromSupplier(() -> h.tx.run("MATCH (n) WHERE false = true RETURN n")).flatMapMany(RxResult::records),
									h -> Mono.from(h.tx.commit()).then(Mono.from(h.session.close())),
									(h, e) -> Mono.from(h.tx.rollback()).then(Mono.from(h.session.close())),
									h -> Mono.from(h.tx.rollback()).then(Mono.from(h.session.close()))
							).switchIfEmpty(Mono.error(new RuntimeException()));
						}
				)
				.then()
				.as(StepVerifier::create)
				.verifyError();
		System.out.println("---- complete");
		try (Session session = driver.session()) {
			System.out.println(session.run("RETURN 1").single().get(0));
		}
	}

	@Configuration
	@EnableTransactionManagement
	@EnableReactiveNeo4jRepositories(considerNestedRepositories = true)
	static class Config extends AbstractReactiveNeo4jConfig {

		@Bean
		public Driver driver() {
			org.neo4j.driver.Config config = org.neo4j.driver.Config.builder()
					.withMaxConnectionPoolSize(2)
					.withConnectionAcquisitionTimeout(20, TimeUnit.SECONDS)
					.withLeakedSessionsLogging()
					.build();
			return
					GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "secret"), config);
		}
	}

}

Interesting here is that while in 5.x all variants fail (Unmanaged tx, which is what SDN is using, auto commit and tx functions), in 4.4 only unmanaged tx and tx function fail).

With the 4.4-SNAPSHOT, all works.

Excellent work, thank you!

ref: https://github.com/spring-projects/spring-data-neo4j/tree/issue/2632

@injectives injectives merged commit 44f29ca into neo4j:4.4 Dec 1, 2022
@injectives injectives deleted the feature/btx44 branch December 1, 2022 11:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants