diff --git a/.travis.yml b/.travis.yml index 81471d9ed04..7fd2dcd02ff 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,15 +5,14 @@ matrix: - os: linux arch: arm64 jdk: openjdk11 + install: | + f=$(which javac) + while [[ -L $f ]]; do f=$(readlink $f); done + export JAVA_HOME=${f%/bin/*} + - os: linux arch: s390x jdk: openjdk11 - addons: - apt: - update: true - packages: - - maven - - libcppunit-dev cache: directories: @@ -21,13 +20,10 @@ cache: addons: apt: + update: true packages: - - libcppunit-dev - -install: - - if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then - sudo apt-get install maven; - fi + - maven + - libcppunit-dev script: mvn clean apache-rat:check verify -DskipTests spotbugs:check checkstyle:check -Pfull-build diff --git a/dev/docker/Dockerfile b/dev/docker/Dockerfile index bd2977f7402..a1b33569e79 100644 --- a/dev/docker/Dockerfile +++ b/dev/docker/Dockerfile @@ -17,7 +17,7 @@ # under the License. # -FROM maven:3.6.3-jdk-8 +FROM maven:3.8.4-jdk-8 RUN apt-get update RUN apt-get install -y \ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java new file mode 100644 index 00000000000..b7fd12689d2 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import org.apache.jute.InputArchive; +import org.apache.jute.OutputArchive; +import org.apache.jute.Record; + +public class DeleteContainerRequest implements Record { + private String path; + + public DeleteContainerRequest() { + } + + public DeleteContainerRequest(String path) { + this.path = path; + } + + public String getPath() { + return path; + } + + @Override + public void serialize(OutputArchive archive, String tag) throws IOException { + archive.writeBuffer(path.getBytes(StandardCharsets.UTF_8), "path"); + } + + @Override + public void deserialize(InputArchive archive, String tag) throws IOException { + byte[] bytes = archive.readBuffer("path"); + path = new String(bytes, StandardCharsets.UTF_8); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java index 5aca1711f7a..d6df7d92438 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MultiOperationRecord; @@ -58,7 +57,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool if (!ZKAuditProvider.isAuditEnabled()) { return; } - String op = null; + String op; //For failed transaction rc.path is null String path = txnResult.path; String acls = null; @@ -69,8 +68,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool case ZooDefs.OpCode.create2: case ZooDefs.OpCode.createContainer: op = AuditConstants.OP_CREATE; - CreateRequest createRequest = new CreateRequest(); - deserialize(request, createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); createMode = getCreateMode(createRequest); if (failedTxn) { path = createRequest.getPath(); @@ -80,23 +78,20 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool case ZooDefs.OpCode.deleteContainer: op = AuditConstants.OP_DELETE; if (failedTxn) { - DeleteRequest deleteRequest = new DeleteRequest(); - deserialize(request, deleteRequest); + DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); path = deleteRequest.getPath(); } break; case ZooDefs.OpCode.setData: op = AuditConstants.OP_SETDATA; if (failedTxn) { - SetDataRequest setDataRequest = new SetDataRequest(); - deserialize(request, setDataRequest); + SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); path = setDataRequest.getPath(); } break; case ZooDefs.OpCode.setACL: op = AuditConstants.OP_SETACL; - SetACLRequest setACLRequest = new SetACLRequest(); - deserialize(request, setACLRequest); + SetACLRequest setACLRequest = request.readRequestRecord(SetACLRequest::new); acls = ZKUtil.aclToString(setACLRequest.getAcl()); if (failedTxn) { path = setACLRequest.getPath(); @@ -125,10 +120,6 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool } } - private static void deserialize(Request request, Record record) throws IOException { - request.readRequestRecord(record); - } - private static Result getResult(ProcessTxnResult rc, boolean failedTxn) { if (failedTxn) { return Result.FAILURE; @@ -191,8 +182,7 @@ private static Map getCreateModes(Request request) if (!ZKAuditProvider.isAuditEnabled()) { return createModes; } - MultiOperationRecord multiRequest = new MultiOperationRecord(); - deserialize(request, multiRequest); + MultiOperationRecord multiRequest = request.readRequestRecord(MultiOperationRecord::new); for (Op op : multiRequest) { if (op.getType() == ZooDefs.OpCode.create || op.getType() == ZooDefs.OpCode.create2 || op.getType() == ZooDefs.OpCode.createContainer) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java new file mode 100644 index 00000000000..5ddae2460ac --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.Record; + +public class ByteBufferRequestRecord implements RequestRecord { + + private final ByteBuffer request; + + private volatile Record record; + + public ByteBufferRequestRecord(ByteBuffer request) { + this.request = request; + } + + @SuppressWarnings("unchecked") + @Override + public T readRecord(Supplier constructor) throws IOException { + if (record != null) { + return (T) record; + } + + record = constructor.get(); + request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request, record); + request.rewind(); + return (T) record; + } + + @Override + public byte[] readBytes() { + request.rewind(); + int len = request.remaining(); + byte[] b = new byte[len]; + request.get(b); + request.rewind(); + return b; + } + + @Override + public int limit() { + return request.limit(); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java index 7abac587aa9..2664348c244 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java @@ -18,8 +18,6 @@ package org.apache.zookeeper.server; -import static java.nio.charset.StandardCharsets.UTF_8; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashSet; import java.util.Set; @@ -27,6 +25,7 @@ import java.util.TimerTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.common.Time; import org.slf4j.Logger; @@ -129,8 +128,8 @@ public void checkContainers() throws InterruptedException { for (String containerPath : getCandidates()) { long startMs = Time.currentElapsedTime(); - ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes(UTF_8)); - Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, path, null); + DeleteContainerRequest record = new DeleteContainerRequest(containerPath); + Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, RequestRecord.fromRecord(record), null); try { LOG.info("Attempting to delete candidate container: {}", containerPath); postDeleteRequest(request); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index bc5b019f812..693a6b86e58 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -269,8 +269,7 @@ public void processRequest(Request request) { } case OpCode.multiRead: { lastOp = "MLTR"; - MultiOperationRecord multiReadRecord = new MultiOperationRecord(); - request.readRequestRecord(multiReadRecord); + MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { @@ -348,8 +347,7 @@ public void processRequest(Request request) { } case OpCode.sync: { lastOp = "SYNC"; - SyncRequest syncRequest = new SyncRequest(); - request.readRequestRecord(syncRequest); + SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; @@ -363,8 +361,7 @@ public void processRequest(Request request) { case OpCode.exists: { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! - ExistsRequest existsRequest = new ExistsRequest(); - request.readRequestRecord(existsRequest); + ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new); path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); @@ -376,8 +373,7 @@ public void processRequest(Request request) { } case OpCode.getData: { lastOp = "GETD"; - GetDataRequest getDataRequest = new GetDataRequest(); - request.readRequestRecord(getDataRequest); + GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -385,8 +381,7 @@ public void processRequest(Request request) { } case OpCode.setWatches: { lastOp = "SETW"; - SetWatches setWatches = new SetWatches(); - request.readRequestRecord(setWatches); + SetWatches setWatches = request.readRequestRecord(SetWatches::new); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase() .setWatches( @@ -401,8 +396,7 @@ public void processRequest(Request request) { } case OpCode.setWatches2: { lastOp = "STW2"; - SetWatches2 setWatches = new SetWatches2(); - request.readRequestRecord(setWatches); + SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), @@ -415,16 +409,14 @@ public void processRequest(Request request) { } case OpCode.addWatch: { lastOp = "ADDW"; - AddWatchRequest addWatcherRequest = new AddWatchRequest(); - request.readRequestRecord(addWatcherRequest); + AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new); zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode()); rsp = new ErrorResponse(0); break; } case OpCode.getACL: { lastOp = "GETA"; - GetACLRequest getACLRequest = new GetACLRequest(); - request.readRequestRecord(getACLRequest); + GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -466,8 +458,7 @@ public void processRequest(Request request) { } case OpCode.getChildren: { lastOp = "GETC"; - GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); - request.readRequestRecord(getChildrenRequest); + GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -475,8 +466,7 @@ public void processRequest(Request request) { } case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; - GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); - request.readRequestRecord(getAllChildrenNumberRequest); + GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -495,8 +485,7 @@ public void processRequest(Request request) { } case OpCode.getChildren2: { lastOp = "GETC"; - GetChildren2Request getChildren2Request = new GetChildren2Request(); - request.readRequestRecord(getChildren2Request); + GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); @@ -517,8 +506,7 @@ public void processRequest(Request request) { } case OpCode.checkWatches: { lastOp = "CHKW"; - CheckWatchesRequest checkWatches = new CheckWatchesRequest(); - request.readRequestRecord(checkWatches); + CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); @@ -531,8 +519,7 @@ public void processRequest(Request request) { } case OpCode.removeWatches: { lastOp = "REMW"; - RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); - request.readRequestRecord(removeWatches); + RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); @@ -550,8 +537,7 @@ public void processRequest(Request request) { } case OpCode.getEphemerals: { lastOp = "GETE"; - GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); - request.readRequestRecord(getEphemerals); + GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new); String prefixPath = getEphemerals.getPrefixPath(); Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List ephemerals = new ArrayList<>(); @@ -585,16 +571,8 @@ public void processRequest(Request request) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - for (byte b : payload) { - sb.append(String.format("%02x", (0xff & b))); - } - } else { - sb.append("request buffer is null"); - } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); + String digest = request.requestDigest(); + LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); err = Code.MARSHALLINGERROR; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 83e7491e51b..5ffc81da1c3 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread; import org.apache.zookeeper.server.command.CommandExecutor; @@ -392,7 +393,10 @@ void doIO(SelectionKey k) throws InterruptedException { } protected void readRequest() throws IOException { - zkServer.processPacket(this, incomingBuffer); + RequestHeader h = new RequestHeader(); + ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h); + RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice()); + zkServer.processPacket(this, h, request); } // returns whether we are interested in writing, which is determined diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java index 57495c10aa4..f081ab24bad 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java @@ -663,6 +663,10 @@ public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean se } else { ss.socket().bind(addr, listenBacklog); } + if (addr.getPort() == 0) { + // We're likely bound to a different port than was requested, so log that too + LOG.info("bound to port {}", ss.getLocalAddress()); + } ss.configureBlocking(false); acceptThread = new AcceptThread(ss, addr, selectorThreads); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index ae482ce2b5e..f95200d560b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -47,6 +47,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; +import org.apache.zookeeper.proto.RequestHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.command.CommandExecutor; import org.apache.zookeeper.server.command.FourLetterCommands; @@ -478,9 +479,10 @@ private void receiveMessage(ByteBuf message) { throw new IOException("ZK down"); } if (initialized) { - // TODO: if zks.processPacket() is changed to take a ByteBuffer[], - // we could implement zero-copy queueing. - zks.processPacket(this, bb); + RequestHeader h = new RequestHeader(); + ByteBufferInputStream.byteBuffer2Record(bb, h); + RequestRecord request = RequestRecord.fromBytes(bb.slice()); + zks.processPacket(this, h, request); } else { LOG.debug("got conn req request from {}", getRemoteSocketAddress()); BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb)); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index 9733a48aca1..35293359fe7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -18,7 +18,6 @@ package org.apache.zookeeper.server; -import static java.nio.charset.StandardCharsets.UTF_8; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.StringReader; @@ -36,6 +35,7 @@ import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.BadArgumentsException; import org.apache.zookeeper.KeeperException.Code; @@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req */ private static boolean failCreate = false; - LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue(); + LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue<>(); private final RequestProcessor nextProcessor; private final boolean digestEnabled; @@ -311,13 +311,8 @@ private String validatePathForCreate(String path, long sessionId) throws BadArgu /** * This method will be called inside the ProcessRequestThread, which is a * singleton, so there will be a single thread calling this code. - * - * @param type - * @param zxid - * @param request - * @param record */ - protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException { + protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException { if (request.getHdr() == null) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type)); @@ -328,11 +323,12 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.create2: case OpCode.createTTL: case OpCode.createContainer: { - pRequest2TxnCreate(type, request, record, deserialize); + pRequest2TxnCreate(type, request, record); break; } case OpCode.deleteContainer: { - String path = new String(request.readRequestBytes(), UTF_8); + DeleteContainerRequest txn = (DeleteContainerRequest) record; + String path = txn.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord nodeRecord = getRecordForPath(path); if (nodeRecord.childCount > 0) { @@ -359,9 +355,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.delete: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); DeleteRequest deleteRequest = (DeleteRequest) record; - if (deserialize) { - request.readRequestRecord(deleteRequest); - } String path = deleteRequest.getPath(); String parentPath = getParentPathAndValidate(path); ChangeRecord parentRecord = getRecordForPath(parentPath); @@ -387,9 +380,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.setData: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetDataRequest setDataRequest = (SetDataRequest) record; - if (deserialize) { - request.readRequestRecord(setDataRequest); - } path = setDataRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -559,9 +549,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.setACL: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetACLRequest setAclRequest = (SetACLRequest) record; - if (deserialize) { - request.readRequestRecord(setAclRequest); - } path = setAclRequest.getPath(); validatePath(path, request.sessionId); List listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl()); @@ -577,8 +564,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, addChangeRecord(nodeRecord); break; case OpCode.createSession: - CreateSessionTxn createSessionTxn = new CreateSessionTxn(); - request.readRequestRecord(createSessionTxn); + CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new); request.setTxn(createSessionTxn); // only add the global session tracker but not to ZKDb zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut()); @@ -630,9 +616,6 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, case OpCode.check: zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record; - if (deserialize) { - request.readRequestRecord(checkVersionRequest); - } path = checkVersionRequest.getPath(); validatePath(path, request.sessionId); nodeRecord = getRecordForPath(path); @@ -653,11 +636,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, } } - private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { - if (deserialize) { - request.readRequestRecord(record); - } - + private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException { int flags; String path; List acl; @@ -792,39 +771,41 @@ private void pRequestHelper(Request request) { case OpCode.createContainer: case OpCode.create: case OpCode.create2: - CreateRequest create2Request = new CreateRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true); + CreateRequest create2Request = request.readRequestRecord(CreateRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request); break; case OpCode.createTTL: - CreateTTLRequest createTtlRequest = new CreateTTLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true); + CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest); break; case OpCode.deleteContainer: + DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest); + break; case OpCode.delete: - DeleteRequest deleteRequest = new DeleteRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true); + DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest); break; case OpCode.setData: - SetDataRequest setDataRequest = new SetDataRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true); + SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest); break; case OpCode.reconfig: - ReconfigRequest reconfigRequest = new ReconfigRequest(); - request.readRequestRecord(reconfigRequest); - pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true); + ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest); break; case OpCode.setACL: - SetACLRequest setAclRequest = new SetACLRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true); + SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest); break; case OpCode.check: - CheckVersionRequest checkRequest = new CheckVersionRequest(); - pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true); + CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new); + pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest); break; case OpCode.multi: - MultiOperationRecord multiRequest = new MultiOperationRecord(); + MultiOperationRecord multiRequest; try { - request.readRequestRecord(multiRequest); + multiRequest = request.readRequestRecord(MultiOperationRecord::new); } catch (IOException e) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi)); throw e; @@ -854,7 +835,7 @@ private void pRequestHelper(Request request) { } else { /* Prep the request and convert to a Txn */ try { - pRequest2Txn(op.getType(), zxid, request, subrequest, false); + pRequest2Txn(op.getType(), zxid, request, subrequest); type = op.getType(); txn = request.getTxn(); } catch (KeeperException e) { @@ -899,7 +880,7 @@ private void pRequestHelper(Request request) { case OpCode.createSession: case OpCode.closeSession: if (!request.isLocalSession()) { - pRequest2Txn(request.type, zks.getNextZxid(), request, null, true); + pRequest2Txn(request.type, zks.getNextZxid(), request, null); } break; @@ -944,20 +925,14 @@ private void pRequestHelper(Request request) { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - byte[] payload = request.readRequestBytes(); - if (payload != null) { - for (byte b : payload) { - sb.append(String.format("%02x", (0xff & b))); - } - } else { - sb.append("request buffer is null"); - } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); - if (request.getHdr() != null) { - request.getHdr().setType(OpCode.error); - request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); + String digest = request.requestDigest(); + LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest); + if (request.getHdr() == null) { + request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type)); } + + request.getHdr().setType(OpCode.error); + request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue())); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 1aee6aee24f..86a50fc5516 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.function.Supplier; import org.apache.jute.Record; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.OpCode; @@ -52,12 +53,12 @@ public class Request { // associated session timeout. Disabled by default. private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_latency_check", "false")); - public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List authInfo) { + public Request(ServerCnxn cnxn, long sessionId, int xid, int type, RequestRecord request, List authInfo) { this.cnxn = cnxn; this.sessionId = sessionId; this.cxid = xid; this.type = type; - this.request = bb; + this.request = request; this.authInfo = authInfo; } @@ -79,30 +80,42 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon public final int type; - private final ByteBuffer request; + private final RequestRecord request; - public void readRequestRecord(Record record) throws IOException { + public T readRequestRecord(Supplier constructor) throws IOException { if (request != null) { - request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request, record); - request.rewind(); - return; + return request.readRecord(constructor); } throw new IOException(new NullPointerException("request")); } + public T readRequestRecordNoException(Supplier constructor) { + try { + return readRequestRecord(constructor); + } catch (IOException e) { + return null; + } + } + public byte[] readRequestBytes() { if (request != null) { - request.rewind(); - int len = request.remaining(); - byte[] b = new byte[len]; - request.get(b); - request.rewind(); - return b; + return request.readBytes(); } return null; } + public String requestDigest() { + if (request != null) { + final StringBuilder sb = new StringBuilder(); + final byte[] payload = request.readBytes(); + for (byte b : payload) { + sb.append(String.format("%02x", (0xff & b))); + } + return sb.toString(); + } + return "request buffer is null"; + } + public final ServerCnxn cnxn; private TxnHeader hdr; @@ -423,18 +436,19 @@ public String toString() { && type != OpCode.setWatches && type != OpCode.setWatches2 && type != OpCode.closeSession - && request != null - && request.remaining() >= 4) { + && request != null) { try { // make sure we don't mess with request itself - ByteBuffer rbuf = request.asReadOnlyBuffer(); - rbuf.clear(); - int pathLen = rbuf.getInt(); - // sanity check - if (pathLen >= 0 && pathLen < 4096 && rbuf.remaining() >= pathLen) { - byte[] b = new byte[pathLen]; - rbuf.get(b); - path = new String(b, UTF_8); + byte[] bytes = request.readBytes(); + if (bytes != null && bytes.length >= 4) { + ByteBuffer buf = ByteBuffer.wrap(bytes); + int pathLen = buf.getInt(); + // sanity check + if (pathLen >= 0 && pathLen < 4096 && buf.remaining() >= pathLen) { + byte[] b = new byte[pathLen]; + buf.get(b); + path = new String(b, UTF_8); + } } } catch (Exception e) { // ignore - can't find the path, will output "n/a" instead diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java new file mode 100644 index 00000000000..6265f168e4e --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.Record; + +public interface RequestRecord { + + static RequestRecord fromBytes(ByteBuffer buffer) { + return new ByteBufferRequestRecord(buffer); + } + + static RequestRecord fromBytes(byte[] bytes) { + return fromBytes(ByteBuffer.wrap(bytes)); + } + + static RequestRecord fromRecord(Record record) { + return new SimpleRequestRecord(record); + } + + T readRecord(Supplier clazz) throws IOException; + + byte[] readBytes(); + + int limit(); + +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java index d60efa0878f..4a401e5b919 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestThrottler.java @@ -195,13 +195,11 @@ public void run() { LOG.info("RequestThrottler shutdown. Dropped {} requests", dropped); } - private synchronized void throttleSleep(int stallTime) { - try { - ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); - this.wait(stallTime); - } catch (InterruptedException ie) { - return; - } + + // @VisibleForTesting + synchronized void throttleSleep(int stallTime) throws InterruptedException { + ServerMetrics.getMetrics().REQUEST_THROTTLE_WAIT_COUNT.add(1); + this.wait(stallTime); } @SuppressFBWarnings(value = "NN_NAKED_NOTIFY", justification = "state change is in ZooKeeperServer.decInProgress() ") diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java new file mode 100644 index 00000000000..a1c78ddad6a --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + *uuuuu + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "/RequuuAS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.util.function.Supplier; +import org.apache.jute.BinaryOutputArchive; +import org.apache.jute.Record; + +public class SimpleRequestRecord implements RequestRecord { + + private final Record record; + + private volatile byte[] bytes; + + public SimpleRequestRecord(Record record) { + this.record = record; + } + + @SuppressWarnings("unchecked") + @Override + public T readRecord(Supplier constructor) { + return (T) record; + } + + @SuppressFBWarnings("EI_EXPOSE_REP") + @Override + public byte[] readBytes() { + if (bytes != null) { + return bytes; + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + record.serialize(boa, "request"); + bytes = baos.toByteArray(); + return bytes; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public int limit() { + byte[] bytes = readBytes(); + return ByteBuffer.wrap(bytes).limit(); + } +} diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 749b558329a..89e59f137f2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -22,7 +22,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.io.PrintWriter; import java.nio.ByteBuffer; import java.util.ArrayDeque; @@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import javax.security.sasl.SaslException; -import org.apache.jute.BinaryInputArchive; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.Environment; @@ -276,7 +274,7 @@ protected enum State { } // Connection throttling - private BlueThrottle connThrottle = new BlueThrottle(); + private final BlueThrottle connThrottle = new BlueThrottle(); @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "Internally the throttler has a BlockingQueue so " @@ -290,17 +288,17 @@ protected enum State { * too many large requests such that the JVM runs out of usable heap and * ultimately crashes. * - * The limit is enforced by the {@link checkRequestSize(int, boolean)} + * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)} * method which is called by the connection layer ({@link NIOServerCnxn}, * {@link NettyServerCnxn}) before allocating a byte buffer and pulling * data off the TCP socket. The limit is then checked again by the - * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which - * also atomically updates {@link currentLargeRequestBytes}. The request is + * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which + * also atomically updates {@link #currentLargeRequestBytes}. The request is * then marked as a large request, with the request size stored in the Request - * object so that it can later be decremented from {@link currentLargeRequestsBytes}. + * object so that it can later be decremented from {@link #currentLargeRequestBytes}. * * When a request is completed or dropped, the relevant code path calls the - * {@link requestFinished(Request)} method which performs the decrement if + * {@link #requestFinished(Request)} method which performs the decrement if * needed. */ private volatile int largeRequestMaxBytes = 100 * 1024 * 1024; @@ -313,7 +311,7 @@ protected enum State { private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0); - private AuthenticationHelper authHelper; + private final AuthenticationHelper authHelper = new AuthenticationHelper(); void removeCnxn(ServerCnxn cnxn) { zkDb.removeCnxn(cnxn); @@ -329,7 +327,6 @@ public ZooKeeperServer() { listener = new ZooKeeperServerListenerImpl(this); serverStats = new ServerStats(this); this.requestPathMetricsCollector = new RequestPathMetricsCollector(); - this.authHelper = new AuthenticationHelper(); } /** @@ -371,8 +368,6 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio this.initLargeRequestThrottlingSettings(); - this.authHelper = new AuthenticationHelper(); - LOG.info( "Created server with" + " tickTime {} ms" @@ -749,9 +744,12 @@ protected void startJvmPauseMonitor() { } protected void startRequestThrottler() { - requestThrottler = new RequestThrottler(this); + requestThrottler = createRequestThrottler(); requestThrottler.start(); + } + protected RequestThrottler createRequestThrottler() { + return new RequestThrottler(this); } protected void setupRequestProcessors() { @@ -1012,10 +1010,9 @@ long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) { long sessionId = sessionTracker.createSession(timeout); Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); - ByteBuffer to = ByteBuffer.allocate(4); - to.putInt(timeout); + CreateSessionTxn txn = new CreateSessionTxn(timeout); cnxn.setSessionId(sessionId); - Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null); + Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); submitRequest(si); return sessionId; } @@ -1592,13 +1589,7 @@ public void requestFinished(Request request) { } } - public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException { - // We have the request, now process and setup for next - InputStream bais = new ByteBufferInputStream(incomingBuffer); - BinaryInputArchive bia = BinaryInputArchive.getArchive(bais); - RequestHeader h = new RequestHeader(); - h.deserialize(bia, "header"); - + public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException { // Need to increase the outstanding request count first, otherwise // there might be a race condition that it enabled recv after // processing request and then disabled when check throttling. @@ -1610,14 +1601,9 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE // in cnxn, since it will close the cnxn anyway. cnxn.incrOutstandingAndCheckThrottle(h); - // Through the magic of byte buffers, txn will not be - // pointing - // to the start of the txn - incomingBuffer = incomingBuffer.slice(); if (h.getType() == OpCode.auth) { LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress()); - AuthPacket authPacket = new AuthPacket(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket); + AuthPacket authPacket = request.readRecord(AuthPacket::new); String scheme = authPacket.getScheme(); ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme); Code authReturn = KeeperException.Code.AUTHFAILED; @@ -1657,15 +1643,15 @@ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOE } return; } else if (h.getType() == OpCode.sasl) { - processSasl(incomingBuffer, cnxn, h); + processSasl(request, cnxn, h); } else { if (!authHelper.enforceAuthentication(cnxn, h.getXid())) { // Authentication enforcement is failed // Already sent response to user about failure and closed the session, lets return return; } else { - Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo()); - int length = incomingBuffer.limit(); + Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo()); + int length = request.limit(); if (isLargeRequest(length)) { // checkRequestSize will throw IOException if request is rejected checkRequestSizeWhenMessageReceived(length); @@ -1703,10 +1689,9 @@ private static boolean shouldAllowSaslFailedClientsConnect() { return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS); } - private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { + private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException { LOG.debug("Responding to client SASL token."); - GetSASLRequest clientTokenRecord = new GetSASLRequest(); - ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord); + GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new); byte[] clientToken = clientTokenRecord.getToken(); LOG.debug("Size of client SASL token: {}", clientToken.length); byte[] responseToken = null; @@ -2177,8 +2162,8 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum switch (request.type) { case OpCode.create: case OpCode.create2: { - CreateRequest req = new CreateRequest(); - if (readRequestRecord(request, req)) { + CreateRequest req = request.readRequestRecordNoException(CreateRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); @@ -2186,22 +2171,22 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum break; } case OpCode.delete: { - DeleteRequest req = new DeleteRequest(); - if (readRequestRecord(request, req)) { + DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new); + if (req != null) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { - SetDataRequest req = new SetDataRequest(); - if (readRequestRecord(request, req)) { + SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new); + if (req != null) { path = req.getPath(); } break; } case OpCode.setACL: { - SetACLRequest req = new SetACLRequest(); - if (readRequestRecord(request, req)) { + SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new); + if (req != null) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); @@ -2295,15 +2280,6 @@ public boolean authWriteRequest(Request request) { return err == KeeperException.Code.OK.intValue(); } - private boolean readRequestRecord(Request request, Record record) { - try { - request.readRequestRecord(record); - return true; - } catch (IOException ex) { - return false; - } - } - public int getOutstandingHandshakeNum() { if (serverCnxnFactory instanceof NettyServerCnxnFactory) { return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java index 7bd30baa6e9..d6cbbe4a1ee 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMain.java @@ -244,6 +244,23 @@ ServerCnxnFactory getSecureCnxnFactory() { return secureCnxnFactory; } + // VisibleForTesting + public int getClientPort() { + if (cnxnFactory != null) { + return cnxnFactory.getLocalPort(); + } + return 0; + } + + // VisibleForTesting + public int getSecureClientPort() { + if (secureCnxnFactory != null) { + return secureCnxnFactory.getLocalPort(); + } + return 0; + } + + /** * Shutdowns properly the service, this method is not a public API. */ diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java index ec6ae637a0c..ad41a3228e7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/embedded/ZooKeeperServerEmbeddedImpl.java @@ -1,6 +1,7 @@ package org.apache.zookeeper.server.embedded; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.util.Map; @@ -45,6 +46,9 @@ class ZooKeeperServerEmbeddedImpl implements ZooKeeperServerEmbedded { private final ExitHandler exitHandler; private volatile boolean stopping; + private int boundClientPort; + private int boundSecureClientPort; + ZooKeeperServerEmbeddedImpl(Properties p, Path baseDir, ExitHandler exitHandler) throws Exception { if (!p.containsKey("dataDir")) { p.put("dataDir", baseDir.resolve("data").toAbsolutePath().toString()); @@ -103,6 +107,8 @@ protected QuorumPeer getQuorumPeer() throws SaslException { @Override public void start() { super.start(); + boundClientPort = getClientPort(); + boundSecureClientPort = getSecureClientPort(); LOG.info("ZK Server {} started", this); started.complete(null); } @@ -142,6 +148,8 @@ public void run() { @Override public void serverStarted() { LOG.info("ZK Server started"); + boundClientPort = getClientPort(); + boundSecureClientPort = getSecureClientPort(); started.complete(null); } }; @@ -184,22 +192,22 @@ public void run() { @Override public String getConnectionString() { - if (config.getClientPortAddress() != null) { - String raw = config.getClientPortAddress().getHostString() + ":" + config.getClientPortAddress().getPort(); - return raw.replace("0.0.0.0", "localhost"); - } else { - throw new IllegalStateException("No client address is configured"); - } + return prettifyConnectionString(config.getClientPortAddress(), boundClientPort); } @Override public String getSecureConnectionString() { - if (config.getSecureClientPortAddress() != null) { - String raw = config.getSecureClientPortAddress().getHostString() + ":" + config.getSecureClientPortAddress().getPort(); - return raw.replace("0.0.0.0", "localhost"); - } else { - throw new IllegalStateException("No client address is configured"); + return prettifyConnectionString(config.getSecureClientPortAddress(), boundSecureClientPort); + } + + private String prettifyConnectionString(InetSocketAddress confAddress, int boundPort) { + if (confAddress != null) { + return confAddress.getHostString() + .replace("0.0.0.0", "localhost") + .replace("0:0:0:0:0:0:0:0", "localhost") + + ":" + boundPort; } + throw new IllegalStateException("No client address is configured"); } @Override diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index b9e6c014c1f..5f57813dc97 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -659,14 +659,12 @@ public void applyDelayedPackets() { continue; } delayedCommits.remove(); - Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null); - request.setTxn(p.rec); - request.setHdr(p.hdr); + Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1); request.setTxnDigest(p.digest); ozk.commitRequest(request); } } else { - // New server type need to handle in-flight packets + // New server type needs to handle in-flight packets throw new UnsupportedOperationException("Unknown server type"); } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java index 259d9fff915..eea11d33a2c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.common.Time; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.TxnLogProposalIterator; import org.apache.zookeeper.server.ZKDatabase; @@ -702,9 +703,9 @@ public void run() { bb = bb.slice(); Request si; if (type == OpCode.sync) { - si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()); + si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } else { - si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo()); + si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo()); } si.setOwner(this); learnerMaster.submitLearnerRequest(si); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java index d4c83aeab7b..6892d3dd8a7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java @@ -18,17 +18,17 @@ package org.apache.zookeeper.server.quorum; -import java.nio.ByteBuffer; import java.util.List; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; public class LearnerSyncRequest extends Request { LearnerHandler fh; public LearnerSyncRequest( - LearnerHandler fh, long sessionId, int xid, int type, ByteBuffer bb, List authInfo) { - super(null, sessionId, xid, type, bb, authInfo); + LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List authInfo) { + super(null, sessionId, xid, type, request, authInfo); this.fh = fh; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index 3f448148a83..b90fa84b8c7 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -2139,6 +2139,13 @@ public int getClientPort() { return -1; } + public int getSecureClientPort() { + if (secureCnxnFactory != null) { + return secureCnxnFactory.getLocalPort(); + } + return -1; + } + public void setTxnFactory(FileTxnSnapLog factory) { this.logFactory = factory; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index 1b37f291f51..d478c409c8f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -271,8 +271,8 @@ public static String getVersionFromFilename(String filename) { * @throws ConfigException */ public void parseProperties(Properties zkProp) throws IOException, ConfigException { - int clientPort = 0; - int secureClientPort = 0; + Integer clientPort = null; + Integer secureClientPort = null; int observerMasterPort = 0; String clientPortAddress = null; String secureClientPortAddress = null; @@ -427,7 +427,7 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti dataLogDir = dataDir; } - if (clientPort == 0) { + if (clientPort == null) { LOG.info("clientPort is not set"); if (clientPortAddress != null) { throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set"); @@ -440,7 +440,7 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti LOG.info("clientPortAddress is {}", formatInetAddr(this.clientPortAddress)); } - if (secureClientPort == 0) { + if (secureClientPort == null) { LOG.info("secureClientPort is not set"); if (secureClientPortAddress != null) { throw new IllegalArgumentException("secureClientPortAddress is set but secureClientPort is not set"); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 2f24347b778..240936956fc 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.io.PrintWriter; -import java.nio.ByteBuffer; import java.util.Objects; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -32,10 +31,12 @@ import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.txn.CreateSessionTxn; /** * Abstract base class for all ZooKeeperServers that participate in @@ -76,8 +77,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx } if (OpCode.multi == request.type) { - MultiOperationRecord multiTransactionRecord = new MultiOperationRecord(); - request.readRequestRecord(multiTransactionRecord); + MultiOperationRecord multiTransactionRecord = request.readRequestRecord(MultiOperationRecord::new); boolean containsEphemeralCreate = false; for (Op op : multiTransactionRecord) { if (op.getType() == OpCode.create || op.getType() == OpCode.create2) { @@ -93,8 +93,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx return null; } } else { - CreateRequest createRequest = new CreateRequest(); - request.readRequestRecord(createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (!createMode.isEphemeral()) { return null; @@ -116,9 +115,8 @@ private Request makeUpgradeRequest(long sessionId) { synchronized (upgradeableSessionTracker) { if (upgradeableSessionTracker.isLocalSession(sessionId)) { int timeout = upgradeableSessionTracker.upgradeSession(sessionId); - ByteBuffer to = ByteBuffer.allocate(4); - to.putInt(timeout); - return new Request(null, sessionId, 0, OpCode.createSession, to, null); + CreateSessionTxn txn = new CreateSessionTxn(timeout); + return new Request(null, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null); } } return null; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java index df3b831a30d..8a700bbdf4f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java @@ -39,9 +39,9 @@ import org.apache.zookeeper.audit.AuditEvent.Result; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.quorum.QuorumPeerTestBase; +import org.apache.zookeeper.server.util.AuthUtil; import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase.CountdownWatcher; import org.apache.zookeeper.test.LoggerTestTool; @@ -290,9 +290,7 @@ private String getSession() { private String getUser() { ServerCnxn next = getServerCnxn(); - Request request = new Request(next, -1, -1, -1, null, - next.getAuthInfo()); - return request.getUsersForAudit(); + return AuthUtil.getUsers(next.getAuthInfo()); } private String getIp() { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java index f9fd6d8b588..0e6c1b58f92 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java @@ -19,10 +19,12 @@ package org.apache.zookeeper.server; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -36,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.DeleteContainerRequest; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; import org.apache.zookeeper.ZooDefs; @@ -91,7 +94,7 @@ public void testCreateWithStat() throws KeeperException, InterruptedException { Stat stat = createWithStatVerifyResult("/foo"); Stat childStat = createWithStatVerifyResult("/foo/child"); // Don't expect to get the same stats for different creates. - assertFalse(stat.equals(childStat)); + assertNotEquals(stat, childStat); } @SuppressWarnings("ConstantConditions") @@ -224,7 +227,11 @@ public void testMaxPerMinute() throws InterruptedException { RequestProcessor processor = new RequestProcessor() { @Override public void processRequest(Request request) { - queue.add(new String(request.readRequestBytes())); + try { + queue.add(request.readRequestRecord(DeleteContainerRequest::new).getPath()); + } catch (IOException e) { + fail(e); + } } @Override @@ -246,14 +253,13 @@ protected Collection getCandidates() { containerManager.checkContainers(); return null; }); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one"); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two"); - assertEquals(queue.size(), 0); + assertEquals("/one", queue.poll(5, TimeUnit.SECONDS)); + assertEquals("/two", queue.poll(5, TimeUnit.SECONDS)); + assertEquals(0, queue.size()); Thread.sleep(500); - assertEquals(queue.size(), 0); - - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/three"); - assertEquals(queue.poll(5, TimeUnit.SECONDS), "/four"); + assertEquals(0, queue.size()); + assertEquals("/three", queue.poll(5, TimeUnit.SECONDS)); + assertEquals("/four", queue.poll(5, TimeUnit.SECONDS)); } @Test diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java index 0181d2ebeae..4b47d6cbb3c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java @@ -96,7 +96,7 @@ public void testACLDigestHashHiding_NoAuth_WorldCanRead() { // Arrange // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList()); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList()); processor.processRequest(r); // Assert @@ -109,7 +109,7 @@ public void testACLDigestHashHiding_NoAuth_NoWorld() { testACLs.remove(2); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList()); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList()); processor.processRequest(r); // Assert @@ -123,7 +123,7 @@ public void testACLDigestHashHiding_UserCanRead() { authInfo.add(new Id("digest", "otheruser:somesecrethash")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -137,7 +137,7 @@ public void testACLDigestHashHiding_UserCanAll() { authInfo.add(new Id("digest", "user:secrethash")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -151,7 +151,7 @@ public void testACLDigestHashHiding_AdminUser() { authInfo.add(new Id("digest", "adminuser:adminsecret")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert @@ -167,7 +167,7 @@ public void testACLDigestHashHiding_OnlyAdmin() { authInfo.add(new Id("digest", "adminuser:adminsecret")); // Act - Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo); + Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo); processor.processRequest(r); // Assert diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java index 001e2363548..6826910103c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java @@ -117,7 +117,7 @@ private Request makeGetDataRequest(String path, long sessionId) throws IOExcepti GetDataRequest getDataRequest = new GetDataRequest(path, false); getDataRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList()); } private Request makeCreateRequest(String path, long sessionId) throws IOException { @@ -126,7 +126,7 @@ private Request makeCreateRequest(String path, long sessionId) throws IOExceptio CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList()); } private QuorumZooKeeperServer getConnectedServer(long sessionId) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java index 062cae1f9db..bfb0db5f3fb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java @@ -29,15 +29,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; @@ -94,12 +91,7 @@ public void tearDown() throws Exception { } private Request createRequest(Record record, int opCode) throws IOException { - // encoding - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - record.serialize(boa, "request"); - baos.close(); - return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null); + return new Request(null, 1L, 0, opCode, RequestRecord.fromRecord(record), null); } private Request createRequest(String path, int opCode) throws IOException { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java index 9e71205697d..d80c5e08ed2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java @@ -24,11 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.IOException; import java.io.PrintWriter; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -37,7 +34,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; @@ -66,13 +62,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class PrepRequestProcessorTest extends ClientBase { - private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class); private static final int CONNECTION_TIMEOUT = 3000; private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique(); private CountDownLatch pLatch; @@ -120,34 +113,28 @@ public void teardown() throws Exception { public void testPRequest() throws Exception { pLatch = new CountDownLatch(1); processor = new PrepRequestProcessor(zks, new MyRequestProcessor()); - Request foo = new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null); + Request foo = new Request(null, 1L, 1, OpCode.create, RequestRecord.fromBytes(new byte[3]), null); processor.pRequest(foo); assertEquals(new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()), outcome.getTxn(), "Request should have marshalling error"); assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain"); } - private Request createRequest(Record record, int opCode) throws IOException { + private Request createRequest(Record record, int opCode) { return createRequest(record, opCode, 1L); } - private Request createRequest(Record record, int opCode, long sessionId) throws IOException { + private Request createRequest(Record record, int opCode, long sessionId) { return createRequest(record, opCode, sessionId, false); } - private Request createRequest(Record record, int opCode, boolean admin) throws IOException { + private Request createRequest(Record record, int opCode, boolean admin) { return createRequest(record, opCode, 1L, admin); } - private Request createRequest(Record record, int opCode, long sessionId, boolean admin) throws IOException { - // encoding - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - record.serialize(boa, "request"); - baos.close(); - // Id - List ids = Arrays.asList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE); - return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids); + private Request createRequest(Record record, int opCode, long sessionId, boolean admin) { + List ids = Collections.singletonList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE); + return new Request(null, sessionId, 0, opCode, RequestRecord.fromRecord(record), ids); } private void process(List ops) throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index ed22399902f..15259207599 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -67,11 +67,17 @@ public class RequestThrottlerTest extends ZKTestCase { CountDownLatch disconnected = null; + CountDownLatch throttled = null; + CountDownLatch throttling = null; + ZooKeeperServer zks = null; ServerCnxnFactory f = null; ZooKeeper zk = null; int connectionLossCount = 0; + private long getCounterMetric(String name) { + return (long) MetricsUtils.currentServerMetrics().get(name); + } @BeforeEach public void setup() throws Exception { @@ -115,6 +121,11 @@ public TestZooKeeperServer(File snapDir, File logDir, int tickTime) throws IOExc super(snapDir, logDir, tickTime); } + @Override + protected RequestThrottler createRequestThrottler() { + return new TestRequestThrottler(this); + } + @Override protected void setupRequestProcessors() { RequestProcessor finalProcessor = new FinalRequestProcessor(this); @@ -141,6 +152,24 @@ public void requestFinished(Request request) { } } + class TestRequestThrottler extends RequestThrottler { + public TestRequestThrottler(ZooKeeperServer zks) { + super(zks); + } + + @Override + synchronized void throttleSleep(int stallTime) throws InterruptedException { + if (throttling != null) { + throttling.countDown(); + } + super.throttleSleep(stallTime); + // Defend against unstable timing and potential spurious wakeup. + if (throttled != null) { + assertTrue(throttled.await(20, TimeUnit.SECONDS)); + } + } + } + class TestPrepRequestProcessor extends PrepRequestProcessor { public TestPrepRequestProcessor(ZooKeeperServer zks, RequestProcessor syncProcessor) { @@ -191,18 +220,22 @@ public void testRequestThrottler() throws Exception { // make sure the server received all 5 requests submitted.await(5, TimeUnit.SECONDS); - Map metrics = MetricsUtils.currentServerMetrics(); // but only two requests can get into the pipeline because of the throttler - assertEquals(2L, (long) metrics.get("prep_processor_request_queued")); - assertEquals(1L, (long) metrics.get("request_throttle_wait_count")); + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2; + waitFor("request not queued", requestQueued, 5); + + WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1; + waitFor("no throttle wait", throttleWait, 5); // let the requests go through the pipeline and the throttler will be waken up to allow more requests // to enter the pipeline resumeProcess.countDown(); - entered.await(STALL_TIME, TimeUnit.MILLISECONDS); - metrics = MetricsUtils.currentServerMetrics(); + // wait for more than one STALL_TIME to reduce timeout before wakeup + assertTrue(entered.await(STALL_TIME + 5000, TimeUnit.MILLISECONDS)); + + Map metrics = MetricsUtils.currentServerMetrics(); assertEquals(TOTAL_REQUESTS, (long) metrics.get("prep_processor_request_queued")); } @@ -221,6 +254,9 @@ public void testDropStaleRequests() throws Exception { resumeProcess = new CountDownLatch(1); submitted = new CountDownLatch(TOTAL_REQUESTS); + throttled = new CountDownLatch(1); + throttling = new CountDownLatch(1); + // send 5 requests asynchronously for (int i = 0; i < TOTAL_REQUESTS; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -231,11 +267,18 @@ public void testDropStaleRequests() throws Exception { // make sure the server received all 5 requests assertTrue(submitted.await(5, TimeUnit.SECONDS)); + // stale throttled requests + assertTrue(throttling.await(5, TimeUnit.SECONDS)); for (ServerCnxn cnxn : f.cnxns) { cnxn.setStale(); } + throttled.countDown(); zk = null; + // only first three requests are counted as finished + finished = new CountDownLatch(3); + + // let the requests go through the pipeline resumeProcess.countDown(); LOG.info("raise the latch"); @@ -243,6 +286,8 @@ public void testDropStaleRequests() throws Exception { Thread.sleep(50); } + assertTrue(finished.await(5, TimeUnit.SECONDS)); + // assert after all requests processed to avoid concurrent issues as metrics are // counted in different threads. Map metrics = MetricsUtils.currentServerMetrics(); @@ -327,7 +372,6 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() RequestThrottler.setMaxRequests(0); resumeProcess = new CountDownLatch(1); int totalRequests = 10; - submitted = new CountDownLatch(totalRequests); for (int i = 0; i < totalRequests; i++) { zk.create("/request_throttle_test- " + i, ("/request_throttle_test- " @@ -335,16 +379,16 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() }, null); } - submitted.await(5, TimeUnit.SECONDS); - // We should start throttling instead of queuing more requests. // // We always allow up to GLOBAL_OUTSTANDING_LIMIT + 1 number of requests coming in request processing pipeline // before throttling. For the next request, we will throttle by disabling receiving future requests but we still - // allow this single request coming in. So the total number of queued requests in processing pipeline would + // allow this single request coming in. Ideally, the total number of queued requests in processing pipeline would // be GLOBAL_OUTSTANDING_LIMIT + 2. - assertEquals(Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2, - (long) MetricsUtils.currentServerMetrics().get("prep_processor_request_queued")); + // + // But due to leak of consistent view of number of outstanding requests, the number could be larger. + WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2; + waitFor("no enough requests queued", requestQueued, 5); resumeProcess.countDown(); } catch (Exception e) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java index 16c7a7dd1c8..681e8348b07 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java @@ -19,7 +19,6 @@ package org.apache.zookeeper.server; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.ZKTestCase; @@ -67,7 +66,7 @@ public void testUnrecoverableErrorCountFromRequestProcessor() throws Exception { PrepRequestProcessor processor = new MyPrepRequestProcessor(); processor.start(); - processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null)); + processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null)); processed.await(); processor.shutdown(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java index d9868b235df..00bfced0cd4 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/embedded/ZookeeperServerEmbeddedTest.java @@ -17,12 +17,17 @@ */ package org.apache.zookeeper.server.embedded; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.endsWith; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.nio.file.Path; import java.util.Properties; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.test.ClientBase; +import org.junit.function.ThrowingRunnable; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -95,4 +100,31 @@ public void testStart() throws Exception { } + @Test + public void testBindPortZero() throws Exception { + final Properties configZookeeper = new Properties(); + final ZooKeeperServerEmbedded.ZookKeeperServerEmbeddedBuilder builder = ZooKeeperServerEmbedded.builder() + .baseDir(baseDir) + .configuration(configZookeeper) + .exitHandler(ExitHandler.LOG_ONLY); + + // Unconfigured client port will still fail + try (ZooKeeperServerEmbedded zkServer = builder.build()) { + zkServer.start(); + assertThrows(IllegalStateException.class, new ThrowingRunnable() { + @Override + public void run() throws Throwable { + zkServer.getConnectionString(); + } + }); + } + + // Explicit port zero should work + configZookeeper.put("clientPort", "0"); + try (ZooKeeperServerEmbedded zkServer = builder.build()) { + zkServer.start(); + assertThat(zkServer.getConnectionString(), not(endsWith(":0"))); + assertTrue(ClientBase.waitForServerUp(zkServer.getConnectionString(), 60000)); + } + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java index 1050a474bc0..e5f668d1f3e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java @@ -43,6 +43,7 @@ import org.apache.zookeeper.proto.SetDataRequest; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.WorkerService; import org.apache.zookeeper.server.ZooKeeperServerListener; import org.junit.jupiter.api.AfterEach; @@ -129,7 +130,7 @@ private Request newRequest(Record rec, int type, int sessionId, int xid) throws BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas); rec.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, xid, type, bb, new ArrayList()); + return new Request(null, sessionId, xid, type, RequestRecord.fromBytes(bb), new ArrayList()); } /** diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java index 4b67f5b50bb..4a45983555a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -31,6 +30,7 @@ import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.WorkerService; import org.junit.jupiter.api.AfterEach; @@ -192,11 +192,11 @@ private void checkTimeMetric(long actual, long lBoundrary, long hBoundrary) { } private Request createReadRequest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, RequestRecord.fromBytes(new byte[10]), null); } private Request createWriteRequest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null); } private void processRequestWithWait(Request request) throws Exception { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java index 46958d1949c..46a4c387479 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java @@ -42,6 +42,7 @@ import org.apache.zookeeper.server.PrepRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterEach; @@ -160,7 +161,7 @@ public void sendWriteRequest() throws Exception { + (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1); createReq.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList()); + Request req = new Request(null, sessionId, ++cxid, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); zks.getFirstProcessor().processRequest(req); } @@ -174,7 +175,7 @@ public void sendReadRequest() throws Exception { + nodeId, false); getDataRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - Request req = new Request(null, sessionId, ++cxid, OpCode.getData, bb, new ArrayList()); + Request req = new Request(null, sessionId, ++cxid, OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList()); zks.getFirstProcessor().processRequest(req); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java index ef1f1219452..5216eb70324 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java @@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.net.SocketException; -import java.nio.ByteBuffer; import javax.security.sasl.SaslException; import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooDefs; @@ -32,11 +31,13 @@ import org.apache.zookeeper.server.PrepRequestProcessor; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState; import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.txn.DeleteTxn; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -220,7 +221,8 @@ public void shutdown() { * Add a request so that something is there for SyncRequestProcessor * to process, while we are in shutdown flow */ - Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, ByteBuffer.wrap("/deadLockIssue".getBytes()), null); + DeleteTxn deleteTxn = new DeleteTxn("/deadLockIssue"); + Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, RequestRecord.fromRecord(deleteTxn), null); processRequest(request); super.shutdown(); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java index 8803a73cbab..3d60c8b9174 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterEach; @@ -317,8 +318,7 @@ protected void request(Request request) throws IOException { } if (request.type == ZooDefs.OpCode.create && request.cnxn != null) { - CreateRequest createRequest = new CreateRequest(); - request.readRequestRecord(createRequest); + CreateRequest createRequest = request.readRequestRecord(CreateRequest::new); try { CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isEphemeral()) { @@ -355,7 +355,7 @@ private Request createEphemeralRequest(String path, long sessionId) throws IOExc CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag()); createRequest.serialize(boa, "request"); ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray()); - return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList()); + return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList()); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java index 72bceafa371..17ccdae6ea6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java @@ -26,7 +26,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -34,6 +33,7 @@ import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.SyncRequestProcessor; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; @@ -65,7 +65,7 @@ public void setup() throws Exception { } private Request createRquest(long sessionId, int xid) { - return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null); + return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null); } @Test diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java index 99cd171c01f..a619866d91c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java @@ -33,6 +33,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.proto.CreateRequest; import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.quorum.QuorumPeer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -100,7 +101,7 @@ public void testCreateEphemeral(boolean localSessionEnabled) throws Exception { LOG.info("Fake session Id: {}", Long.toHexString(fakeSessionId)); - Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList()); + Request request = new Request(null, fakeSessionId, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); // Submit request directly to leader leader.getActiveServer().submitRequest(request); @@ -138,7 +139,7 @@ public void testCreatePersistent() throws Exception { LOG.info("Local session Id: {}", Long.toHexString(locallSession)); - Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList()); + Request request = new Request(null, locallSession, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList()); // Submit request directly to leader leader.getActiveServer().submitRequest(request);