Skip to content

Commit

Permalink
[WebPubSubClient] Parsing message returns IList (#39137)
Browse files Browse the repository at this point in the history
* Parsing message returns IList

* update api

* Use IReadonlyList
  • Loading branch information
zackliu authored Oct 9, 2023
1 parent ff337a1 commit d9948ec
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public WebPubSubJsonProtocol() { }
public override string Name { get { throw null; } }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } }
public override System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override System.Collections.Generic.IReadOnlyList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output) { }
}
public partial class WebPubSubJsonReliableProtocol : Azure.Messaging.WebPubSub.Clients.WebPubSubProtocol
Expand All @@ -162,7 +162,7 @@ public WebPubSubJsonReliableProtocol() { }
public override string Name { get { throw null; } }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get { throw null; } }
public override System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message) { throw null; }
public override Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override System.Collections.Generic.IReadOnlyList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input) { throw null; }
public override void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output) { }
}
public abstract partial class WebPubSubMessage
Expand All @@ -176,7 +176,7 @@ protected WebPubSubProtocol() { }
public abstract string Name { get; }
public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubProtocolMessageType WebSocketMessageType { get; }
public abstract System.ReadOnlyMemory<byte> GetMessageBytes(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message);
public abstract Azure.Messaging.WebPubSub.Clients.WebPubSubMessage ParseMessage(System.Buffers.ReadOnlySequence<byte> input);
public abstract System.Collections.Generic.IReadOnlyList<Azure.Messaging.WebPubSub.Clients.WebPubSubMessage> ParseMessage(System.Buffers.ReadOnlySequence<byte> input);
public abstract void WriteMessage(Azure.Messaging.WebPubSub.Clients.WebPubSubMessage message, System.Buffers.IBufferWriter<byte> output);
}
public enum WebPubSubProtocolMessageType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public override WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public override IReadOnlyList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
return _processor.ParseMessage(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
return new Memory<byte>(writer.ToArray());
}

public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public virtual IReadOnlyList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
try
{
Expand Down Expand Up @@ -264,7 +264,7 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
case DownstreamEventType.Ack:
AssertNotNull(ackId, AckIdPropertyName);
AssertNotNull(success, SuccessPropertyName);
return new AckMessage(ackId.Value, success.Value, errorDetail);
return new List<WebPubSubMessage> { new AckMessage(ackId.Value, success.Value, errorDetail) };

case DownstreamEventType.Message:
AssertNotNull(from, FromPropertyName);
Expand All @@ -273,10 +273,10 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
switch (fromType)
{
case FromType.Server:
return new ServerDataMessage(dataType, data, sequenceId);
return new List<WebPubSubMessage> { new ServerDataMessage(dataType, data, sequenceId) };
case FromType.Group:
AssertNotNull(group, GroupPropertyName);
return new GroupDataMessage(group, dataType, data, sequenceId, fromUserId);
return new List<WebPubSubMessage> { new GroupDataMessage(group, dataType, data, sequenceId, fromUserId) };
// Forward compatible
default:
return null;
Expand All @@ -288,9 +288,9 @@ public virtual WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
switch (systemEventType)
{
case SystemEventType.Connected:
return new ConnectedMessage(userId, connectionId, reconnectionToken);
return new List<WebPubSubMessage> { new ConnectedMessage(userId, connectionId, reconnectionToken) };
case SystemEventType.Disconnected:
return new DisconnectedMessage(message);
return new List<WebPubSubMessage> { new DisconnectedMessage(message) };
// Forward compatible
default:
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public override ReadOnlyMemory<byte> GetMessageBytes(WebPubSubMessage message)
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public override WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input)
public override IReadOnlyList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input)
{
return _processor.ParseMessage(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public abstract class WebPubSubProtocol
/// </summary>
/// <param name="input">The serialized representation of the message.</param>
/// <returns>A <see cref="WebPubSubMessage"/></returns>
public abstract WebPubSubMessage ParseMessage(ReadOnlySequence<byte> input);
public abstract IReadOnlyList<WebPubSubMessage> ParseMessage(ReadOnlySequence<byte> input);

/// <summary>
/// Writes the specified <see cref="WebPubSubMessage"/> to a writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,10 @@ private async Task ListenLoop(IWebSocketClient client, CancellationToken token)
{
try
{
var message = _protocol.ParseMessage(result.Payload);
await HandleMessageAsync(message, token).ConfigureAwait(false);
foreach (var message in _protocol.ParseMessage(result.Payload))
{
await HandleMessageAsync(message, token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
Expand Down Expand Up @@ -152,7 +153,7 @@ public void ParseMessageTest(byte[] payload, Action<WebPubSubMessage> messageAss
{
var protocol = new WebPubSubJsonProtocol();
var resolvedMessage = protocol.ParseMessage(new ReadOnlySequence<byte>(payload));
messageAssert(resolvedMessage);
messageAssert(resolvedMessage[0]);
}

[TestCaseSource(nameof(GetSerializingTestData))]
Expand Down

0 comments on commit d9948ec

Please sign in to comment.