Skip to content

Commit

Permalink
JVMCBC-1146: Add ConnectionString to core.
Browse files Browse the repository at this point in the history
This changeset stores the connection string in the Core instance
and exposes it in the creation event. Right now this change
doesn't do much, but it is needed for later changes to come around
DNS SRV-based config refresh.

Change-Id: Icd0616191a071fd0465733375bc69d25c4ad4cab
Reviewed-on: https://review.couchbase.org/c/couchbase-jvm-clients/+/179368
Tested-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
Reviewed-by: Michael Nitschinger <michael.nitschinger@couchbase.com>
  • Loading branch information
daschl authored and Michael Nitschinger committed Aug 31, 2022
1 parent 29f8130 commit 41a182f
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 52 deletions.
41 changes: 35 additions & 6 deletions core-io/src/main/java/com/couchbase/client/core/Core.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

import java.security.SecureRandom;
import java.time.Duration;
Expand Down Expand Up @@ -242,13 +243,36 @@ public class Core {
private final CoreTransactionsCleanup transactionsCleanup;

/**
* Creates a new {@link Core} with the given environment.
* Holds the user connection string if provided (null otherwise).
*/
@Nullable
private final String connectionString;

/**
* Creates a new {@link Core} with the given environment with no connection string.
*
* @param environment the environment for this core.
* @param authenticator the authenticator used for kv and http authentication.
* @param seedNodes the seed nodes to initially connect to.
* @return the created {@link Core}.
*/
public static Core create(final CoreEnvironment environment, final Authenticator authenticator, final Set<SeedNode> seedNodes) {
return new Core(environment, authenticator, seedNodes);
public static Core create(final CoreEnvironment environment, final Authenticator authenticator,
final Set<SeedNode> seedNodes) {
return new Core(environment, authenticator, seedNodes, null);
}

/**
* Creates a new {@link Core} with the given environment with no connection string.
*
* @param environment the environment for this core.
* @param authenticator the authenticator used for kv and http authentication.
* @param seedNodes the seed nodes to initially connect to.
* @param connectionString if provided, the original connection string from the user.
* @return the created {@link Core}.
*/
public static Core create(final CoreEnvironment environment, final Authenticator authenticator,
final Set<SeedNode> seedNodes, @Nullable final String connectionString) {
return new Core(environment, authenticator, seedNodes, connectionString);
}

/**
Expand Down Expand Up @@ -287,9 +311,13 @@ public static boolean getFailIfInstanceLimitReached() {
/**
* Creates a new Core.
*
* @param environment the environment for this core.
* @param environment the environment used.
* @param authenticator the authenticator used for kv and http authentication.
* @param seedNodes the seed nodes to initially connect to.
* @param connectionString if provided, the original connection string from the user.
*/
protected Core(final CoreEnvironment environment, final Authenticator authenticator, final Set<SeedNode> seedNodes) {
protected Core(final CoreEnvironment environment, final Authenticator authenticator, final Set<SeedNode> seedNodes,
@Nullable final String connectionString) {
if (environment.securityConfig().tlsEnabled() && !authenticator.supportsTls()) {
throw new InvalidArgumentException("TLS enabled but the Authenticator does not support TLS!", null, null);
} else if (!environment.securityConfig().tlsEnabled() && !authenticator.supportsNonTls()) {
Expand All @@ -298,6 +326,7 @@ protected Core(final CoreEnvironment environment, final Authenticator authentica

incrementAndVerifyNumInstances(environment.eventBus());

this.connectionString = connectionString;
this.seedNodes = seedNodes;
this.coreContext = new CoreContext(this, createInstanceId(), environment, authenticator);
this.configurationProvider = createConfigurationProvider();
Expand All @@ -320,7 +349,7 @@ protected Core(final CoreEnvironment environment, final Authenticator authentica
.map(c -> (BeforeSendRequestCallback) c)
.collect(Collectors.toList());

eventBus.publish(new CoreCreatedEvent(coreContext, environment, seedNodes, NUM_INSTANCES.get()));
eventBus.publish(new CoreCreatedEvent(coreContext, environment, seedNodes, NUM_INSTANCES.get(), connectionString));

long watchdogInterval = INVALID_STATE_WATCHDOG_INTERVAL.getSeconds();
if (watchdogInterval <= 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import com.couchbase.client.core.cnc.Context;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import reactor.util.annotation.Nullable;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -36,18 +36,15 @@
public class CoreCreatedEvent extends AbstractEvent {

private final CoreEnvironment environment;
private final int numCoreInstances;

public CoreCreatedEvent(final CoreContext context, final CoreEnvironment environment, final Set<SeedNode> seedNodes,
final int numCoreInstances) {
super(Severity.INFO, Category.CORE, Duration.ZERO, attachSeedNodes(context, seedNodes, numCoreInstances));
final int numCoreInstances, @Nullable final String connectionString) {
super(Severity.INFO, Category.CORE, Duration.ZERO, enrichContext(context, seedNodes, numCoreInstances, connectionString));
this.environment = environment;
this.numCoreInstances = numCoreInstances;

}

private static Context attachSeedNodes(final CoreContext context, final Set<SeedNode> seedNodes,
final int numCoreInstances) {
private static Context enrichContext(final CoreContext context, final Set<SeedNode> seedNodes,
final int numCoreInstances, @Nullable final String connectionString) {
return new CoreContext(context.core(), context.id(), context.environment(), context.authenticator()) {
@Override
public void injectExportableParams(final Map<String, Object> input) {
Expand All @@ -62,6 +59,9 @@ public void injectExportableParams(final Map<String, Object> input) {
}).collect(Collectors.toSet()));

input.put("numCoreInstances", numCoreInstances);
if (connectionString != null) {
input.put("connectionString", connectionString);
}
}
};
}
Expand Down
12 changes: 6 additions & 6 deletions core-io/src/test/java/com/couchbase/client/core/CoreTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void addNodesAndServicesOnNewConfig() {
final Map<String, Node> mocks = new HashMap<>();
mocks.put("10.143.190.101", mock101);
mocks.put("10.143.190.102", mock102);
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down Expand Up @@ -203,7 +203,7 @@ void addServicesOnNewConfig() {
final Map<String, Node> mocks = new HashMap<>();
mocks.put("10.143.190.101", mock101);
mocks.put("10.143.190.102", mock102);
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down Expand Up @@ -303,7 +303,7 @@ void removeNodesAndServicesOnNewConfig() {
final Map<String, Node> mocks = new HashMap<>();
mocks.put("10.143.190.101", mock101);
mocks.put("10.143.190.102", mock102);
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down Expand Up @@ -386,7 +386,7 @@ void removesNodeIfNotPresentInConfigAnymore() {
final Map<String, Node> mocks = new HashMap<>();
mocks.put("10.143.190.101", mock101);
mocks.put("10.143.190.102", mock102);
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down Expand Up @@ -473,7 +473,7 @@ void addsSecondNodeIfBothSameHostname() throws InterruptedException {
final Map<String, Node> mocks = new HashMap<>();
mocks.put("127.0.0.1:9000", mock101);
mocks.put("127.0.0.1:9001", mock102);
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down Expand Up @@ -515,7 +515,7 @@ void ignoresFailedGlobalConfigInitAttempt() {
final ConfigurationProvider configProvider = mock(ConfigurationProvider.class);
when(configProvider.configs()).thenReturn(Flux.empty());

Core core = new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST) {
Core core = new Core(ENV, AUTHENTICATOR, SeedNode.LOCALHOST, null) {
@Override
public ConfigurationProvider createConfigurationProvider() {
return configProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public static AsyncCluster connect(final String connectionString, final ClusterO
return new AsyncCluster(
environmentSupplier,
opts.authenticator(),
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
seedNodesFromConnectionString(connectionString, environmentSupplier.get()),
connectionString
);
}

Expand All @@ -199,7 +200,8 @@ public static AsyncCluster connect(final Set<SeedNode> seedNodes, final ClusterO
notNull(options, "ClusterOptions");

final ClusterOptions.Built opts = options.build();
return new AsyncCluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(), seedNodes);
return new AsyncCluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(),
seedNodes, null);
}

/**
Expand Down Expand Up @@ -250,9 +252,10 @@ static Set<SeedNode> seedNodesFromConnectionString(final String cs, final Cluste
*
* @param environment the environment to use for this cluster.
*/
AsyncCluster(final Supplier<ClusterEnvironment> environment, final Authenticator authenticator, Set<SeedNode> seedNodes) {
AsyncCluster(final Supplier<ClusterEnvironment> environment, final Authenticator authenticator,
final Set<SeedNode> seedNodes, final String connectionString) {
this.environment = environment;
this.core = Core.create(environment.get(), authenticator, seedNodes);
this.core = Core.create(environment.get(), authenticator, seedNodes, connectionString);
this.searchIndexManager = new AsyncSearchIndexManager(core);
this.queryAccessor = new QueryAccessor(core);
this.userManager = new AsyncUserManager(core);
Expand Down
10 changes: 6 additions & 4 deletions java-client/src/main/java/com/couchbase/client/java/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ public static Cluster connect(final String connectionString, final ClusterOption
return new Cluster(
environmentSupplier,
opts.authenticator(),
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
seedNodesFromConnectionString(connectionString, environmentSupplier.get()),
connectionString
);
}

Expand All @@ -286,7 +287,8 @@ public static Cluster connect(final Set<SeedNode> seedNodes, final ClusterOption
notNull(options, "ClusterOptions");

final ClusterOptions.Built opts = options.build();
return new Cluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(), seedNodes);
return new Cluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(),
seedNodes, null);
}

/**
Expand Down Expand Up @@ -317,8 +319,8 @@ public static void failIfInstanceLimitReached(final boolean failIfInstanceLimitR
* @param seedNodes the seed nodes to bootstrap from.
*/
private Cluster(final Supplier<ClusterEnvironment> environment, final Authenticator authenticator,
final Set<SeedNode> seedNodes) {
this.asyncCluster = new AsyncCluster(environment, authenticator, seedNodes);
final Set<SeedNode> seedNodes, final String connectionString) {
this.asyncCluster = new AsyncCluster(environment, authenticator, seedNodes, connectionString);
this.reactiveCluster = new ReactiveCluster(asyncCluster);
this.searchIndexManager = new SearchIndexManager(asyncCluster.searchIndexes());
this.userManager = new UserManager(asyncCluster.users());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ public static ReactiveCluster connect(final String connectionString, final Clust
return new ReactiveCluster(
environmentSupplier,
opts.authenticator(),
seedNodesFromConnectionString(connectionString, environmentSupplier.get())
seedNodesFromConnectionString(connectionString, environmentSupplier.get()),
connectionString
);
}

Expand All @@ -164,7 +165,8 @@ public static ReactiveCluster connect(final Set<SeedNode> seedNodes, final Clust
notNull(options, "ClusterOptions");

final ClusterOptions.Built opts = options.build();
return new ReactiveCluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(), seedNodes);
return new ReactiveCluster(extractClusterEnvironment(asConnectionString(seedNodes), opts), opts.authenticator(),
seedNodes, null);
}


Expand All @@ -174,8 +176,8 @@ public static ReactiveCluster connect(final Set<SeedNode> seedNodes, final Clust
* @param environment the environment to use for this cluster.
*/
private ReactiveCluster(final Supplier<ClusterEnvironment> environment, final Authenticator authenticator,
final Set<SeedNode> seedNodes) {
this(new AsyncCluster(environment, authenticator, seedNodes));
final Set<SeedNode> seedNodes, final String connectionString) {
this(new AsyncCluster(environment, authenticator, seedNodes, connectionString));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ public class Cluster internal constructor(
private val ownsEnvironment: Boolean,
private val authenticator: Authenticator,
seedNodes: Set<SeedNode>,
connectionString: String,
) {
internal val core: Core = Core.create(environment, authenticator, seedNodes)
internal val core: Core = Core.create(environment, authenticator, seedNodes, connectionString)

internal val env: ClusterEnvironment
get() = core.env
Expand Down Expand Up @@ -712,7 +713,7 @@ public class Cluster internal constructor(
env.securityConfig().tlsEnabled(),
env.eventBus(),
)
return Cluster(env, ownsEnv, authenticator, seedNodes)
return Cluster(env, ownsEnv, authenticator, seedNodes, connectionString)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@
*/
package com.couchbase.client.scala

import java.util.{Optional, UUID}
import java.util.stream.Collectors
import com.couchbase.client.core.Core
import com.couchbase.client.core.annotation.Stability
import com.couchbase.client.core.diagnostics._
import com.couchbase.client.core.env.Authenticator
import com.couchbase.client.core.error.ErrorCodeAndMessage
import com.couchbase.client.core.msg.search.SearchRequest
import com.couchbase.client.core.service.ServiceType
import com.couchbase.client.core.util.ConnectionStringUtil
Expand Down Expand Up @@ -50,6 +47,8 @@ import com.couchbase.client.scala.util.DurationConversions.{javaDurationToScala,
import com.couchbase.client.scala.util.FutureConversions
import reactor.core.scala.publisher.SMono

import java.util.stream.Collectors
import java.util.{Optional, UUID}
import scala.compat.java8.OptionConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}
Expand All @@ -73,15 +72,21 @@ import scala.util.{Failure, Success, Try}
class AsyncCluster(
environment: => ClusterEnvironment,
private[scala] val authenticator: Authenticator,
private[scala] val seedNodes: Set[SeedNode]
private[scala] val seedNodes: Set[SeedNode],
private[scala] val connectionString: String
) {
private[scala] implicit val ec: ExecutionContext = environment.ec

/** The environment used to create this cluster */
val env: ClusterEnvironment = environment

private[couchbase] val core =
Core.create(environment.coreEnv, authenticator, seedNodes.map(_.toCore).asJava)
Core.create(
environment.coreEnv,
authenticator,
seedNodes.map(_.toCore).asJava,
connectionString
)
private[scala] val hp = HandlerBasicParams(core, env)
private[scala] val searchTimeout = javaDurationToScala(env.timeoutConfig.searchTimeout())
private[scala] val analyticsTimeout = javaDurationToScala(env.timeoutConfig.analyticsTimeout())
Expand Down Expand Up @@ -436,7 +441,7 @@ object AsyncCluster {
extractClusterEnvironment(connectionString, options)
.map(ce => {
val seedNodes = seedNodesFromConnectionString(connectionString, ce)
val cluster = new AsyncCluster(ce, options.authenticator, seedNodes)
val cluster = new AsyncCluster(ce, options.authenticator, seedNodes, connectionString)
cluster.performGlobalConnect()
cluster
})
Expand All @@ -455,7 +460,7 @@ object AsyncCluster {
AsyncCluster
.extractClusterEnvironment(options)
.map(ce => {
val cluster = new AsyncCluster(ce, options.authenticator, seedNodes)
val cluster = new AsyncCluster(ce, options.authenticator, seedNodes, null)
cluster.performGlobalConnect()
cluster
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ import scala.util.Try
class Cluster private[scala] (
_env: => ClusterEnvironment,
authenticator: Authenticator,
seedNodes: Set[SeedNode]
seedNodes: Set[SeedNode],
connectionString: String
) {
private[scala] implicit val ec: ExecutionContext = _env.ec

/** Access an asynchronous version of this API. */
val async = new AsyncCluster(_env, authenticator, seedNodes)
val async = new AsyncCluster(_env, authenticator, seedNodes, connectionString)

/** Access a reactive version of this API. */
lazy val reactive = new ReactiveCluster(async)
Expand Down Expand Up @@ -352,7 +353,7 @@ object Cluster {
.extractClusterEnvironment(connectionString, options)
.map(ce => {
val seedNodes = seedNodesFromConnectionString(connectionString, ce)
val cluster = new Cluster(ce, options.authenticator, seedNodes)
val cluster = new Cluster(ce, options.authenticator, seedNodes, connectionString)
cluster.async.performGlobalConnect()
cluster
})
Expand All @@ -371,7 +372,7 @@ object Cluster {
AsyncCluster
.extractClusterEnvironment(options)
.map(ce => {
val cluster = new Cluster(ce, options.authenticator, seedNodes)
val cluster = new Cluster(ce, options.authenticator, seedNodes, null)
cluster.async.performGlobalConnect()
cluster
})
Expand Down
Loading

0 comments on commit 41a182f

Please sign in to comment.