From 05704a4c8dcc80f77aedab2ac8a6915b525e7afb Mon Sep 17 00:00:00 2001 From: SSpirits Date: Wed, 27 Dec 2023 11:47:35 +0800 Subject: [PATCH 1/2] feat(stream): add stream range continuous check for MemoryMetadataManager Signed-off-by: SSpirits --- .../s3/memory/MemoryMetadataManager.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index f64ccc195..eaa2928d0 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -82,6 +82,9 @@ public synchronized CompletableFuture commitStrea for (ObjectStreamRange range : request.getStreamRanges()) { StreamMetadata stream = streams.get(range.getStreamId()); assert stream != null; + if (stream.getEndOffset() != range.getStartOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset " + range.getStartOffset()); + } stream.setEndOffset(range.getEndOffset()); } @@ -95,7 +98,12 @@ public synchronized CompletableFuture commitStrea long streamId = streamObject.getStreamId(); StreamMetadata stream = streams.get(streamId); assert stream != null; - stream.setEndOffset(streamObject.getEndOffset()); + if (stream.getEndOffset() < streamObject.getEndOffset()) { + if (stream.getEndOffset() != streamObject.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset " + streamObject.getStartOffset()); + } + stream.setEndOffset(streamObject.getEndOffset()); + } List metadataList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()); metadataList.add( @@ -112,12 +120,14 @@ public synchronized CompletableFuture commitStrea @Override public synchronized CompletableFuture compactStreamObject(CompactStreamObjectRequest request) { long streamId = request.getStreamId(); - StreamObject streamObject = new StreamObject(); - streamObject.setStreamId(streamId); - streamObject.setStartOffset(request.getStartOffset()); - streamObject.setEndOffset(request.getEndOffset()); - streamObject.setObjectId(request.getObjectId()); - streamObject.setObjectSize(request.getObjectSize()); + StreamMetadata stream = streams.get(streamId); + assert stream != null; + if (stream.getEndOffset() < request.getEndOffset()) { + throw new IllegalArgumentException("stream " + streamId + " end offset " + stream.getEndOffset() + " is lesser than request " + request.getEndOffset()); + } + if (stream.getStartOffset() > request.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamId + " start offset " + stream.getStartOffset() + " is greater than request " + request.getStartOffset()); + } streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) .add(new S3ObjectMetadata( @@ -177,7 +187,7 @@ public synchronized CompletableFuture> getStreamObjects(l @Override public synchronized CompletableFuture> getOpeningStreams() { - return CompletableFuture.completedFuture(streams.values().stream().toList()); + return CompletableFuture.completedFuture(streams.values().stream().filter(stream -> stream.getState() == StreamState.OPENED).toList()); } @Override From 0654f3afd85fa2e6b53b6b31ffa31ae323bc574c Mon Sep 17 00:00:00 2001 From: SSpirits Date: Wed, 27 Dec 2023 14:13:21 +0800 Subject: [PATCH 2/2] feat(stream): add stream range continuous check for MemoryMetadataManager Signed-off-by: SSpirits --- .../s3/memory/MemoryMetadataManager.java | 36 ++++++++++++++----- .../stream/s3/objects/ObjectManagerTest.java | 22 ++++++------ 2 files changed, 40 insertions(+), 18 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java index eaa2928d0..a2033d5ab 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/memory/MemoryMetadataManager.java @@ -82,10 +82,21 @@ public synchronized CompletableFuture commitStrea for (ObjectStreamRange range : request.getStreamRanges()) { StreamMetadata stream = streams.get(range.getStreamId()); assert stream != null; - if (stream.getEndOffset() != range.getStartOffset()) { - throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset " + range.getStartOffset()); + if (request.getCompactedObjectIds().isEmpty()) { + // Commit new object. + if (stream.getEndOffset() != range.getStartOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset of request " + range.getStartOffset()); + } + stream.setEndOffset(range.getEndOffset()); + } else { + // Compact old object. + if (stream.getEndOffset() < range.getEndOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " end offset " + stream.getEndOffset() + " is lesser than request " + range.getEndOffset()); + } + if (stream.getStartOffset() > range.getStartOffset()) { + throw new IllegalArgumentException("stream " + range.getStreamId() + " start offset " + stream.getStartOffset() + " is greater than request " + range.getStartOffset()); + } } - stream.setEndOffset(range.getEndOffset()); } S3ObjectMetadata object = new S3ObjectMetadata( @@ -98,11 +109,20 @@ public synchronized CompletableFuture commitStrea long streamId = streamObject.getStreamId(); StreamMetadata stream = streams.get(streamId); assert stream != null; - if (stream.getEndOffset() < streamObject.getEndOffset()) { + if (request.getCompactedObjectIds().isEmpty()) { + // Commit new object. if (stream.getEndOffset() != streamObject.getStartOffset()) { - throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset " + streamObject.getStartOffset()); + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is not equal to start offset of request " + streamObject.getStartOffset()); } stream.setEndOffset(streamObject.getEndOffset()); + } else { + // Compact old object. + if (stream.getEndOffset() < streamObject.getEndOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " end offset " + stream.getEndOffset() + " is lesser than request " + streamObject.getEndOffset()); + } + if (stream.getStartOffset() > streamObject.getStartOffset()) { + throw new IllegalArgumentException("stream " + streamObject.getStreamId() + " start offset " + stream.getStartOffset() + " is greater than request " + streamObject.getStartOffset()); + } } List metadataList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()); @@ -146,11 +166,11 @@ public synchronized CompletableFuture> getObjects(long st List streamSetObjectList = streamSetObjects.values() .stream() .map(Pair::getRight) - .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1))) .toList(); List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) .stream() - .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1))) .toList(); List result = new ArrayList<>(); @@ -179,7 +199,7 @@ public synchronized CompletableFuture> getStreamObjects(l long endOffset, int limit) { List streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>()) .stream() - .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1))) + .filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() < endOffset || endOffset == -1))) .limit(limit) .toList(); return CompletableFuture.completedFuture(streamObjectList); diff --git a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java index 098bd4043..83539351b 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/objects/ObjectManagerTest.java @@ -74,6 +74,14 @@ void testCommitAndCompact() { streamObject.setStartOffset(0); streamObject.setEndOffset(10); streamObjectList.add(streamObject); + + streamObject = new StreamObject(); + streamObject.setObjectId(4); + streamObject.setStreamId(2); + streamObject.setStartOffset(10); + streamObject.setEndOffset(20); + streamObjectList.add(streamObject); + request.setStreamObjects(streamObjectList); objectManager.commitStreamSetObject(request).join(); @@ -81,7 +89,7 @@ void testCommitAndCompact() { assertEquals(3, streamMetadataList.size()); assertEquals(3, streamMetadataList.get(0).getEndOffset()); assertEquals(5, streamMetadataList.get(1).getEndOffset()); - assertEquals(10, streamMetadataList.get(2).getEndOffset()); + assertEquals(20, streamMetadataList.get(2).getEndOffset()); List streamSetObjectMetadataList = objectManager.getServerObjects().join(); assertEquals(1, streamSetObjectMetadataList.size()); @@ -105,6 +113,9 @@ void testCommitAndCompact() { assertEquals(0, ranges.get(0).getStartOffset()); assertEquals(10, ranges.get(0).getEndOffset()); + streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 20, 100).join(); + assertEquals(2, streamObjectMetadataList.size()); + // Compact stream set object and commit stream object. request = new CommitStreamSetObjectRequest(); request.setObjectId(ObjectUtils.NOOP_OBJECT_ID); @@ -125,13 +136,6 @@ void testCommitAndCompact() { streamObject.setEndOffset(5); streamObjectList.add(streamObject); - streamObject = new StreamObject(); - streamObject.setObjectId(4); - streamObject.setStreamId(2); - streamObject.setStartOffset(10); - streamObject.setEndOffset(20); - streamObjectList.add(streamObject); - request.setStreamObjects(streamObjectList); objectManager.commitStreamSetObject(request).join(); @@ -142,8 +146,6 @@ void testCommitAndCompact() { assertEquals(1, streamObjectMetadataList.size()); streamObjectMetadataList = objectManager.getStreamObjects(1, 0, 10, 100).join(); assertEquals(1, streamObjectMetadataList.size()); - streamObjectMetadataList = objectManager.getStreamObjects(2, 0, 10, 100).join(); - assertEquals(2, streamObjectMetadataList.size()); // Compact stream object. objectManager.compactStreamObject(new CompactStreamObjectRequest(5, 2000, 2, 0, 20, List.of(1L, 4L))).join();