Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: revamp substreams serde #528

Merged
merged 2 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import apache.rocketmq.controller.v1.S3WALObject;
import apache.rocketmq.controller.v1.StreamState;
import apache.rocketmq.controller.v1.SubStream;
import apache.rocketmq.controller.v1.SubStreams;
import apache.rocketmq.controller.v1.TrimStreamRequest;
import com.automq.rocketmq.common.system.S3Constants;
import com.automq.rocketmq.common.system.StreamConstants;
Expand All @@ -42,13 +43,9 @@
import com.automq.rocketmq.controller.metadata.database.mapper.S3WalObjectMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.SequenceMapper;
import com.automq.rocketmq.controller.metadata.database.mapper.StreamMapper;
import com.automq.rocketmq.controller.metadata.database.serde.SubStreamDeserializer;
import com.automq.rocketmq.controller.metadata.database.serde.SubStreamSerializer;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import java.nio.charset.StandardCharsets;
import com.google.protobuf.util.JsonFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
Expand All @@ -72,14 +69,8 @@ public class S3MetadataManager {

private final MetadataStore metadataStore;

private final Gson gson;

public S3MetadataManager(MetadataStore metadataStore) {
this.metadataStore = metadataStore;
this.gson = new GsonBuilder()
.registerTypeAdapter(SubStream.class, new SubStreamSerializer())
.registerTypeAdapter(SubStream.class, new SubStreamDeserializer())
.create();
}

public CompletableFuture<Long> prepareS3Objects(int count, int ttlInMinutes) {
Expand Down Expand Up @@ -161,7 +152,7 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
List<long[]> offsets = java.util.stream.Stream.concat(
streamObjects.stream()
.map(s3StreamObject -> new long[] {s3StreamObject.getStreamId(), s3StreamObject.getStartOffset(), s3StreamObject.getEndOffset()}),
walObject.getSubStreamsMap().entrySet()
walObject.getSubStreams().getSubStreamsMap().entrySet()
.stream()
.map(obj -> new long[] {obj.getKey(), obj.getValue().getStartOffset(), obj.getValue().getEndOffset()})
).toList();
Expand Down Expand Up @@ -249,14 +240,17 @@ public CompletableFuture<Void> commitWalObject(S3WALObject walObject,
s3WALObject.setCommittedTimestamp(new Date());
s3WALObject.setNodeId(brokerId);
s3WALObject.setSequenceId(sequenceId);
s3WALObject.setSubStreams(gson.toJson(walObject.getSubStreamsMap()));
String subStreams = JsonFormat.printer().print(walObject.getSubStreams());
s3WALObject.setSubStreams(subStreams);
s3WALObjectMapper.create(s3WALObject);
}

session.commit();
LOGGER.info("broker[broke-id={}] commit wal object[object-id={}] success, compacted objects[{}], stream objects[{}]",
brokerId, walObject.getObjectId(), compactedObjects, streamObjects);
future.complete(null);
} catch (InvalidProtocolBufferException e) {
future.completeExceptionally(e);
}
} else {
CommitWALObjectRequest request = CommitWALObjectRequest.newBuilder()
Expand Down Expand Up @@ -376,11 +370,14 @@ public CompletableFuture<List<S3WALObject>> listWALObjects() {
S3WalObjectMapper s3WalObjectMapper = session.getMapper(S3WalObjectMapper.class);
List<S3WALObject> walObjects = s3WalObjectMapper.list(metadataStore.config().nodeId(), null).stream()
.map(s3WALObject -> {
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)),
new TypeToken<>() {
});
return buildS3WALObject(s3WALObject, subStreams);
try {
return buildS3WALObject(s3WALObject, decode(s3WALObject.getSubStreams()));
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Failed to deserialize SubStreams", e);
return null;
}
})
.filter(Objects::nonNull)
.toList();
future.complete(walObjects);
}
Expand All @@ -407,18 +404,22 @@ public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long s
List<S3WalObject> s3WalObjects = s3WalObjectMapper.list(nodeId, null);
s3WalObjects.stream()
.map(s3WalObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {
};
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WalObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken);
Map<Long, SubStream> streamsRecords = new HashMap<>();
if (!Objects.isNull(subStreams) && subStreams.containsKey(streamId)) {
SubStream subStream = subStreams.get(streamId);
if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) {
streamsRecords.put(streamId, subStream);
try {
Map<Long, SubStream> subStreams = decode(s3WalObject.getSubStreams()).getSubStreamsMap();
Map<Long, SubStream> streamsRecords = new HashMap<>();
if (subStreams.containsKey(streamId)) {
SubStream subStream = subStreams.get(streamId);
if (subStream.getStartOffset() <= endOffset && subStream.getEndOffset() > startOffset) {
streamsRecords.put(streamId, subStream);
}
}
}
if (!streamsRecords.isEmpty()) {
return buildS3WALObject(s3WalObject, streamsRecords);
if (!streamsRecords.isEmpty()) {
return buildS3WALObject(s3WalObject, SubStreams.newBuilder()
.putAllSubStreams(streamsRecords)
.build());
}
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Failed to deserialize SubStreams", e);
}
return null;
})
Expand All @@ -428,8 +429,8 @@ public CompletableFuture<List<S3WALObject>> listWALObjects(long streamId, long s

// Sort by start-offset of the given stream
s3WALObjects.sort((l, r) -> {
long lhs = l.getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreamsMap().get(streamId).getStartOffset();
long lhs = l.getSubStreams().getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreams().getSubStreamsMap().get(streamId).getStartOffset();
return Long.compare(lhs, rhs);
});

Expand Down Expand Up @@ -466,18 +467,24 @@ private S3StreamObject buildS3StreamObject(

private S3WALObject buildS3WALObject(
S3WalObject originalObject,
Map<Long, SubStream> subStreams) {
SubStreams subStreams) {
return S3WALObject.newBuilder()
.setObjectId(originalObject.getObjectId())
.setObjectSize(originalObject.getObjectSize())
.setBrokerId(originalObject.getNodeId())
.setSequenceId(originalObject.getSequenceId())
.setBaseDataTimestamp(originalObject.getBaseDataTimestamp().getTime())
.setCommittedTimestamp(originalObject.getCommittedTimestamp() != null ? originalObject.getCommittedTimestamp().getTime() : S3Constants.NOOP_OBJECT_COMMIT_TIMESTAMP)
.putAllSubStreams(subStreams)
.setSubStreams(subStreams)
.build();
}

/**
 * Parses the JSON text produced by {@code JsonFormat.printer()} back into a
 * {@link SubStreams} protobuf message, tolerating unknown fields so older
 * rows remain readable after schema evolution.
 *
 * @param json JSON-encoded SubStreams payload as stored in the database
 * @return the decoded SubStreams message
 * @throws InvalidProtocolBufferException if the payload is not valid SubStreams JSON
 */
private SubStreams decode(String json) throws InvalidProtocolBufferException {
    JsonFormat.Parser parser = JsonFormat.parser().ignoringUnknownFields();
    SubStreams.Builder target = SubStreams.newBuilder();
    parser.merge(json, target);
    return target.build();
}

private boolean commitObject(Long objectId, long streamId, long objectSize, SqlSession session) {
S3ObjectMapper s3ObjectMapper = session.getMapper(S3ObjectMapper.class);
S3Object s3Object = s3ObjectMapper.getById(objectId);
Expand Down Expand Up @@ -559,24 +566,28 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
s3WalObjectMapper.list(null, null)
.stream()
.map(s3WalObject -> {
TypeToken<Map<Long, SubStream>> typeToken = new TypeToken<>() {
};
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WalObject.getSubStreams().getBytes(StandardCharsets.UTF_8)), typeToken);
Map<Long, SubStream> streamsRecords = new HashMap<>();
subStreams.entrySet().stream()
.filter(entry -> !Objects.isNull(entry) && entry.getKey().equals(streamId))
.filter(entry -> entry.getValue().getStartOffset() <= endOffset && entry.getValue().getEndOffset() > startOffset)
.forEach(entry -> streamsRecords.put(entry.getKey(), entry.getValue()));
return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject, streamsRecords);
try {
Map<Long, SubStream> subStreams = decode(s3WalObject.getSubStreams()).getSubStreamsMap();
Map<Long, SubStream> streamsRecords = new HashMap<>();
subStreams.entrySet().stream()
.filter(entry -> !Objects.isNull(entry) && entry.getKey().equals(streamId))
.filter(entry -> entry.getValue().getStartOffset() <= endOffset && entry.getValue().getEndOffset() > startOffset)
.forEach(entry -> streamsRecords.put(entry.getKey(), entry.getValue()));
return streamsRecords.isEmpty() ? null : buildS3WALObject(s3WalObject,
SubStreams.newBuilder().putAllSubStreams(streamsRecords).build());
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Failed to deserialize SubStreams", e);
return null;
}
})
.filter(Objects::nonNull)
.limit(limit)
.forEach(walObjects::add);

