Skip to content

Commit

Permalink
[grid] Fix a bug with concurrent session creation on nodes
Browse files Browse the repository at this point in the history
The expectation was that nodes would only ever be called
sequentially. This is not a safe assumption to make, so we don't make
it any more.
  • Loading branch information
shs96c committed Jul 21, 2020
1 parent 519df0f commit a3e0daf
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 16 deletions.
17 changes: 16 additions & 1 deletion java/client/test/org/openqa/selenium/remote/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
load("@rules_jvm_external//:defs.bzl", "artifact")
load("//java:defs.bzl", "java_test_suite")
load("//java:defs.bzl", "java_selenium_test_suite", "java_test_suite")

LARGE_TESTS = [
"ParallelSessionsTest.java",
"RemoteWebDriverScreenshotTest.java",
]

Expand Down Expand Up @@ -29,3 +30,17 @@ java_test_suite(
artifact("org.mockito:mockito-core"),
],
)

java_selenium_test_suite(
name = "large-tests",
size = "large",
srcs = LARGE_TESTS,
deps = [
"//java/client/src/org/openqa/selenium/remote",
"//java/client/test/org/openqa/selenium/testing:annotations",
"//java/client/test/org/openqa/selenium/testing:test-base",
"//java/client/test/org/openqa/selenium/testing/drivers",
artifact("org.assertj:assertj-core"),
artifact("junit:junit"),
],
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote;

import org.junit.Test;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.testing.JUnit4TestBase;
import org.openqa.selenium.testing.drivers.WebDriverBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.openqa.selenium.testing.Safely.safelyCall;

public class ParallelSessionsTest extends JUnit4TestBase {
private ExecutorService service = Executors.newFixedThreadPool(3);

@Test
public void shouldBeAbleToRunMultipleBrowsersAtTheSameTime() throws Exception {
// Create three browsers at the same time
List<Future<WebDriver>> all = service.invokeAll(Arrays.asList(
this::createDriver,
this::createDriver,
this::createDriver));

try {
// And now use them.
for (Future<WebDriver> future : all) {
future.get(30, SECONDS);
}
} finally {
for (Future<WebDriver> future : all) {
safelyCall(() -> {
future.cancel(true);
future.get(1, SECONDS).quit();
});
}
}
}

private WebDriver createDriver() {
WebDriver driver = new WebDriverBuilder().get();
driver.get(pages.simpleTestPage);
driver.getTitle();
return driver;
}
}
30 changes: 20 additions & 10 deletions java/server/src/org/openqa/selenium/grid/node/local/LocalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,29 +189,39 @@ public Optional<CreateSessionResponse> newSession(CreateSessionRequest sessionRe
return Optional.empty();
}

Optional<ActiveSession> possibleSession = Optional.empty();
SessionSlot slot = null;
for (SessionSlot factory : factories) {
if (!factory.isAvailable() || !factory.test(sessionRequest.getCapabilities())) {
continue;
}
// Identify possible slots to use as quickly as possible to enable concurrent session starting
SessionSlot slotToUse = null;
synchronized(factories) {
for (SessionSlot factory : factories) {
if (!factory.isAvailable() || !factory.test(sessionRequest.getCapabilities())) {
continue;
}

possibleSession = factory.apply(sessionRequest);
if (possibleSession.isPresent()) {
slot = factory;
factory.reserve();
slotToUse = factory;
break;
}
}

if (slotToUse == null) {
span.setAttribute("error", true);
span.setStatus(Status.NOT_FOUND);
span.addEvent("No slot matched capabilities " + sessionRequest.getCapabilities());
return Optional.empty();
}

Optional<ActiveSession> possibleSession = slotToUse.apply(sessionRequest);

if (!possibleSession.isPresent()) {
slotToUse.release();
span.setAttribute("error", true);
span.setStatus(Status.NOT_FOUND);
span.addEvent("No slots available for capabilities " + sessionRequest.getCapabilities());
return Optional.empty();
}

ActiveSession session = possibleSession.get();
currentSessions.put(session.getId(), slot);
currentSessions.put(session.getId(), slotToUse);

SESSION_ID.accept(span, session.getId());
CAPABILITIES.accept(span, session.getCapabilities());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.openqa.selenium.remote.http.HttpResponse;

import java.io.UncheckedIOException;
import java.util.ConcurrentModificationException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.logging.Level;
Expand All @@ -47,6 +49,7 @@ public class SessionSlot implements
private final EventBus bus;
private final Capabilities stereotype;
private final SessionFactory factory;
private final AtomicBoolean reserved = new AtomicBoolean(false);
private ActiveSession currentSession;

public SessionSlot(EventBus bus, Capabilities stereotype, SessionFactory factory) {
Expand All @@ -59,8 +62,18 @@ public Capabilities getStereotype() {
return stereotype;
}

public void reserve() {
if (reserved.getAndSet(true)) {
throw new IllegalStateException("Attempt to reserve a slot that is already reserved");
}
}

public void release() {
reserved.set(false);
}

public boolean isAvailable() {
return currentSession == null;
return !reserved.get();
}

public ActiveSession getSession() {
Expand All @@ -77,8 +90,13 @@ public void stop() {
}

SessionId id = currentSession.getId();
currentSession.stop();
try {
currentSession.stop();
} catch (Exception e) {
LOG.log(Level.WARNING, "Unable to cleanly close session", e);
}
currentSession = null;
release();
bus.fire(new SessionClosedEvent(id));
}

Expand All @@ -98,7 +116,7 @@ public boolean test(Capabilities capabilities) {

@Override
public Optional<ActiveSession> apply(CreateSessionRequest sessionRequest) {
if (!isAvailable()) {
if (currentSession != null) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ java_test_suite(
"//java/server/src/org/openqa/selenium/grid/node/local",
"//java/server/src/org/openqa/selenium/grid/sessionmap/local",
"//java/server/test/org/openqa/selenium/grid/testing",
artifact("com.google.guava:guava"),
artifact("io.opentelemetry:opentelemetry-api"),
artifact("junit:junit"),
artifact("org.assertj:assertj-core"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,27 @@

package org.openqa.selenium.grid.distributor.local;

import com.google.common.collect.ImmutableMap;
import org.junit.Before;
import org.junit.Test;
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.ImmutableCapabilities;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.local.GuavaEventBus;
import org.openqa.selenium.grid.data.CreateSessionResponse;
import org.openqa.selenium.grid.data.DistributorStatus;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.distributor.Distributor;
import org.openqa.selenium.grid.node.Node;
import org.openqa.selenium.grid.node.local.LocalNode;
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
import org.openqa.selenium.grid.testing.TestSessionFactory;
import org.openqa.selenium.remote.HttpSessionId;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpMethod;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.tracing.DefaultTestTracer;
Expand All @@ -41,17 +46,26 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.openqa.selenium.remote.http.HttpMethod.GET;

public class LocalDistributorTest {

Expand Down Expand Up @@ -183,10 +197,75 @@ public void testAllBucketsNotSameSizeProveNotUsingAverage() {
//Make sure the numbers don't just average out to the same size
Map<String, Set<Host>> hostBuckets = buildBuckets(4, 5, 6 );

LocalDistributor distributor = new LocalDistributor(tracer, bus, clientFactory, new LocalSessionMap(tracer, bus), null);
LocalDistributor distributor = new LocalDistributor(
tracer,
bus,
clientFactory,
new LocalSessionMap(tracer, bus),
null);
assertThat(distributor.allBucketsSameSize(hostBuckets)).isFalse();
}

@Test
public void shouldBeAbleToAddMultipleSessionsConcurrently() throws Exception {
Distributor distributor = new LocalDistributor(
tracer,
bus,
clientFactory,
new LocalSessionMap(tracer, bus),
null);

// Add one node to ensure that everything is created in that.
Capabilities caps = new ImmutableCapabilities("browserName", "cheese");

class VerifyingHandler extends Session implements HttpHandler {
private VerifyingHandler(SessionId id, Capabilities capabilities) {
super(id, uri, capabilities);
}

@Override
public HttpResponse execute(HttpRequest req) {
Optional<SessionId> id = HttpSessionId.getSessionId(req.getUri()).map(SessionId::new);
assertThat(id).isEqualTo(Optional.of(getId()));
return new HttpResponse();
}
}

// Only use one node.
Node node = LocalNode.builder(tracer, bus, uri, uri, null)
.add(caps, new TestSessionFactory(VerifyingHandler::new))
.add(caps, new TestSessionFactory(VerifyingHandler::new))
.add(caps, new TestSessionFactory(VerifyingHandler::new))
.build();
distributor.add(node);

HttpRequest req = new HttpRequest(HttpMethod.POST, "/session")
.setContent(Contents.asJson(ImmutableMap.of(
"capabilities", ImmutableMap.of(
"alwaysMatch", ImmutableMap.of(
"browserName", "cheese")))));


List<Callable<SessionId>> callables = new ArrayList<>();
for (int i = 0; i < 3; i++) {
callables.add(() -> {
CreateSessionResponse res = distributor.newSession(req);
assertThat(res.getSession().getCapabilities().getBrowserName()).isEqualTo("cheese");
return res.getSession().getId();
});
}

List<Future<SessionId>> futures = Executors.newFixedThreadPool(3).invokeAll(callables);

for (Future<SessionId> future : futures) {
SessionId id = future.get(2, SECONDS);

// Now send a random command.
HttpResponse res = node.execute(new HttpRequest(GET, String.format("/session/%s/url", id)));
assertThat(res.isSuccessful()).isTrue();
}
}

//Build a few Host Buckets of different sizes
private Map<String, Set<Host>> buildBuckets(int...sizes) {
Map<String, Set<Host>> hostBuckets = new HashMap<>();
Expand Down
Loading

0 comments on commit a3e0daf

Please sign in to comment.