Skip to content

Commit

Permalink
fix: release RocksDB objects using try-with-resource
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <lizhanhui@gmail.com>
  • Loading branch information
lizhanhui committed Oct 23, 2024
1 parent 6130f00 commit b128882
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ protected void removeConsumerOffset(String topicAtGroup) {
endKey.writeBytes(topicBytes);
endKey.writeByte(AbstractRocksDBStorage.CTRL_2);

try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
// TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
Expand Down Expand Up @@ -134,8 +133,7 @@ public void removeOffset(String group) {
endKey.writeShort(groupBytes.length);
endKey.writeBytes(groupBytes);
endKey.writeByte(AbstractRocksDBStorage.CTRL_2);
try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
// TODO: we have to make a copy here as WriteBatch lacks ByteBuffer API here
writeBatch.deleteRange(ConfigHelper.readBytes(beginKey), ConfigHelper.readBytes(endKey));
MessageStore messageStore = brokerController.getMessageStore();
Expand Down Expand Up @@ -191,9 +189,8 @@ public void commitOffset(String clientHost, String group, String topic, int queu

ByteBuf keyBuf = keyOfConsumerOffset(group, topic, queueId);
ByteBuf valueBuf = ConfigStorage.POOLED_ALLOCATOR.buffer(Long.BYTES);
try {
try (WriteBatch writeBatch = new WriteBatch()) {
valueBuf.writeLong(offset);
WriteBatch writeBatch = new WriteBatch();
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
MessageStore messageStore = brokerController.getMessageStore();
long stateMachineVersion = messageStore != null ? messageStore.getStateMachineVersion() : 0;
Expand Down Expand Up @@ -393,8 +390,7 @@ public void commitPullOffset(String clientHost, String group, String topic, int

ByteBuf keyBuf = keyOfPullOffset(group, topic, queueId);
ByteBuf valueBuf = AbstractRocksDBStorage.POOLED_ALLOCATOR.buffer(8);
try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config)
}
ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, config.getGroupName());
ByteBuf valueBuf = ConfigHelper.valueBufOf(config, SerializationType.JSON);
try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
Expand All @@ -160,8 +159,7 @@ public boolean containsSubscriptionGroup(String group) {
@Override
protected SubscriptionGroupConfig removeSubscriptionGroupConfig(String groupName) {
ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.SUBSCRIPTION_GROUP, groupName);
try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.delete(ConfigHelper.readBytes(keyBuf));
long stateMachineVersion = brokerController.getMessageStore().getStateMachineVersion();
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private boolean loadTopicConfig() {
* Key layout: [table-prefix, 1 byte][table-id, 2 bytes][record-type-prefix, 1 byte][topic-len, 2 bytes][topic-bytes]
* Value layout: [serialization-type, 1 byte][topic-config-bytes]
*
* @param key Topic config key representation in RocksDB
* @param key Topic config key representation in RocksDB
* @param value Topic config value representation in RocksDB
* @return decoded topic config
*/
Expand Down Expand Up @@ -146,10 +146,9 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
}
super.updateSingleTopicConfigWithoutPersist(topicConfig);

WriteBatch writeBatch = new WriteBatch();
ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicConfig.getTopicName());
ByteBuf valueBuf = ConfigHelper.valueBufOf(topicConfig, SerializationType.JSON);
try {
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.put(keyBuf.nioBuffer(), valueBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
Expand All @@ -165,8 +164,7 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
@Override
protected TopicConfig removeTopicConfig(String topicName) {
ByteBuf keyBuf = ConfigHelper.keyBufOf(TableId.TOPIC, topicName);
try {
WriteBatch writeBatch = new WriteBatch();
try (WriteBatch writeBatch = new WriteBatch()) {
writeBatch.delete(keyBuf.nioBuffer());
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
Expand Down

0 comments on commit b128882

Please sign in to comment.