if (!walObjects.isEmpty()) {
walObjects.sort((l, r) -> {
long lhs = l.getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreamsMap().get(streamId).getStartOffset();
long lhs = l.getSubStreams().getSubStreamsMap().get(streamId).getStartOffset();
long rhs = r.getSubStreams().getSubStreamsMap().get(streamId).getStartOffset();
return Long.compare(lhs, rhs);
});
}
Expand All @@ -592,8 +603,8 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
walObjects.stream()
.map(s3WALObject -> new long[] {
s3WALObject.getObjectId(),
s3WALObject.getSubStreamsMap().get(streamId).getStartOffset(),
s3WALObject.getSubStreamsMap().get(streamId).getEndOffset()
s3WALObject.getSubStreams().getSubStreamsMap().get(streamId).getStartOffset(),
s3WALObject.getSubStreams().getSubStreamsMap().get(streamId).getEndOffset()
})
).sorted((l, r) -> {
if (l[1] == r[1]) {
Expand All @@ -617,7 +628,6 @@ public CompletableFuture<Pair<List<S3StreamObject>, List<S3WALObject>>> listObje
}, metadataStore.asyncExecutor());
}


public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long newStartOffset) {
CompletableFuture<Void> future = new CompletableFuture<>();
for (; ; ) {
Expand Down Expand Up @@ -706,15 +716,18 @@ public CompletableFuture<Void> trimStream(long streamId, long streamEpoch, long
// remove wal object or remove sub-stream range in wal object
s3WALObjectMapper.list(stream.getDstNodeId(), null).stream()
.map(s3WALObject -> {
Map<Long, SubStream> subStreams = gson.fromJson(new String(s3WALObject.getSubStreams().getBytes(StandardCharsets.UTF_8)),
new TypeToken<>() {
});
return buildS3WALObject(s3WALObject, subStreams);
try {
return buildS3WALObject(s3WALObject, decode(s3WALObject.getSubStreams()));
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Failed to decode");
return null;
}
})
.filter(s3WALObject -> s3WALObject.getSubStreamsMap().containsKey(streamId))
.filter(s3WALObject -> s3WALObject.getSubStreamsMap().get(streamId).getEndOffset() <= newStartOffset)
.filter(Objects::nonNull)
.filter(s3WALObject -> s3WALObject.getSubStreams().getSubStreamsMap().containsKey(streamId))
.filter(s3WALObject -> s3WALObject.getSubStreams().getSubStreamsMap().get(streamId).getEndOffset() <= newStartOffset)
.forEach(s3WALObject -> {
if (s3WALObject.getSubStreamsMap().size() == 1) {
if (s3WALObject.getSubStreams().getSubStreamsMap().size() == 1) {
// only this range, but we will remove this range, so now we can remove this wal object
S3Object s3Object = s3ObjectMapper.getById(s3WALObject.getObjectId());
s3Object.setMarkedForDeletionTimestamp(new Date());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,7 @@
* limitations under the License.
*/

package com.automq.rocketmq.controller.metadata.database.serde;
package com.automq.rocketmq.controller.metadata.database.cache;

import apache.rocketmq.controller.v1.SubStream;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import java.lang.reflect.Type;

public class SubStreamSerializer implements JsonSerializer<SubStream> {
@Override
public JsonElement serialize(SubStream stream, Type type, JsonSerializationContext context) {
JsonObject root = new JsonObject();
root.addProperty("streamId", stream.getStreamId());
root.addProperty("startOffset", stream.getStartOffset());
root.addProperty("endOffset", stream.getEndOffset());
return root;
}
public class S3ObjectCache {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.controller.metadata.database.cache;

// NOTE(review): empty placeholder introduced by this change — presumably a
// future in-memory cache for S3 stream object metadata; confirm intended use
// before adding members.
public class S3StreamObjectCache {
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class Node {

private Date updateTime = new Date();


/**
 * @return the identifier of this node record
 */
public int getId() {
return id;
}
Expand Down Expand Up @@ -144,4 +143,15 @@ public Date getUpdateTime() {
/**
 * @param updateTime the last-modification timestamp to record for this node
 */
public void setUpdateTime(Date updateTime) {
this.updateTime = updateTime;
}

/**
 * Human-readable representation of this node for logging and debugging.
 * Emits the same text as simple field concatenation would.
 */
@Override
public String toString() {
    StringBuilder text = new StringBuilder("Node {");
    text.append("id=").append(id)
        .append(", name='").append(name).append('\'')
        .append(", address='").append(address).append('\'')
        .append(", epoch=").append(epoch)
        .append(", updateTime=").append(updateTime)
        .append('}');
    return text.toString();
}
}

This file was deleted.

Loading