diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java index 609f8a09bea..5b0885c491a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java @@ -93,8 +93,7 @@ protected void removeConsumerOffset(String topicAtGroup) { endKey.writeBytes(topicBytes); endKey.writeByte(AbstractRocksDBStorage.CTRL_2); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; @@ -134,8 +133,7 @@ public void removeOffset(String group) { endKey.writeShort(groupBytes.length); endKey.writeBytes(groupBytes); endKey.writeByte(AbstractRocksDBStorage.CTRL_2); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { // TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey)); MessageStore messageStore = brokerController.getMessageStore(); @@ -191,9 +189,8 @@ public void commitOffset(String clientHost, String group, String topic, int queu ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId); ByteBuf valueBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(Long.BYTES); - try { + try (WriteBatch writeBatch = new WriteBatch()) { valueBuf.writeLong(offset); - WriteBatch writeBatch = new WriteBatch(); writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); MessageStore messageStore = brokerController.getMessageStore(); long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0; @@ -393,8 +390,7 @@ public void commitPullOffset(String clientHost, String group, String topic, int ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId); ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java index abd2a87daac..8da6f9d2bc5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/SubscriptionGroupManagerV2.java @@ -133,8 +133,7 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) } ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, config.getGroupName()); ByteBuf valueBuf = ConfigHelper.valueBufOf(config, SerializationType.JSON); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); @@ -160,8 +159,7 @@ public boolean containsSubscriptionGroup(String group) { @Override protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) { ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, groupName); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.delete(ConfigHelper.readBytes(keyBuf)); long stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion(); ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java index 5b3b36abcc4..b1a3d2d85ce 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/TopicConfigManagerV2.java @@ -91,7 +91,7 @@ private boolean loadTopicConfig() { * Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes] * Value layout: [serialization-type, 1 byte][topic-config-bytes] * - * @param key Topic config key representation in RocksDB + * @param key Topic config key representation in RocksDB * @param value Topic config value representation in RocksDB * @return decoded topic config */ @@ -146,10 +146,9 @@ public void updateTopicConfig(final TopicConfig topicConfig) { } super.updateSingleTopicConfigWithoutPersist(topicConfig); - WriteBatch writeBatch = new WriteBatch(); ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicConfig.getTopicName()); ByteBuf valueBuf = ConfigHelper.valueBufOf(topicConfig, SerializationType.JSON); - try { + try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion); @@ -165,8 +164,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) { @Override protected TopicConfig removeTopicConfig(String topicName) { ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicName); - try { - WriteBatch writeBatch = new WriteBatch(); + try (WriteBatch writeBatch = new WriteBatch()) { writeBatch.delete(keyBuf.nioBuffer()); long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0; ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);