Skip to content

Commit

Permalink
Support Redis 72 stream with RDB_TYPE_STREAM_LISTPACKS_3 (#70)
Browse files Browse the repository at this point in the history
* support stream listpack3 21

Signed-off-by: catcherwong <catcher_hwq@outlook.com>

* fix bugs and add test for 72 stream

Signed-off-by: catcherwong <catcher_hwq@outlook.com>

---------

Signed-off-by: catcherwong <catcher_hwq@outlook.com>
  • Loading branch information
catcherwong authored Oct 1, 2024
1 parent 11b3b5b commit d7ba809
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/RDBParser/BinaryReaderRDBParser.Base.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ private void SkipObject(BinaryReader br, int encType)
SkipModule(br);
}
else if (encType == Constant.DataType.STREAM_LISTPACKS
|| encType == Constant.DataType.STREAM_LISTPACKS_2)
|| encType == Constant.DataType.STREAM_LISTPACKS_2
|| encType == Constant.DataType.STREAM_LISTPACKS_3)
{
SkipStream(br, encType);
}
Expand Down
22 changes: 18 additions & 4 deletions src/RDBParser/BinaryReaderRDBParser.Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private void ReadStream(BinaryReader br, int encType)
var firstEntryId = "0-0";
var maxDeletedEntryId = "0-0";
ulong entriesAdded = items;
if (encType == Constant.DataType.STREAM_LISTPACKS_2)
if (encType >= Constant.DataType.STREAM_LISTPACKS_2)
{
// the first entry ID
var firstMs = br.ReadLength();
Expand All @@ -53,18 +53,21 @@ private void ReadStream(BinaryReader br, int encType)
entriesAdded = br.ReadLength();
}

// Consumer groups loading
var cgroups = br.ReadLength();
var cgroupsData = new List<StreamCGEntity>();
while (cgroups > 0)
{
var cgName = br.ReadStr();
// ms
var l = br.ReadLength();
// seq
var r = br.ReadLength();
var lastCgEntryId = $"{l}-{r}";

// group offset
ulong cgOffset = 0;
if (encType == Constant.DataType.STREAM_LISTPACKS_2)
if (encType >= Constant.DataType.STREAM_LISTPACKS_2)
{
cgOffset = br.ReadLength();
}
Expand Down Expand Up @@ -96,6 +99,12 @@ private void ReadStream(BinaryReader br, int encType)
var cname = br.ReadStr();
var seenTime = br.ReadUInt64();

var activeTime = seenTime;
if (encType >= Constant.DataType.STREAM_LISTPACKS_3)
{
activeTime = br.ReadUInt64();
}

// the PEL about entries owned by this specific consumer
pelSize = br.ReadLength();
var consumerPendingEntries = new List<StreamConsumerPendingEntity>();
Expand All @@ -115,6 +124,7 @@ private void ReadStream(BinaryReader br, int encType)
{
Name = cname,
SeenTime = seenTime,
ActiveTime = activeTime,
Pending = consumerPendingEntries
});

Expand Down Expand Up @@ -162,7 +172,7 @@ private void SkipStream(BinaryReader br, int encType)
_ = br.ReadLength();
_ = br.ReadLength();

