From bc1b231c9e32667b2978c86a6a64833470973dbd Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sun, 25 Sep 2022 16:21:45 +0800 Subject: [PATCH 1/4] ZOOKEEPER-4327: Fix flaky RequestThrottlerTest This PR tries to fix several test failures in `RequestThrottlerTest`. First, `RequestThrottlerTest#testDropStaleRequests`. Place `Thread.sleep(200)` after `submittedRequests.take()` in `RequestThrottler#run` will fail two assertions: 1. `assertEquals(2L, (long) metrics.get("prep_processor_request_queued"))` 2. `assertEquals(1L, (long) metrics.get("request_throttle_wait_count"))` This happens due to `setStale` chould happen before throttle handling. This commit solves this by introducing an interception point `RequestThrottler.throttleSleep` to build happen-before relations: 1. `throttling.countDown` happens before `setStale`, this ensures that unthrottled request are processed as usual. 2. `setStale` happens before `throttled.await`, this defends `RequestThrottler.throttleSleep` against spurious wakeup. Second, `RequestThrottlerTest#testRequestThrottler`. * `RequestThrottlerTest.testRequestThrottler:197 expected: <2> but was: <1>` `ZooKeeperServer#submitRequest` and `PrepRequestProcessor#processRequest` run in different threads, thus there is no guarantee on metric `prep_processor_request_queued` after `submitted.await(5, TimeUnit.SECONDS)`. Place `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` will incur this failure. * `RequestThrottlerTest.testRequestThrottler:206 expected: <5> but was: <4>` `entered.await(STALL_TIME, TimeUnit.MILLISECONDS)` could return `false` due to almost same timeout as `RequestThrottler#throttleSleep`. Place `Thread.sleep(500)` around `throttleSleep` will increase failure possibility. Third, `RequestThrottlerTest#testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled`. * `RequestThrottlerTest.testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled:340 expected: <3> but was: <4>` `ZooKeeperServer#shouldThrottle` depends on consistent sum of `getInflight` and `getInProcess`. But it is no true. Place `Thread.sleep(200)` before `zks.submitRequestNow(request)` in `RequestThrottler#run` could reproduce this. Sees also https://github.com/apache/zookeeper/pull/1739, https://github.com/apache/zookeeper/pull/1821. Author: Kezhu Wang Reviewers: Mate Szalay-Beko , maoling Closes #1887 from kezhuw/ZOOKEEPER-4327-flaky-RequestThrottlerTest.testDropStaleRequests --- .../zookeeper/server/RequestThrottler.java | 12 ++-- .../zookeeper/server/ZooKeeperServer.java | 5 +- .../server/RequestThrottlerTest.java | 66 +++++++++++++++---- 3 files changed, 64 insertions(+), 19 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java index d60efa0878f..4a401e5b919 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java @@ -195,13 +195,11 @@ public void run() { LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped); } - private synchronized void throttleSleep(int stallTime) { - try { - ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); - this.wait(stallTime); - } catch (InterruptedException ie) { - return; - } + + // @VisibleForTesting + synchronized void throttleSleep(int stallTime) throws InterruptedException { + ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); + this.wait(stallTime); } @SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state change is in ZooKeeperServer.decInProgress() ") diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 0303ca645bd..817e84b3e59 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -749,9 +749,12 @@ protected void startJvmPauseMonitor() { } protected void startRequestThrottler() { - requestThrottler = new RequestThrottler(this); + requestThrottler = createRequestThrottler(); requestThrottler.start(); + } + protected RequestThrottler createRequestThrottler() { + return new RequestThrottler(this); } protected void setupRequestProcessors() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index ed22399902f..15259207599 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase { CountDownLatch disconnected = null; + CountDownLatch throttled = null; + CountDownLatch throttling = null; + ZooKeeperServer zks = null; ServerCnxnFactory f = null; ZooKeeper zk = null; int connectionLossCount = 0; + private long getCounterMetric(String name) { + return (long) MetricsUtils.currentServerMetrics().get(name); + } @BeforeEach public void setup() throws Exception { @@ -115,6 +121,11 @@ public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOExc super(snapDir, logDir, tickTime); } + @Override + protected RequestThrottler createRequestThrottler() { + return new TestRequestThrottler(this); + } + @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); @@ -141,6 +152,24 @@ public void requestFinished(Request request) { } } + class TestRequestThrottler extends RequestThrottler { + public TestRequestThrottler(ZooKeeperServer zks) { + super(zks); + } + + @Override + synchronized void throttleSleep(int stallTime) throws InterruptedException { + if (throttling != null) { + throttling.countDown(); + } + super.throttleSleep(stallTime); + // Defend against unstable timing and potential spurious wakeup. + if (throttled != null) { + assertTrue(throttled.await(20, TimeUnit.SECONDS)); + } + } + } + class TestPrepRequestProcessor extends PrepRequestProcessor { public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) { @@ -191,18 +220,22 @@ public void testRequestThrottler() throws Exception { // make sure the server received all 5 requests submitted.await(5, TimeUnit.SECONDS); - Map metrics = MetricsUtils.currentServerMetrics(); // but only two requests can get into the pipeline because of the throttler - assertEquals(2L, (long) metrics.get("prep_processor_request_queued")); - assertEquals(1L, (long) metrics.get("request_throttle_wait_count")); + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2; + waitFor("request not queued", requestQueued, 5); + + WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1; + waitFor("no throttle wait", throttleWait, 5); // let the requests go through the pipeline and the throttler will be waken up to allow more requests // to enter the pipeline resumeProcess.countDown(); - entered.await(STALL_TIME, TimeUnit.MILLISECONDS); - metrics = MetricsUtils.currentServerMetrics(); + // wait for more than one STALL_TIME to reduce timeout before wakeup + assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS)); + + Map metrics = MetricsUtils.currentServerMetrics(); assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued")); } @@ -221,6 +254,9 @@ public void testDropStaleRequests() throws Exception { resumeProcess = new CountDownLatch(1); submitted = new CountDownLatch(TOTAL_REQUESTS); + throttled = new CountDownLatch(1); + throttling = new CountDownLatch(1); + // send 5 requests asynchronously for (int i = 0; i < TOTAL_REQUESTS; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -231,11 +267,18 @@ public void testDropStaleRequests() throws Exception { // make sure the server received all 5 requests assertTrue(submitted.await(5, TimeUnit.SECONDS)); + // stale throttled requests + assertTrue(throttling.await(5, TimeUnit.SECONDS)); for (ServerCnxn cnxn : f.cnxns) { cnxn.setStale(); } + throttled.countDown(); zk = null; + // only first three requests are counted as finished + finished = new CountDownLatch(3); + + // let the requests go through the pipeline resumeProcess.countDown(); LOG.info("raise the latch"); @@ -243,6 +286,8 @@ public void testDropStaleRequests() throws Exception { Thread.sleep(50); } + assertTrue(finished.await(5, TimeUnit.SECONDS)); + // assert after all requests processed to avoid concurrent issues as metrics are // counted in different threads. Map metrics = MetricsUtils.currentServerMetrics(); @@ -327,7 +372,6 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() RequestThrottler.setMaxRequests(0); resumeProcess = new CountDownLatch(1); int totalRequests = 10; - submitted = new CountDownLatch(totalRequests); for (int i = 0; i < totalRequests; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -335,16 +379,16 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() }, null); } - submitted.await(5, TimeUnit.SECONDS); - // We should start throttling instead of queuing more requests. // // We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline // before throttling. For the next request, we will throttle by disabling receiving future requests but we still - // allow this single request coming in. So the total number of queued requests in processing pipeline would + // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would // be GLOBAL_OUTSTANDING_LIMIT + 2. - assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2, - (long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued")); + // + // But due to leak of consistent view of number of outstanding requests, the number could be larger. + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2; + waitFor("no enough requests queued", requestQueued, 5); resumeProcess.countDown(); } catch (Exception e) { From e2bc3dd1618405a67e9b412f8ef67eb84141eb76 Mon Sep 17 00:00:00 2001 From: chenhang Date: Thu, 29 Sep 2022 16:36:01 +0200 Subject: [PATCH 2/4] ZOOKEEPER-4616: Upgrade docker image to resolve CVEs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The current docker image `maven:3.6.3-jdk-8` has many critical security issues. maven3.6.3-jdk-8 › dpkg1.19.7 has [CVE-2022-1664](https://www.cve.org/CVERecord?id=CVE-2022-1664) maven3.6.3-jdk-8 › openssl1.1.1d-0+deb10u6 has [CVE-2021-3711](https://www.cve.org/CVERecord?id=CVE-2021-3711) maven3.6.3-jdk-8 › gzip1.9-3 has [CVE-2022-1271](https://www.cve.org/CVERecord?id=CVE-2022-1271) We need to upgrade the docker base image to version `maven:3.8.4-jdk-8` See [ZOOKEEPER-4616](https://issues.apache.org/jira/browse/ZOOKEEPER-4616) for full details. Author: chenhang Reviewers: Enrico Olivelli Closes #1927 from hangc0276/chenhang/ZOOKEEPER-4616 --- dev/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/docker/Dockerfile b/dev/docker/Dockerfile index bd2977f7402..a1b33569e79 100644 --- a/dev/docker/Dockerfile +++ b/dev/docker/Dockerfile @@ -17,7 +17,7 @@ # under the License. # -FROM maven:3.6.3-jdk-8 +FROM maven:3.8.4-jdk-8 RUN apt-get update RUN apt-get install -y \ From 3daefac37e8a7b456542c91adea541a938df1214 Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 5 Oct 2022 17:55:54 +0200 Subject: [PATCH 3/4] ZOOKEEPER-4575: ZooKeeperServer#processPacket take record instead of bytes This is the first step mentioned in [ZOOKEEPER-102](https://issues.apache.org/jira/browse/ZOOKEEPER-102) and we should not play with ByteBuffer among the request handling code path. Author: tison Reviewers: Enrico Olivelli , Mate Szalay-Beko Closes #1905 from tisonkun/request-supplier --- .travis.yml | 20 ++-- .../zookeeper/DeleteContainerRequest.java | 51 ++++++++++ .../apache/zookeeper/audit/AuditHelper.java | 22 ++--- .../server/ByteBufferRequestRecord.java | 64 ++++++++++++ .../zookeeper/server/ContainerManager.java | 7 +- .../server/FinalRequestProcessor.java | 54 +++-------- .../zookeeper/server/NIOServerCnxn.java | 6 +- .../zookeeper/server/NettyServerCnxn.java | 8 +- .../server/PrepRequestProcessor.java | 97 +++++++------------ .../org/apache/zookeeper/server/Request.java | 62 +++++++----- .../zookeeper/server/RequestRecord.java | 46 +++++++++ .../zookeeper/server/SimpleRequestRecord.java | 68 +++++++++++++ .../zookeeper/server/ZooKeeperServer.java | 75 +++++--------- .../zookeeper/server/quorum/Learner.java | 4 +- .../server/quorum/LearnerHandler.java | 5 +- .../server/quorum/LearnerSyncRequest.java | 6 +- .../server/quorum/QuorumZooKeeperServer.java | 14 ++- .../zookeeper/audit/Slf4JAuditLoggerTest.java | 6 +- .../zookeeper/server/CreateContainerTest.java | 26 +++-- .../server/FinalRequestProcessorTest.java | 12 +-- .../server/MultiOpSessionUpgradeTest.java | 4 +- .../PrepRequestProcessorMetricsTest.java | 10 +- .../server/PrepRequestProcessorTest.java | 27 ++---- .../ZooKeeperCriticalThreadMetricsTest.java | 3 +- .../CommitProcessorConcurrencyTest.java | 3 +- .../quorum/CommitProcessorMetricsTest.java | 6 +- .../server/quorum/CommitProcessorTest.java | 5 +- .../server/quorum/RaceConditionTest.java | 6 +- .../quorum/SessionUpgradeQuorumTest.java | 6 +- .../SyncRequestProcessorMetricTest.java | 4 +- .../test/LeaderSessionTrackerTest.java | 5 +- 31 files changed, 438 insertions(+), 294 deletions(-) create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java create mode 100644 zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java diff --git a/.travis.yml b/.travis.yml index 81471d9ed04..7fd2dcd02ff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,15 +5,14 @@ matrix: - os: linux arch: arm64 jdk: openjdk11 + install: | + f=$(which javac) + while [[ -L $f ]]; do f=$(readlink $f); done + export JAVA_HOME=${f%/bin/*} + - os: linux arch: s390x jdk: openjdk11 - addons: - apt: - update: true - packages: - - maven - - libcppunit-dev cache: directories: @@ -21,13 +20,10 @@ cache: addons: apt: + update: true packages: - - libcppunit-dev - -install: - - if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then - sudo apt-get install maven; - fi + - maven + - libcppunit-dev script: mvn clean apache-rat:check verify -DskipTests spotbugs:check checkstyle:check -Pfull-build diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java new file mode 100644 index 00000000000..b7fd12689d2 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; + +public class DeleteContainerRequest implements Record { + private String path; + + public DeleteContainerRequest() { + } + + public DeleteContainerRequest(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + @Override + public void serialize(OutputArchive archive, String tag) throws IOException { + archive.writeBuffer(path.getBytes(StandardCharsets.UTF_8), "path"); + } + + @Override + public void deserialize(InputArchive archive, String tag) throws IOException { + byte[] bytes = archive.readBuffer("path"); + path = new String(bytes, StandardCharsets.UTF_8); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java index 5aca1711f7a..d6df7d92438 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MultiOperationRecord; @@ -58,7 +57,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool if (!ZKAuditProvider.isAuditEnabled()) { return; } - String op = null; + String op; //For failed transaction rc.path is null String path = txnResult.path; String acls = null; @@ -69,8 +68,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool case ZooDefs.OpCode.create2: case ZooDefs.OpCode.createContainer: op = AuditConstants.OP_CREATE; - CreateRequest createRequest = new CreateRequest(); - deserialize(request, createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); createMode = getCreateMode(createRequest); if (failedTxn) { path = createRequest.getPath(); @@ -80,23 +78,20 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool case ZooDefs.OpCode.deleteContainer: op = AuditConstants.OP_DELETE; if (failedTxn) { - DeleteRequest deleteRequest = new DeleteRequest(); - deserialize(request, deleteRequest); + DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); path = deleteRequest.getPath(); } break; case ZooDefs.OpCode.setData: op = AuditConstants.OP_SETDATA; if (failedTxn) { - SetDataRequest setDataRequest = new SetDataRequest(); - deserialize(request, setDataRequest); + SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); path = setDataRequest.getPath(); } break; case ZooDefs.OpCode.setACL: op = AuditConstants.OP_SETACL; - SetACLRequest setACLRequest = new SetACLRequest(); - deserialize(request, setACLRequest); + SetACLRequest setACLRequest = request.readRequestRecord(SetACLRequest::new); acls = ZKUtil.aclToString(setACLRequest.getAcl()); if (failedTxn) { path = setACLRequest.getPath(); @@ -125,10 +120,6 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool } } - private static void deserialize(Request request, Record record) throws IOException { - request.readRequestRecord(record); - } - private static Result getResult(ProcessTxnResult rc, boolean failedTxn) { if (failedTxn) { return Result.FAILURE; @@ -191,8 +182,7 @@ private static Map getCreateModes(Request request) if (!ZKAuditProvider.isAuditEnabled()) { return createModes; } - MultiOperationRecord multiRequest = new MultiOperationRecord(); - deserialize(request, multiRequest); + MultiOperationRecord multiRequest = request.readRequestRecord(MultiOperationRecord::new); for (Op op : multiRequest) { if (op.getType() == ZooDefs.OpCode.create || op.getType() == ZooDefs.OpCode.create2 || op.getType() == ZooDefs.OpCode.createContainer) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java new file mode 100644 index 00000000000..5ddae2460ac --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.Record; + +public class ByteBufferRequestRecord implements RequestRecord { + + private final ByteBuffer request; + + private volatile Record record; + + public ByteBufferRequestRecord(ByteBuffer request) { + this.request = request; + } + + @SuppressWarnings("unchecked") + @Override + public T readRecord(Supplier constructor) throws IOException { + if (record != null) { + return (T) record; + } + + record = constructor.get(); + request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request, record); + request.rewind(); + return (T) record; + } + + @Override + public byte[] readBytes() { + request.rewind(); + int len = request.remaining(); + byte[] b = new byte[len]; + request.get(b); + request.rewind(); + return b; + } + + @Override + public int limit() { + return request.limit(); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java index 7abac587aa9..2664348c244 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java @@ -18,8 +18,6 @@ package org.apache.zookeeper.server; -import static java.nio.charset.StandardCharsets.UTF_8; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -27,6 +25,7 @@ import java.util.TimerTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.Time; import org.slf4j.Logger; @@ -129,8 +128,8 @@ public void checkContainers() throws InterruptedException { for (String containerPath : getCandidates()) { long startMs = Time.currentElapsedTime(); - ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes(UTF_8)); - Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, path, null); + DeleteContainerRequest record = new DeleteContainerRequest(containerPath); + Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, RequestRecord.fromRecord(record), null); try { LOG.info("Attempting to delete candidate container: {}", containerPath); postDeleteRequest(request); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index bc5b019f812..693a6b86e58 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -269,8 +269,7 @@ public void processRequest(Request request) { } case OpCode.multiRead: { lastOp = "MLTR"; - MultiOperationRecord multiReadRecord = new MultiOperationRecord(); - request.readRequestRecord(multiReadRecord); + MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { @@ -348,8 +347,7 @@ public void processRequest(Request request) { } case OpCode.sync: { lastOp = "SYNC"; - SyncRequest syncRequest = new SyncRequest(); - request.readRequestRecord(syncRequest); + SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; @@ -363,8 +361,7 @@ public void processRequest(Request request) { case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! - ExistsRequest existsRequest = new ExistsRequest(); - request.readRequestRecord(existsRequest); + ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new); path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); @@ -376,8 +373,7 @@ public void processRequest(Request request) { } case OpCode.getData: { lastOp = "GETD"; - GetDataRequest getDataRequest = new GetDataRequest(); - request.readRequestRecord(getDataRequest); + GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -385,8 +381,7 @@ public void processRequest(Request request) { } case OpCode.setWatches: { lastOp = "SETW"; - SetWatches setWatches = new SetWatches(); - request.readRequestRecord(setWatches); + SetWatches setWatches = request.readRequestRecord(SetWatches::new); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase() .setWatches( @@ -401,8 +396,7 @@ public void processRequest(Request request) { } case OpCode.setWatches2: { lastOp = "STW2"; - SetWatches2 setWatches = new SetWatches2(); - request.readRequestRecord(setWatches); + SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), @@ -415,16 +409,14 @@ public void processRequest(Request request) { } case OpCode.addWatch: { lastOp = "ADDW"; - AddWatchRequest addWatcherRequest = new AddWatchRequest(); - request.readRequestRecord(addWatcherRequest); + AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new); zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode()); rsp = new ErrorResponse(0); break; } case OpCode.getACL: { lastOp = "GETA"; - GetACLRequest getACLRequest = new GetACLRequest(); - request.readRequestRecord(getACLRequest); + GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -466,8 +458,7 @@ public void processRequest(Request request) { } case OpCode.getChildren: { lastOp = "GETC"; - GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); - request.readRequestRecord(getChildrenRequest); + GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -475,8 +466,7 @@ public void processRequest(Request request) { } case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; - GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); - request.readRequestRecord(getAllChildrenNumberRequest); + GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -495,8 +485,7 @@ public void processRequest(Request request) { } case OpCode.getChildren2: { lastOp = "GETC"; - GetChildren2Request getChildren2Request = new GetChildren2Request(); - request.readRequestRecord(getChildren2Request); + GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); @@ -517,8 +506,7 @@ public void processRequest(Request request) { } case OpCode.checkWatches: { lastOp = "CHKW"; - CheckWatchesRequest checkWatches = new CheckWatchesRequest(); - request.readRequestRecord(checkWatches); + CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); @@ -531,8 +519,7 @@ public void processRequest(Request request) { } case OpCode.removeWatches: { lastOp = "REMW"; - RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); - request.readRequestRecord(removeWatches); + RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); @@ -550,8 +537,7 @@ public void processRequest(Request request) { } case OpCode.getEphemerals: { lastOp = "GETE"; - GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); - request.readRequestRecord(getEphemerals); + GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new); String prefixPath = getEphemerals.getPrefixPath(); Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List ephemerals = new ArrayList<>(); @@ -585,16 +571,8 @@ public void processRequest(Request request) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - for (byte b : payload) { - sb.append(String.format("%02x", (0xff & b))); - } - } else { - sb.append("request buffer is null"); - } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); + String digest = request.requestDigest(); + LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); err = Code.MARSHALLINGERROR; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 83e7491e51b..5ffc81da1c3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread; import org.apache.zookeeper.server.command.CommandExecutor; @@ -392,7 +393,10 @@ void doIO(SelectionKey k) throws InterruptedException { } protected void readRequest() throws IOException { - zkServer.processPacket(this, incomingBuffer); + RequestHeader h = new RequestHeader(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h); + RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice()); + zkServer.processPacket(this, h, request); } // returns whether we are interested in writing, which is determined diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index ae482ce2b5e..f95200d560b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -47,6 +47,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.command.CommandExecutor; import org.apache.zookeeper.server.command.FourLetterCommands; @@ -478,9 +479,10 @@ private void receiveMessage(ByteBuf message) { throw new IOException("ZK down"); } if (initialized) { - // TODO: if zks.processPacket() is changed to take a ByteBuffer[], - // we could implement zero-copy queueing. - zks.processPacket(this, bb); + RequestHeader h = new RequestHeader(); + ByteBufferInputStream.byteBuffer2Record(bb, h); + RequestRecord request = RequestRecord.fromBytes(bb.slice()); + zks.processPacket(this, h, request); } else { LOG.debug("got conn req request from {}", getRemoteSocketAddress()); BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb)); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 9733a48aca1..35293359fe7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -18,7 +18,6 @@ package org.apache.zookeeper.server; -import static java.nio.charset.StandardCharsets.UTF_8; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.StringReader; @@ -36,6 +35,7 @@ import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.BadArgumentsException; import org.apache.zookeeper.KeeperException.Code; @@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req */ private static boolean failCreate = false; - LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue(); + LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue<>(); private final RequestProcessor nextProcessor; private final boolean digestEnabled; @@ -311,13 +311,8 @@ private String validatePathForCreate(String path, long sessionId) throws BadArgu /** * This method will be called inside the ProcessRequestThread, which is a * singleton, so there will be a single thread calling this code. - * - * @param type - * @param zxid - * @param request - * @param record */ - protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { + protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException { if (request.getHdr() == null) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); @@ -328,11 +323,12 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { - pRequest2TxnCreate(type, request, record, deserialize); + pRequest2TxnCreate(type, request, record); break; } case OpCode.deleteContainer: { - String path = new String(request.readRequestBytes(), UTF_8); + DeleteContainerRequest txn = (DeleteContainerRequest) record; + String path = txn.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord nodeRecord = getRecordForPath(path); if (nodeRecord.childCount > 0) { @@ -359,9 +355,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.delete: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); DeleteRequest deleteRequest = (DeleteRequest) record; - if (deserialize) { - request.readRequestRecord(deleteRequest); - } String path = deleteRequest.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord parentRecord = getRecordForPath(parentPath); @@ -387,9 +380,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.setData: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetDataRequest setDataRequest = (SetDataRequest) record; - if (deserialize) { - request.readRequestRecord(setDataRequest); - } path = setDataRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -559,9 +549,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.setACL: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetACLRequest setAclRequest = (SetACLRequest) record; - if (deserialize) { - request.readRequestRecord(setAclRequest); - } path = setAclRequest.getPath(); validatePath(path, request.sessionId); List listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl()); @@ -577,8 +564,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, addChangeRecord(nodeRecord); break; case OpCode.createSession: - CreateSessionTxn createSessionTxn = new CreateSessionTxn(); - request.readRequestRecord(createSessionTxn); + CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new); request.setTxn(createSessionTxn); // only add the global session tracker but not to ZKDb zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut()); @@ -630,9 +616,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.check: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record; - if (deserialize) { - request.readRequestRecord(checkVersionRequest); - } path = checkVersionRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -653,11 +636,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, } } - private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { - if (deserialize) { - request.readRequestRecord(record); - } - + private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException { int flags; String path; List acl; @@ -792,39 +771,41 @@ private void pRequestHelper(Request request) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: - CreateRequest create2Request = new CreateRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); + CreateRequest create2Request = request.readRequestRecord(CreateRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request); break; case OpCode.createTTL: - CreateTTLRequest createTtlRequest = new CreateTTLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true); + CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest); break; case OpCode.deleteContainer: + DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest); + break; case OpCode.delete: - DeleteRequest deleteRequest = new DeleteRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); + DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest); break; case OpCode.setData: - SetDataRequest setDataRequest = new SetDataRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); + SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest); break; case OpCode.reconfig: - ReconfigRequest reconfigRequest = new ReconfigRequest(); - request.readRequestRecord(reconfigRequest); - pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true); + ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest); break; case OpCode.setACL: - SetACLRequest setAclRequest = new SetACLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true); + SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest); break; case OpCode.check: - CheckVersionRequest checkRequest = new CheckVersionRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true); + CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest); break; case OpCode.multi: - MultiOperationRecord multiRequest = new MultiOperationRecord(); + MultiOperationRecord multiRequest; try { - request.readRequestRecord(multiRequest); + multiRequest = request.readRequestRecord(MultiOperationRecord::new); } catch (IOException e) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi)); throw e; @@ -854,7 +835,7 @@ private void pRequestHelper(Request request) { } else { /* Prep the request and convert to a Txn */ try { - pRequest2Txn(op.getType(), zxid, request, subrequest, false); + pRequest2Txn(op.getType(), zxid, request, subrequest); type = op.getType(); txn = request.getTxn(); } catch (KeeperException e) { @@ -899,7 +880,7 @@ private void pRequestHelper(Request request) { case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { - pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); + pRequest2Txn(request.type, zks.getNextZxid(), request, null); } break; @@ -944,20 +925,14 @@ private void pRequestHelper(Request request) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - for (byte b : payload) { - sb.append(String.format("%02x", (0xff & b))); - } - } else { - sb.append("request buffer is null"); - } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); - if (request.getHdr() != null) { - request.getHdr().setType(OpCode.error); - request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); + String digest = request.requestDigest(); + LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); + if (request.getHdr() == null) { + request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type)); } + + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 1aee6aee24f..86a50fc5516 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.Supplier; import org.apache.jute.Record; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; @@ -52,12 +53,12 @@ public class Request { // associated session timeout. Disabled by default. private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_latency_check", "false")); - public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List authInfo) { + public Request(ServerCnxn cnxn, long sessionId, int xid, int type, RequestRecord request, List authInfo) { this.cnxn = cnxn; this.sessionId = sessionId; this.cxid = xid; this.type = type; - this.request = bb; + this.request = request; this.authInfo = authInfo; } @@ -79,30 +80,42 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon public final int type; - private final ByteBuffer request; + private final RequestRecord request; - public void readRequestRecord(Record record) throws IOException { + public T readRequestRecord(Supplier constructor) throws IOException { if (request != null) { - request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request, record); - request.rewind(); - return; + return request.readRecord(constructor); } throw new IOException(new NullPointerException("request")); } + public T readRequestRecordNoException(Supplier constructor) { + try { + return readRequestRecord(constructor); + } catch (IOException e) { + return null; + } + } + public byte[] readRequestBytes() { if (request != null) { - request.rewind(); - int len = request.remaining(); - byte[] b = new byte[len]; - request.get(b); - request.rewind(); - return b; + return request.readBytes(); } return null; } + public String requestDigest() { + if (request != null) { + final StringBuilder sb = new StringBuilder(); + final byte[] payload = request.readBytes(); + for (byte b : payload) { + sb.append(String.format("%02x", (0xff & b))); + } + return sb.toString(); + } + return "request buffer is null"; + } + public final ServerCnxn cnxn; private TxnHeader hdr; @@ -423,18 +436,19 @@ public String toString() { && type != OpCode.setWatches && type != OpCode.setWatches2 && type != OpCode.closeSession - && request != null - && request.remaining() >= 4) { + && request != null) { try { // make sure we don't mess with request itself - ByteBuffer rbuf = request.asReadOnlyBuffer(); - rbuf.clear(); - int pathLen = rbuf.getInt(); - // sanity check - if (pathLen >= 0 && pathLen < 4096 && rbuf.remaining() >= pathLen) { - byte[] b = new byte[pathLen]; - rbuf.get(b); - path = new String(b, UTF_8); + byte[] bytes = request.readBytes(); + if (bytes != null && bytes.length >= 4) { + ByteBuffer buf = ByteBuffer.wrap(bytes); + int pathLen = buf.getInt(); + // sanity check + if (pathLen >= 0 && pathLen < 4096 && buf.remaining() >= pathLen) { + byte[] b = new byte[pathLen]; + buf.get(b); + path = new String(b, UTF_8); + } } } catch (Exception e) { // ignore - can't find the path, will output "n/a" instead diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java new file mode 100644 index 00000000000..6265f168e4e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.Record; + +public interface RequestRecord { + + static RequestRecord fromBytes(ByteBuffer buffer) { + return new ByteBufferRequestRecord(buffer); + } + + static RequestRecord fromBytes(byte[] bytes) { + return fromBytes(ByteBuffer.wrap(bytes)); + } + + static RequestRecord fromRecord(Record record) { + return new SimpleRequestRecord(record); + } + + T readRecord(Supplier clazz) throws IOException; + + byte[] readBytes(); + + int limit(); + +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java new file mode 100644 index 00000000000..a1c78ddad6a --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; + +public class SimpleRequestRecord implements RequestRecord { + + private final Record record; + + private volatile byte[] bytes; + + public SimpleRequestRecord(Record record) { + this.record = record; + } + + @SuppressWarnings("unchecked") + @Override + public T readRecord(Supplier constructor) { + return (T) record; + } + + @SuppressFBWarnings("EI_EXPOSE_REP") + @Override + public byte[] readBytes() { + if (bytes != null) { + return bytes; + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + record.serialize(boa, "request"); + bytes = baos.toByteArray(); + return bytes; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int limit() { + byte[] bytes = readBytes(); + return ByteBuffer.wrap(bytes).limit(); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 817e84b3e59..f6c2b93ebf5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import javax.security.sasl.SaslException; -import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; @@ -276,7 +274,7 @@ protected enum State { } // Connection throttling - private BlueThrottle connThrottle = new BlueThrottle(); + private final BlueThrottle connThrottle = new BlueThrottle(); @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Internally the throttler has a BlockingQueue so " @@ -290,17 +288,17 @@ protected enum State { * too many large requests such that the JVM runs out of usable heap and * ultimately crashes. * - * The limit is enforced by the {@link checkRequestSize(int, boolean)} + * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} * method which is called by the connection layer ({@link NIOServerCnxn}, * {@link NettyServerCnxn}) before allocating a byte buffer and pulling * data off the TCP socket. The limit is then checked again by the - * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which - * also atomically updates {@link currentLargeRequestBytes}. The request is + * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which + * also atomically updates {@link #currentLargeRequestBytes}. The request is * then marked as a large request, with the request size stored in the Request - * object so that it can later be decremented from {@link currentLargeRequestsBytes}. + * object so that it can later be decremented from {@link #currentLargeRequestBytes}. * * When a request is completed or dropped, the relevant code path calls the - * {@link requestFinished(Request)} method which performs the decrement if + * {@link #requestFinished(Request)} method which performs the decrement if * needed. */ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; @@ -313,7 +311,7 @@ protected enum State { private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); - private AuthenticationHelper authHelper; + private final AuthenticationHelper authHelper = new AuthenticationHelper(); void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); @@ -329,7 +327,6 @@ public ZooKeeperServer() { listener = new ZooKeeperServerListenerImpl(this); serverStats = new ServerStats(this); this.requestPathMetricsCollector = new RequestPathMetricsCollector(); - this.authHelper = new AuthenticationHelper(); } /** @@ -371,8 +368,6 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio this.initLargeRequestThrottlingSettings(); - this.authHelper = new AuthenticationHelper(); - LOG.info( "Created server with" + " tickTime {} ms" @@ -1015,10 +1010,9 @@ long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); - ByteBuffer to = ByteBuffer.allocate(4); - to.putInt(timeout); + CreateSessionTxn txn = new CreateSessionTxn(timeout); cnxn.setSessionId(sessionId); - Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); submitRequest(si); return sessionId; } @@ -1595,13 +1589,7 @@ public void requestFinished(Request request) { } } - public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { - // We have the request, now process and setup for next - InputStream bais = new ByteBufferInputStream(incomingBuffer); - BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); - RequestHeader h = new RequestHeader(); - h.deserialize(bia, "header"); - + public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. @@ -1613,14 +1601,9 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE // in cnxn, since it will close the cnxn anyway. cnxn.incrOutstandingAndCheckThrottle(h); - // Through the magic of byte buffers, txn will not be - // pointing - // to the start of the txn - incomingBuffer = incomingBuffer.slice(); if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); - AuthPacket authPacket = new AuthPacket(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); + AuthPacket authPacket = request.readRecord(AuthPacket::new); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; @@ -1660,15 +1643,15 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE } return; } else if (h.getType() == OpCode.sasl) { - processSasl(incomingBuffer, cnxn, h); + processSasl(request, cnxn, h); } else { if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { // Authentication enforcement is failed // Already sent response to user about failure and closed the session, lets return return; } else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); - int length = incomingBuffer.limit(); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); + int length = request.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); @@ -1706,10 +1689,9 @@ private static boolean shouldAllowSaslFailedClientsConnect() { return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); } - private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { + private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { LOG.debug("Responding to client SASL token."); - GetSASLRequest clientTokenRecord = new GetSASLRequest(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); + GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: {}", clientToken.length); byte[] responseToken = null; @@ -2180,8 +2162,8 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum switch (request.type) { case OpCode.create: case OpCode.create2: { - CreateRequest req = new CreateRequest(); - if (readRequestRecord(request, req)) { + CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); @@ -2189,22 +2171,22 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum break; } case OpCode.delete: { - DeleteRequest req = new DeleteRequest(); - if (readRequestRecord(request, req)) { + DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); + if (req != null) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { - SetDataRequest req = new SetDataRequest(); - if (readRequestRecord(request, req)) { + SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); + if (req != null) { path = req.getPath(); } break; } case OpCode.setACL: { - SetACLRequest req = new SetACLRequest(); - if (readRequestRecord(request, req)) { + SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); @@ -2298,15 +2280,6 @@ public boolean authWriteRequest(Request request) { return err == KeeperException.Code.OK.intValue(); } - private boolean readRequestRecord(Request request, Record record) { - try { - request.readRequestRecord(record); - return true; - } catch (IOException ex) { - return false; - } - } - public int getOutstandingHandshakeNum() { if (serverCnxnFactory instanceof NettyServerCnxnFactory) { return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b6eeb758ac9..1818bf9bb95 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -804,9 +804,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception { continue; } packetsCommitted.remove(); - Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); - request.setTxn(p.rec); - request.setHdr(p.hdr); + Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); request.setTxnDigest(p.digest); ozk.commitRequest(request); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 259d9fff915..eea11d33a2c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.TxnLogProposalIterator; import org.apache.zookeeper.server.ZKDatabase; @@ -702,9 +703,9 @@ public void run() { bb = bb.slice(); Request si; if (type == OpCode.sync) { - si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); + si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } else { - si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); + si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } si.setOwner(this); learnerMaster.submitLearnerRequest(si); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java index d4c83aeab7b..6892d3dd8a7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java @@ -18,17 +18,17 @@ package org.apache.zookeeper.server.quorum; -import java.nio.ByteBuffer; import java.util.List; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; public class LearnerSyncRequest extends Request { LearnerHandler fh; public LearnerSyncRequest( - LearnerHandler fh, long sessionId, int xid, int type, ByteBuffer bb, List authInfo) { - super(null, sessionId, xid, type, bb, authInfo); + LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List authInfo) { + super(null, sessionId, xid, type, request, authInfo); this.fh = fh; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 2f24347b778..240936956fc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.PrintWriter; -import java.nio.ByteBuffer; import java.util.Objects; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -32,10 +31,12 @@ import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.txn.CreateSessionTxn; /** * Abstract base class for all ZooKeeperServers that participate in @@ -76,8 +77,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx } if (OpCode.multi == request.type) { - MultiOperationRecord multiTransactionRecord = new MultiOperationRecord(); - request.readRequestRecord(multiTransactionRecord); + MultiOperationRecord multiTransactionRecord = request.readRequestRecord(MultiOperationRecord::new); boolean containsEphemeralCreate = false; for (Op op : multiTransactionRecord) { if (op.getType() == OpCode.create || op.getType() == OpCode.create2) { @@ -93,8 +93,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx return null; } } else { - CreateRequest createRequest = new CreateRequest(); - request.readRequestRecord(createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (!createMode.isEphemeral()) { return null; @@ -116,9 +115,8 @@ private Request makeUpgradeRequest(long sessionId) { synchronized (upgradeableSessionTracker) { if (upgradeableSessionTracker.isLocalSession(sessionId)) { int timeout = upgradeableSessionTracker.upgradeSession(sessionId); - ByteBuffer to = ByteBuffer.allocate(4); - to.putInt(timeout); - return new Request(null, sessionId, 0, OpCode.createSession, to, null); + CreateSessionTxn txn = new CreateSessionTxn(timeout); + return new Request(null, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); } } return null; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java index df3b831a30d..8a700bbdf4f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java @@ -39,9 +39,9 @@ import org.apache.zookeeper.audit.AuditEvent.Result; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.server.util.AuthUtil; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.apache.zookeeper.test.LoggerTestTool; @@ -290,9 +290,7 @@ private String getSession() { private String getUser() { ServerCnxn next = getServerCnxn(); - Request request = new Request(next, -1, -1, -1, null, - next.getAuthInfo()); - return request.getUsersForAudit(); + return AuthUtil.getUsers(next.getAuthInfo()); } private String getIp() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java index f9fd6d8b588..0e6c1b58f92 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java @@ -19,10 +19,12 @@ package org.apache.zookeeper.server; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -36,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.ZooDefs; @@ -91,7 +94,7 @@ public void testCreateWithStat() throws KeeperException, InterruptedException { Stat stat = createWithStatVerifyResult("/foo"); Stat childStat = createWithStatVerifyResult("/foo/child"); // Don't expect to get the same stats for different creates. - assertFalse(stat.equals(childStat)); + assertNotEquals(stat, childStat); } @SuppressWarnings("ConstantConditions") @@ -224,7 +227,11 @@ public void testMaxPerMinute() throws InterruptedException { RequestProcessor processor = new RequestProcessor() { @Override public void processRequest(Request request) { - queue.add(new String(request.readRequestBytes())); + try { + queue.add(request.readRequestRecord(DeleteContainerRequest::new).getPath()); + } catch (IOException e) { + fail(e); + } } @Override @@ -246,14 +253,13 @@ protected Collection getCandidates() { containerManager.checkContainers(); return null; }); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one"); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two"); - assertEquals(queue.size(), 0); + assertEquals("/one", queue.poll(5, TimeUnit.SECONDS)); + assertEquals("/two", queue.poll(5, TimeUnit.SECONDS)); + assertEquals(0, queue.size()); Thread.sleep(500); - assertEquals(queue.size(), 0); - - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/three"); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/four"); + assertEquals(0, queue.size()); + assertEquals("/three", queue.poll(5, TimeUnit.SECONDS)); + assertEquals("/four", queue.poll(5, TimeUnit.SECONDS)); } @Test diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java index 0181d2ebeae..4b47d6cbb3c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java @@ -96,7 +96,7 @@ public void testACLDigestHashHiding_NoAuth_WorldCanRead() { // Arrange // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList()); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList()); processor.processRequest(r); // Assert @@ -109,7 +109,7 @@ public void testACLDigestHashHiding_NoAuth_NoWorld() { testACLs.remove(2); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList()); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList()); processor.processRequest(r); // Assert @@ -123,7 +123,7 @@ public void testACLDigestHashHiding_UserCanRead() { authInfo.add(new Id("digest", "otheruser:somesecrethash")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -137,7 +137,7 @@ public void testACLDigestHashHiding_UserCanAll() { authInfo.add(new Id("digest", "user:secrethash")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -151,7 +151,7 @@ public void testACLDigestHashHiding_AdminUser() { authInfo.add(new Id("digest", "adminuser:adminsecret")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -167,7 +167,7 @@ public void testACLDigestHashHiding_OnlyAdmin() { authInfo.add(new Id("digest", "adminuser:adminsecret")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java index 001e2363548..6826910103c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java @@ -117,7 +117,7 @@ private Request makeGetDataRequest(String path, long sessionId) throws IOExcepti GetDataRequest getDataRequest = new GetDataRequest(path, false); getDataRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList()); } private Request makeCreateRequest(String path, long sessionId) throws IOException { @@ -126,7 +126,7 @@ private Request makeCreateRequest(String path, long sessionId) throws IOExceptio CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList()); } private QuorumZooKeeperServer getConnectedServer(long sessionId) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java index 062cae1f9db..bfb0db5f3fb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java @@ -29,15 +29,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; @@ -94,12 +91,7 @@ public void tearDown() throws Exception { } private Request createRequest(Record record, int opCode) throws IOException { - // encoding - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - record.serialize(boa, "request"); - baos.close(); - return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null); + return new Request(null, 1L, 0, opCode, RequestRecord.fromRecord(record), null); } private Request createRequest(String path, int opCode) throws IOException { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 9e71205697d..d80c5e08ed2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -24,11 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.IOException; import java.io.PrintWriter; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -37,7 +34,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -66,13 +62,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PrepRequestProcessorTest extends ClientBase { - private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class); private static final int CONNECTION_TIMEOUT = 3000; private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); private CountDownLatch pLatch; @@ -120,34 +113,28 @@ public void teardown() throws Exception { public void testPRequest() throws Exception { pLatch = new CountDownLatch(1); processor = new PrepRequestProcessor(zks, new MyRequestProcessor()); - Request foo = new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null); + Request foo = new Request(null, 1L, 1, OpCode.create, RequestRecord.fromBytes(new byte[3]), null); processor.pRequest(foo); assertEquals(new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()), outcome.getTxn(), "Request should have marshalling error"); assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain"); } - private Request createRequest(Record record, int opCode) throws IOException { + private Request createRequest(Record record, int opCode) { return createRequest(record, opCode, 1L); } - private Request createRequest(Record record, int opCode, long sessionId) throws IOException { + private Request createRequest(Record record, int opCode, long sessionId) { return createRequest(record, opCode, sessionId, false); } - private Request createRequest(Record record, int opCode, boolean admin) throws IOException { + private Request createRequest(Record record, int opCode, boolean admin) { return createRequest(record, opCode, 1L, admin); } - private Request createRequest(Record record, int opCode, long sessionId, boolean admin) throws IOException { - // encoding - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - record.serialize(boa, "request"); - baos.close(); - // Id - List ids = Arrays.asList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE); - return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); + private Request createRequest(Record record, int opCode, long sessionId, boolean admin) { + List ids = Collections.singletonList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE); + return new Request(null, sessionId, 0, opCode, RequestRecord.fromRecord(record), ids); } private void process(List ops) throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java index 16c7a7dd1c8..681e8348b07 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java @@ -19,7 +19,6 @@ package org.apache.zookeeper.server; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.ZKTestCase; @@ -67,7 +66,7 @@ public void testUnrecoverableErrorCountFromRequestProcessor() throws Exception { PrepRequestProcessor processor = new MyPrepRequestProcessor(); processor.start(); - processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null)); + processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null)); processed.await(); processor.shutdown(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 1050a474bc0..e5f668d1f3e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.WorkerService; import org.apache.zookeeper.server.ZooKeeperServerListener; import org.junit.jupiter.api.AfterEach; @@ -129,7 +130,7 @@ private Request newRequest(Record rec, int type, int sessionId, int xid) throws BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); rec.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, xid, type, bb, new ArrayList()); + return new Request(null, sessionId, xid, type, RequestRecord.fromBytes(bb), new ArrayList()); } /** diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java index 4b67f5b50bb..4a45983555a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -31,6 +30,7 @@ import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.WorkerService; import org.junit.jupiter.api.AfterEach; @@ -192,11 +192,11 @@ private void checkTimeMetric(long actual, long lBoundrary, long hBoundrary) { } private Request createReadRequest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, RequestRecord.fromBytes(new byte[10]), null); } private Request createWriteRequest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null); } private void processRequestWithWait(Request request) throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java index 46958d1949c..46a4c387479 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.server.PrepRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterEach; @@ -160,7 +161,7 @@ public void sendWriteRequest() throws Exception { + (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1); createReq.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList()); + Request req = new Request(null, sessionId, ++cxid, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); zks.getFirstProcessor().processRequest(req); } @@ -174,7 +175,7 @@ public void sendReadRequest() throws Exception { + nodeId, false); getDataRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - Request req = new Request(null, sessionId, ++cxid, OpCode.getData, bb, new ArrayList()); + Request req = new Request(null, sessionId, ++cxid, OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList()); zks.getFirstProcessor().processRequest(req); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index ef1f1219452..5216eb70324 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.net.SocketException; -import java.nio.ByteBuffer; import javax.security.sasl.SaslException; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooDefs; @@ -32,11 +31,13 @@ import org.apache.zookeeper.server.PrepRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.txn.DeleteTxn; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -220,7 +221,8 @@ public void shutdown() { * Add a request so that something is there for SyncRequestProcessor * to process, while we are in shutdown flow */ - Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, ByteBuffer.wrap("/deadLockIssue".getBytes()), null); + DeleteTxn deleteTxn = new DeleteTxn("/deadLockIssue"); + Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, RequestRecord.fromRecord(deleteTxn), null); processRequest(request); super.shutdown(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java index 8803a73cbab..3d60c8b9174 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterEach; @@ -317,8 +318,7 @@ protected void request(Request request) throws IOException { } if (request.type == ZooDefs.OpCode.create && request.cnxn != null) { - CreateRequest createRequest = new CreateRequest(); - request.readRequestRecord(createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); try { CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isEphemeral()) { @@ -355,7 +355,7 @@ private Request createEphemeralRequest(String path, long sessionId) throws IOExc CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList()); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java index 72bceafa371..17ccdae6ea6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,6 +33,7 @@ import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; @@ -65,7 +65,7 @@ public void setup() throws Exception { } private Request createRquest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null); } @Test diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java index 99cd171c01f..a619866d91c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java @@ -33,6 +33,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -100,7 +101,7 @@ public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { LOG.info("Fake session Id: {}", Long.toHexString(fakeSessionId)); - Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList()); + Request request = new Request(null, fakeSessionId, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); @@ -138,7 +139,7 @@ public void testCreatePersistent() throws Exception { LOG.info("Local session Id: {}", Long.toHexString(locallSession)); - Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList()); + Request request = new Request(null, locallSession, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); From 90f813ea38a85ff2715662bad75f9bb6387fe4a6 Mon Sep 17 00:00:00 2001 From: Mike Drob Date: Tue, 11 Oct 2022 22:23:12 +0200 Subject: [PATCH 4/4] ZOOKEEPER-4303: Allow configuring client port to 0 https://issues.apache.org/jira/browse/ZOOKEEPER-4303 Allows specifying an explicit port 0 for the client port or secure client port, which will default to operating system behavior of finding an unbound port. Modified ZKServerEmbedded to report the actual port instead of the configured port. Author: Mike Drob Reviewers: Kezhu Wang , Enrico Olivelli , Mate Szalay-Beko Closes #1868 from madrob/zookeeper-4303 --- .../server/NIOServerCnxnFactory.java | 4 +++ .../zookeeper/server/ZooKeeperServerMain.java | 17 ++++++++++ .../embedded/ZooKeeperServerEmbeddedImpl.java | 30 ++++++++++------- .../zookeeper/server/quorum/QuorumPeer.java | 7 ++++ .../server/quorum/QuorumPeerConfig.java | 8 ++--- .../embedded/ZookeeperServerEmbeddedTest.java | 32 +++++++++++++++++++ 6 files changed, 83 insertions(+), 15 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 57495c10aa4..f081ab24bad 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -663,6 +663,10 @@ public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean se } else { ss.socket().bind(addr, listenBacklog); } + if (addr.getPort() == 0) { + // We're likely bound to a different port than was requested, so log that too + LOG.info("bound to port {}", ss.getLocalAddress()); + } ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java index 7bd30baa6e9..d6cbbe4a1ee 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java @@ -244,6 +244,23 @@ ServerCnxnFactory getSecureCnxnFactory() { return secureCnxnFactory; } + // VisibleForTesting + public int getClientPort() { + if (cnxnFactory != null) { + return cnxnFactory.getLocalPort(); + } + return 0; + } + + // VisibleForTesting + public int getSecureClientPort() { + if (secureCnxnFactory != null) { + return secureCnxnFactory.getLocalPort(); + } + return 0; + } + + /** * Shutdowns properly the service, this method is not a public API. */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java index ec6ae637a0c..ad41a3228e7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java @@ -1,6 +1,7 @@ package org.apache.zookeeper.server.embedded; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; @@ -45,6 +46,9 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded { private final ExitHandler exitHandler; private volatile boolean stopping; + private int boundClientPort; + private int boundSecureClientPort; + ZooKeeperServerEmbeddedImpl(Properties p, Path baseDir, ExitHandler exitHandler) throws Exception { if (!p.containsKey("dataDir")) { p.put("dataDir", baseDir.resolve("data").toAbsolutePath().toString()); @@ -103,6 +107,8 @@ protected QuorumPeer getQuorumPeer() throws SaslException { @Override public void start() { super.start(); + boundClientPort = getClientPort(); + boundSecureClientPort = getSecureClientPort(); LOG.info("ZK Server {} started", this); started.complete(null); } @@ -142,6 +148,8 @@ public void run() { @Override public void serverStarted() { LOG.info("ZK Server started"); + boundClientPort = getClientPort(); + boundSecureClientPort = getSecureClientPort(); started.complete(null); } }; @@ -184,22 +192,22 @@ public void run() { @Override public String getConnectionString() { - if (config.getClientPortAddress() != null) { - String raw = config.getClientPortAddress().getHostString() + ":" + config.getClientPortAddress().getPort(); - return raw.replace("0.0.0.0", "localhost"); - } else { - throw new IllegalStateException("No client address is configured"); - } + return prettifyConnectionString(config.getClientPortAddress(), boundClientPort); } @Override public String getSecureConnectionString() { - if (config.getSecureClientPortAddress() != null) { - String raw = config.getSecureClientPortAddress().getHostString() + ":" + config.getSecureClientPortAddress().getPort(); - return raw.replace("0.0.0.0", "localhost"); - } else { - throw new IllegalStateException("No client address is configured"); + return prettifyConnectionString(config.getSecureClientPortAddress(), boundSecureClientPort); + } + + private String prettifyConnectionString(InetSocketAddress confAddress, int boundPort) { + if (confAddress != null) { + return confAddress.getHostString() + .replace("0.0.0.0", "localhost") + .replace("0:0:0:0:0:0:0:0", "localhost") + + ":" + boundPort; } + throw new IllegalStateException("No client address is configured"); } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 18e97bb2f0c..220d813f42b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2139,6 +2139,13 @@ public int getClientPort() { return -1; } + public int getSecureClientPort() { + if (secureCnxnFactory != null) { + return secureCnxnFactory.getLocalPort(); + } + return -1; + } + public void setTxnFactory(FileTxnSnapLog factory) { this.logFactory = factory; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index 1b37f291f51..d478c409c8f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -271,8 +271,8 @@ public static String getVersionFromFilename(String filename) { * @throws ConfigException */ public void parseProperties(Properties zkProp) throws IOException, ConfigException { - int clientPort = 0; - int secureClientPort = 0; + Integer clientPort = null; + Integer secureClientPort = null; int observerMasterPort = 0; String clientPortAddress = null; String secureClientPortAddress = null; @@ -427,7 +427,7 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti dataLogDir = dataDir; } - if (clientPort == 0) { + if (clientPort == null) { LOG.info("clientPort is not set"); if (clientPortAddress != null) { throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set"); @@ -440,7 +440,7 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress)); } - if (secureClientPort == 0) { + if (secureClientPort == null) { LOG.info("secureClientPort is not set"); if (secureClientPortAddress != null) { throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set"); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java index d9868b235df..00bfced0cd4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java @@ -17,12 +17,17 @@ */ package org.apache.zookeeper.server.embedded; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Path; import java.util.Properties; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.test.ClientBase; +import org.junit.function.ThrowingRunnable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -95,4 +100,31 @@ public void testStart() throws Exception { } + @Test + public void testBindPortZero() throws Exception { + final Properties configZookeeper = new Properties(); + final ZooKeeperServerEmbedded.ZookKeeperServerEmbeddedBuilder builder = ZooKeeperServerEmbedded.builder() + .baseDir(baseDir) + .configuration(configZookeeper) + .exitHandler(ExitHandler.LOG_ONLY); + + // Unconfigured client port will still fail + try (ZooKeeperServerEmbedded zkServer = builder.build()) { + zkServer.start(); + assertThrows(IllegalStateException.class, new ThrowingRunnable() { + @Override + public void run() throws Throwable { + zkServer.getConnectionString(); + } + }); + } + + // Explicit port zero should work + configZookeeper.put("clientPort", "0"); + try (ZooKeeperServerEmbedded zkServer = builder.build()) { + zkServer.start(); + assertThat(zkServer.getConnectionString(), not(endsWith(":0"))); + assertTrue(ClientBase.waitForServerUp(zkServer.getConnectionString(), 60000)); + } + } }