From 44e06635c1524229a923e8fbb525df278fcecdec Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Sat, 3 Mar 2018 02:17:32 +0100 Subject: [PATCH] Utilities for arquillian integration tests (#1310) * Utilities for arquillian integration tests A set of utilities for working with clusters set up in arquillian. - Utilities to work with docker - Utilities to work with arquillian pulsar clusters - Await strategies that arquillian uses to know if a node is up - Stop actions to copy logs at the end of a test suite * Move to log4j2 * Pull in extra dependencies for log4j yml configuration --- tests/integration-tests-utils/pom.xml | 84 ++++++ .../org/apache/pulsar/tests/DockerUtils.java | 220 ++++++++++++++ .../tests/LogToTargetDirStopAction.java | 41 +++ .../pulsar/tests/NoopAwaitStrategy.java | 28 ++ .../pulsar/tests/PulsarClusterUtils.java | 269 ++++++++++++++++++ .../PulsarLogsToTargetDirStopAction.java | 43 +++ .../pulsar/tests/ZooKeeperAwaitStrategy.java | 54 ++++ .../src/main/resources/log4j2.yml | 41 +++ tests/pom.xml | 1 + 9 files changed, 781 insertions(+) create mode 100644 tests/integration-tests-utils/pom.xml create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java create mode 100644 tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java create mode 100644 tests/integration-tests-utils/src/main/resources/log4j2.yml diff --git a/tests/integration-tests-utils/pom.xml b/tests/integration-tests-utils/pom.xml new file mode 100644 index 0000000000000..c6d74c8537548 --- /dev/null +++ b/tests/integration-tests-utils/pom.xml @@ -0,0 +1,84 @@ + + + + 4.0.0 + + org.apache.pulsar.tests + tests-parent + 2.0.0-incubating-SNAPSHOT + + + org.apache.pulsar.tests + integration-tests-utils + jar + + Apache Pulsar :: Tests :: Utility module for Arquillian based integration tests + + + 1.15.1 + 1.15 + + + + + org.apache.commons + commons-compress + ${commons-compress.version} + + + + org.apache.zookeeper + zookeeper + + + + org.slf4j + slf4j-api + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + com.fasterxml.jackson.core + jackson-databind + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + + + org.arquillian.cube + arquillian-cube-docker + ${arquillian-cube.version} + + + + diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java new file mode 100644 index 0000000000000..fb69dc7a6eb2d --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/DockerUtils.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import com.github.dockerjava.api.DockerClient; +import com.github.dockerjava.api.async.ResultCallback; +import com.github.dockerjava.api.command.InspectExecResponse; +import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.api.model.ContainerNetwork; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DockerUtils { + private static final Logger LOG = LoggerFactory.getLogger(DockerUtils.class); + + private static File getTargetDirectory(String containerId) { + String base = System.getProperty("maven.buildDirectory"); + if (base == null) { + base = "target"; + } + File directory = new File(base + "/container-logs/" + containerId); + if (!directory.exists() && !directory.mkdirs()) { + LOG.error("Error creating directory for container logs."); + } + return directory; + } + + public static void dumpContainerLogToTarget(DockerClient docker, String containerId) { + File output = new File(getTargetDirectory(containerId), "docker.log"); + try (FileOutputStream os = new FileOutputStream(output)) { + CompletableFuture future = new CompletableFuture<>(); + docker.logContainerCmd(containerId).withStdOut(true) + .withStdErr(true).withTimestamps(true).exec(new ResultCallback() { + @Override + public void close() {} + + @Override + public void onStart(Closeable closeable) {} + + @Override + public void onNext(Frame object) { + try { + os.write(object.getPayload()); + } catch (IOException e) { + onError(e); + } + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + future.complete(true); + } + }); + future.get(); + } catch (RuntimeException|ExecutionException|IOException e) { + LOG.error("Error dumping log for {}", containerId, e); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.info("Interrupted dumping log from container {}", containerId, ie); + } + } + + public static void dumpContainerLogDirToTarget(DockerClient docker, String containerId, String path) { + final int READ_BLOCK_SIZE = 10000; + + try (InputStream dockerStream = docker.copyArchiveFromContainerCmd(containerId, path).exec(); + TarArchiveInputStream stream = new TarArchiveInputStream(dockerStream)) { + TarArchiveEntry entry = stream.getNextTarEntry(); + while (entry != null) { + if (entry.isFile()) { + File output = new File(getTargetDirectory(containerId), entry.getName().replace("/", "-")); + try (FileOutputStream os = new FileOutputStream(output)) { + byte[] block = new byte[READ_BLOCK_SIZE]; + int read = stream.read(block, 0, READ_BLOCK_SIZE); + while (read > -1) { + os.write(block, 0, read); + read = stream.read(block, 0, READ_BLOCK_SIZE); + } + } + } + entry = stream.getNextTarEntry(); + } + } catch (RuntimeException|IOException e) { + LOG.error("Error reading logs from container {}", containerId, e); + } + } + + public static String getContainerIP(DockerClient docker, String containerId) { + for (Map.Entry e : docker.inspectContainerCmd(containerId) + .exec().getNetworkSettings().getNetworks().entrySet()) { + return e.getValue().getIpAddress(); + } + throw new IllegalArgumentException("Container " + containerId + " has no networks"); + } + + public static String getContainerHostname(DockerClient docker, String containerId) { + return runCommand(docker, containerId, "hostname").trim(); + } + + public static String runCommand(DockerClient docker, String containerId, String... cmd) { + CompletableFuture future = new CompletableFuture<>(); + String execid = docker.execCreateCmd(containerId).withCmd(cmd) + .withAttachStderr(true).withAttachStdout(true).exec().getId(); + String cmdString = Arrays.stream(cmd).collect(Collectors.joining(" ")); + StringBuffer output = new StringBuffer(); + docker.execStartCmd(execid).withDetach(false) + .exec(new ResultCallback() { + @Override + public void close() {} + + @Override + public void onStart(Closeable closeable) { + LOG.info("DOCKER.exec({}:{}): Executing...", containerId, cmdString); + } + + @Override + public void onNext(Frame object) { + LOG.info("DOCKER.exec({}:{}): {}", containerId, cmdString, object); + output.append(new String(object.getPayload())); + } + + @Override + public void onError(Throwable throwable) { + future.completeExceptionally(throwable); + } + + @Override + public void onComplete() { + LOG.info("DOCKER.exec({}:{}): Done", containerId, cmdString); + future.complete(true); + } + }); + future.join(); + + InspectExecResponse resp = docker.inspectExecCmd(execid).exec(); + while (resp.isRunning()) { + try { + Thread.sleep(200); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + resp = docker.inspectExecCmd(execid).exec(); + } + int retCode = resp.getExitCode(); + if (retCode != 0) { + throw new RuntimeException( + String.format("cmd(%s) failed on %s with exitcode %d", + cmdString, containerId, retCode)); + } else { + LOG.info("DOCKER.exec({}:{}): completed with {}", containerId, cmdString, retCode); + } + return output.toString(); + } + + public static Optional getContainerCluster(DockerClient docker, String containerId) { + return Optional.ofNullable(docker.inspectContainerCmd(containerId) + .exec().getConfig().getLabels().get("cluster")); + } + + public static Set allCubeIds() { + Pattern pattern = Pattern.compile("^arq.cube.docker.([^.]*).ip$"); + return System.getProperties().keySet().stream() + .map(k -> pattern.matcher(k.toString())) + .filter(m -> m.matches()) + .map(m -> m.group(1)) + .collect(Collectors.toSet()); + } + + public static Set cubeIdsWithLabels(DockerClient docker, Map labels) { + return allCubeIds().stream() + .filter(id -> { + Map configuredLabels = docker.inspectContainerCmd(id).exec().getConfig().getLabels(); + return labels.entrySet().stream() + .map(e -> configuredLabels.containsKey(e.getKey()) + && configuredLabels.get(e.getKey()).equals(e.getValue())) + .reduce(true, (acc, res) -> acc && res); + }) + .collect(Collectors.toSet()); + } +} diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java new file mode 100644 index 0000000000000..f74ce471ff552 --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/LogToTargetDirStopAction.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.arquillian.cube.docker.impl.docker.DockerClientExecutor; +import org.arquillian.cube.impl.model.CubeId; +import org.arquillian.cube.spi.beforeStop.BeforeStopAction; + +public class LogToTargetDirStopAction implements BeforeStopAction { + private DockerClientExecutor dockerClientExecutor; + private CubeId containerID; + + public void setDockerClientExecutor(DockerClientExecutor executor) { + this.dockerClientExecutor = executor; + } + + public void setContainerID(CubeId containerID) { + this.containerID = containerID; + } + + @Override + public void doBeforeStop() { + DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), containerID.getId()); + } +} diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java new file mode 100644 index 0000000000000..d9ef56c8c2436 --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/NoopAwaitStrategy.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.arquillian.cube.spi.await.AwaitStrategy; + +public class NoopAwaitStrategy implements AwaitStrategy { + @Override + public boolean await() { + return true; + } +} diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java new file mode 100644 index 0000000000000..c5b8e7019d6f8 --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarClusterUtils.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.github.dockerjava.api.DockerClient; +import com.google.common.collect.ImmutableMap; + +import java.io.IOException; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.net.Socket; + +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.Event.KeeperState; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PulsarClusterUtils { + private static final Logger LOG = LoggerFactory.getLogger(PulsarClusterUtils.class); + static final short BROKER_PORT = 8080; + + public static String zookeeperConnectString(DockerClient docker, String cluster) { + return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "zookeeper", "cluster", cluster)) + .stream().map((id) -> DockerUtils.getContainerIP(docker, id)).collect(Collectors.joining(":")); + } + + public static ZooKeeper zookeeperClient(DockerClient docker, String cluster) throws Exception { + String connectString = zookeeperConnectString(docker, cluster); + LOG.info("Connecting to zookeeper {}", connectString); + CompletableFuture future = new CompletableFuture<>(); + ZooKeeper zk = new ZooKeeper(connectString, 10000, + (e) -> { + if (e.getState().equals(KeeperState.SyncConnected)) { + future.complete(null); + } + }); + future.get(); + return zk; + } + + public static boolean zookeeperRunning(DockerClient docker, String containerId) { + String ip = DockerUtils.getContainerIP(docker, containerId); + try (Socket socket = new Socket(ip, 2181)) { + socket.setSoTimeout(1000); + socket.getOutputStream().write("ruok".getBytes(UTF_8)); + byte[] resp = new byte[4]; + if (socket.getInputStream().read(resp) == 4) { + return new String(resp, UTF_8).equals("imok"); + } + } catch (IOException e) { + // ignore, we'll return fallthrough to return false + } + return false; + } + + public static boolean runOnAnyBroker(DockerClient docker, String cluster, String... cmds) throws Exception { + Optional broker = DockerUtils.cubeIdsWithLabels( + docker,ImmutableMap.of("service", "pulsar-broker", "cluster", cluster)).stream().findAny(); + if (broker.isPresent()) { + DockerUtils.runCommand(docker, broker.get(), cmds); + return true; + } else { + return false; + } + } + + public static void runOnAllBrokers(DockerClient docker, String cluster, String... cmds) throws Exception { + DockerUtils.cubeIdsWithLabels(docker,ImmutableMap.of("service", "pulsar-broker", "cluster", cluster)) + .stream().forEach((b) -> DockerUtils.runCommand(docker, b, cmds)); + } + + private static boolean waitBrokerState(DockerClient docker, String containerId, + int timeout, TimeUnit timeoutUnit, + boolean upOrDown) { + long timeoutMillis = timeoutUnit.toMillis(timeout); + long pollMillis = 1000; + String brokerId = DockerUtils.getContainerHostname(docker, containerId) + ":" + BROKER_PORT; + Optional containerCluster = DockerUtils.getContainerCluster(docker, containerId); + if (!containerCluster.isPresent()) { + LOG.error("Unable to determine cluster for container {}. Missing label?", containerId); + return false; + } + + ZooKeeper zk = null; + try { + zk = zookeeperClient(docker, containerCluster.get()); + String path = "/loadbalance/brokers/" + brokerId; + while (timeoutMillis > 0) { + if ((zk.exists(path, false) != null) == upOrDown) { + return true; + } + Thread.sleep(pollMillis); + timeoutMillis -= pollMillis; + } + } catch (Exception e) { + LOG.error("Exception checking for broker state", e); + return false; + } finally { + try { + if (zk != null) { + zk.close(); + } + } catch (Exception e) { + LOG.error("Exception closing zookeeper client", e); + return false; + } + } + LOG.warn("Broker {} didn't go {} after {} seconds", + containerId, upOrDown ? "up" : "down", + timeoutUnit.toSeconds(timeout)); + return false; + } + + public static boolean waitBrokerUp(DockerClient docker, String containerId, + int timeout, TimeUnit timeoutUnit) { + if (waitBrokerState(docker, containerId, timeout, timeoutUnit, true)) { + String ip = DockerUtils.getContainerIP(docker, containerId); + + long timeoutMillis = timeoutUnit.toMillis(timeout); + long pollMillis = 100; + + while (timeoutMillis > 0) { + try (Socket socket = new Socket(ip, BROKER_PORT)) { + return true; + } catch (Exception e) { + // couldn't connect, try again after sleep + } + try { + Thread.sleep(pollMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + timeoutMillis -= pollMillis; + } + } + return false; + } + + public static boolean waitBrokerDown(DockerClient docker, String containerId, + int timeout, TimeUnit timeoutUnit) { + return waitBrokerState(docker, containerId, timeout, timeoutUnit, false); + } + + public static boolean waitAllBrokersUp(DockerClient docker, String cluster) { + return brokerSet(docker, cluster).stream() + .map((b) -> waitBrokerUp(docker, b, 10, TimeUnit.SECONDS)) + .reduce(true, (accum, res) -> accum && res); + } + + public static boolean waitAllBrokersDown(DockerClient docker, String cluster) { + return brokerSet(docker, cluster).stream() + .map((b) -> waitBrokerDown(docker, b, 10, TimeUnit.SECONDS)) + .reduce(true, (accum, res) -> accum && res); + } + + public static boolean waitSupervisord(DockerClient docker, String containerId) { + int i = 50; + while (i > 0) { + try { + DockerUtils.runCommand(docker, containerId, "test", "-S", "/var/run/supervisor/supervisor.sock"); + return true; + } catch (Exception e) { + // supervisord not running + } + try { + Thread.sleep(100); + i++; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + } + return false; + } + + public static boolean startAllBrokers(DockerClient docker, String cluster) { + brokerSet(docker, cluster).stream().forEach( + (b) -> { + waitSupervisord(docker, b); + DockerUtils.runCommand(docker, b, "supervisorctl", "start", "broker"); + }); + + return waitAllBrokersUp(docker, cluster); + } + + public static boolean stopAllBrokers(DockerClient docker, String cluster) { + brokerSet(docker, cluster).stream().forEach( + (b) -> DockerUtils.runCommand(docker, b, "supervisorctl", "stop", "broker")); + + return waitAllBrokersDown(docker, cluster); + } + + public static Set brokerSet(DockerClient docker, String cluster) { + return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "pulsar-broker", + "cluster", cluster)); + } + + public static boolean waitProxyUp(DockerClient docker, String containerId, + int timeout, TimeUnit timeoutUnit) { + String ip = DockerUtils.getContainerIP(docker, containerId); + long timeoutMillis = timeoutUnit.toMillis(timeout); + long pollMillis = 100; + + while (timeoutMillis > 0) { + try (Socket socket = new Socket(ip, BROKER_PORT)) { + return true; + } catch (Exception e) { + // couldn't connect, try again after sleep + } + try { + Thread.sleep(pollMillis); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + timeoutMillis -= pollMillis; + } + return false; + } + + public static boolean waitAllProxiesUp(DockerClient docker, String cluster) { + return proxySet(docker, cluster).stream() + .map((b) -> waitProxyUp(docker, b, 10, TimeUnit.SECONDS)) + .reduce(true, (accum, res) -> accum && res); + } + + public static boolean startAllProxies(DockerClient docker, String cluster) { + proxySet(docker, cluster).stream().forEach( + (b) -> { + waitSupervisord(docker, b); + DockerUtils.runCommand(docker, b, "supervisorctl", "start", "proxy"); + }); + + return waitAllProxiesUp(docker, cluster); + } + + public static void stopAllProxies(DockerClient docker, String cluster) { + proxySet(docker, cluster).stream().forEach( + (b) -> DockerUtils.runCommand(docker, b, "supervisorctl", "stop", "proxy")); + } + + public static Set proxySet(DockerClient docker, String cluster) { + return DockerUtils.cubeIdsWithLabels(docker, ImmutableMap.of("service", "pulsar-proxy", + "cluster", cluster)); + } +} diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java new file mode 100644 index 0000000000000..00233595dbe2a --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/PulsarLogsToTargetDirStopAction.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import org.arquillian.cube.docker.impl.docker.DockerClientExecutor; +import org.arquillian.cube.impl.model.CubeId; +import org.arquillian.cube.spi.beforeStop.BeforeStopAction; + +public class PulsarLogsToTargetDirStopAction implements BeforeStopAction { + private DockerClientExecutor dockerClientExecutor; + private CubeId containerID; + + public void setDockerClientExecutor(DockerClientExecutor executor) { + this.dockerClientExecutor = executor; + } + + public void setContainerID(CubeId containerID) { + this.containerID = containerID; + } + + @Override + public void doBeforeStop() { + DockerUtils.dumpContainerLogToTarget(dockerClientExecutor.getDockerClient(), containerID.getId()); + DockerUtils.dumpContainerLogDirToTarget(dockerClientExecutor.getDockerClient(), + containerID.getId(), "/var/log/pulsar"); + } +} diff --git a/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java new file mode 100644 index 0000000000000..7cad4cb76a5ff --- /dev/null +++ b/tests/integration-tests-utils/src/main/java/org/apache/pulsar/tests/ZooKeeperAwaitStrategy.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests; + +import java.util.concurrent.TimeUnit; + +import org.arquillian.cube.spi.Cube; +import org.arquillian.cube.spi.await.AwaitStrategy; +import org.arquillian.cube.docker.impl.client.config.Await; +import org.arquillian.cube.docker.impl.docker.DockerClientExecutor; +import org.arquillian.cube.docker.impl.util.Ping; +import org.arquillian.cube.docker.impl.util.PingCommand; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ZooKeeperAwaitStrategy implements AwaitStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperAwaitStrategy.class); + + private static final int DEFAULT_POLL_ITERATIONS = 10; + private static final int DEFAULT_SLEEP_TIME = 1; + private static final TimeUnit DEFAULT_SLEEP_TIMEUNIT = TimeUnit.SECONDS; + + private Cube cube; + private DockerClientExecutor dockerClientExecutor; + + @Override + public boolean await() { + return Ping.ping(DEFAULT_POLL_ITERATIONS, DEFAULT_SLEEP_TIME, DEFAULT_SLEEP_TIMEUNIT, + new PingCommand() { + @Override + public boolean call() { + return PulsarClusterUtils.zookeeperRunning(dockerClientExecutor.getDockerClient(), + cube.getId()); + } + }); + } +} diff --git a/tests/integration-tests-utils/src/main/resources/log4j2.yml b/tests/integration-tests-utils/src/main/resources/log4j2.yml new file mode 100644 index 0000000000000..94ad627c63fc3 --- /dev/null +++ b/tests/integration-tests-utils/src/main/resources/log4j2.yml @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +Configuration: + name: test + + Appenders: + + # Console + Console: + name: Console + target: SYSTEM_OUT + PatternLayout: + Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" + + Loggers: + + Root: + level: warn + AppenderRef: + - ref: Console + + Logger: + name: org.apache.pulsar + level: info diff --git a/tests/pom.xml b/tests/pom.xml index ea6f20b598447..5c8429d66edd8 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -33,6 +33,7 @@ Apache Pulsar :: Tests docker-images + integration-tests-utils