if (encType == Constant.DataType.STREAM_LISTPACKS_2)
if (encType >= Constant.DataType.STREAM_LISTPACKS_2)
{
// the first entry ID
_ = br.ReadLength();
Expand All @@ -183,7 +193,7 @@ private void SkipStream(BinaryReader br, int encType)
_ = br.ReadLength();
_ = br.ReadLength();

if (encType == Constant.DataType.STREAM_LISTPACKS_2)
if (encType >= Constant.DataType.STREAM_LISTPACKS_2)
{
_ = br.ReadLength();
}
Expand All @@ -202,6 +212,10 @@ private void SkipStream(BinaryReader br, int encType)
{
br.SkipStr();
br.ReadBytes(8);
if (encType >= Constant.DataType.STREAM_LISTPACKS_3)
{
br.ReadBytes(8);
}
pending = br.ReadLength();
br.ReadBytes((int)(pending * 16));

Expand Down
6 changes: 5 additions & 1 deletion src/RDBParser/Callbacks/Models/StreamConsumerEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ public class StreamConsumerEntity
/// </summary>
public byte[] Name { get; set; }
/// <summary>
/// Last time this consumer was active.
/// Last time this consumer tried to perform an action (attempted reading/claiming).
/// </summary>
public ulong SeenTime { get; set; }
/// <summary>
/// Last time this consumer was active (successful reading/claiming).
/// </summary>
public ulong ActiveTime { get; set; }
/// <summary>
/// Consumer specific pending entries list
/// </summary>
public List<StreamConsumerPendingEntity> Pending { get; set; }
Expand Down
71 changes: 71 additions & 0 deletions tests/RDBParserTests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,5 +153,76 @@ public void TestStreamsWithRedis70AndGroup()
var streamId = RedisRdbObjectHelper.GetStreamId(pending.Id);
Assert.Equal("1526919030474-56", streamId);
}

[Fact]
public void TestStreamsWithRedis72AndGroup()
{
// xadd mystream 1526919030474-55 message 1
// xadd mystream 1526919030474-56 message 2
// xadd mystream 1526919030474-57 message 3
// XGROUP create mystream sg 0-0
// XREADGROUP group sg c1 count 1 streams mystream >
// XACK mystream sg "1526919030474-55"
// XREADGROUP group sg c1 count 1 streams mystream >
// bgsave
var path = TestHelper.GetRDBPath("redis_72_stream.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback);
parser.Parse(path);

var lengths = callback.GetLengths();
var streamEntities = callback.GetStreamEntities();

Assert.Equal(3, lengths[0][Encoding.UTF8.GetBytes("mystream")]);

var streamEntity = streamEntities[0][Encoding.UTF8.GetBytes("mystream")];
Assert.Single(streamEntity);

var se = streamEntity.Single();

Assert.Equal("1526919030474-55", se.FirstId);
Assert.Equal("1526919030474-57", se.LastId);

var sgList = se.CGroups;
Assert.Single(sgList);

var sg = sgList.Single();
Assert.Equal(Encoding.UTF8.GetBytes("sg"), sg.Name);
Assert.Equal("1526919030474-56", sg.LastEntryId);
Assert.Single(sg.Consumers);
Assert.Single(sg.Pending);
Assert.Equal((ulong)2, sg.EntriesRead);

var consumer = sg.Consumers.Single();
Assert.Equal(Encoding.UTF8.GetBytes("c1"), consumer.Name);
Assert.Single(consumer.Pending);

var pending = sg.Pending.Single();
var streamId = RedisRdbObjectHelper.GetStreamId(pending.Id);
Assert.Equal("1526919030474-56", streamId);
}

[Fact]
public void SkipTestStreamsWithRedis72AndGroup()
{
// xadd mystream 1526919030474-55 message 1
// xadd mystream 1526919030474-56 message 2
// xadd mystream 1526919030474-57 message 3
// XGROUP create mystream sg 0-0
// XREADGROUP group sg c1 count 1 streams mystream >
// XACK mystream sg "1526919030474-55"
// XREADGROUP group sg c1 count 1 streams mystream >
// bgsave
var path = TestHelper.GetRDBPath("redis_72_stream_for_skip.rdb");

var callback = new TestReaderCallback(_output);
var parser = new BinaryReaderRDBParser(callback, new ParserFilter { Types = new System.Collections.Generic.List<string> { "string"} });
parser.Parse(path);

var lengths = callback.GetLengths();

Assert.Empty(lengths[0]);
}
}
}
Binary file added tests/dumps/redis_72_stream.rdb
Binary file not shown.
Binary file added tests/dumps/redis_72_stream_for_skip.rdb
Binary file not shown.

0 comments on commit d7ba809

Please sign in to comment.