Skip to content

Commit

Permalink
[ServiceBus] Use BinaryData for G/SetSessionState. (Azure#15656)
Browse files Browse the repository at this point in the history
* [DOES NOT BUILD] Changed type from byte[] to BinaryData.

* Regen API. Update samples. Fix broken tests and samples.

* Fixed passing encoded strings instead of the strings directly to BinaryData.

* Pushed BinaryData into the internal transport layer.

* Fixed broken test.
  • Loading branch information
MiYanni authored and annelo-msft committed Feb 17, 2021
1 parent 379cff4 commit 3548cf9
Show file tree
Hide file tree
Showing 17 changed files with 70 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ public ProcessSessionEventArgs(Azure.Messaging.ServiceBus.ServiceBusSessionRecei
public System.Threading.CancellationToken CancellationToken { get { throw null; } }
public string SessionId { get { throw null; } }
public System.DateTimeOffset SessionLockedUntil { get { throw null; } }
public virtual System.Threading.Tasks.Task<byte[]> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(byte[] sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(Azure.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class ProcessSessionMessageEventArgs : System.EventArgs
{
Expand All @@ -55,8 +55,8 @@ public ProcessSessionMessageEventArgs(Azure.Messaging.ServiceBus.ServiceBusRecei
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeadLetterMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, string deadLetterReason, string deadLetterErrorDescription = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task DeferMessageAsync(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, System.Collections.Generic.IDictionary<string, object> propertiesToModify = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<byte[]> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(byte[] sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(Azure.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public enum ReceiveMode
{
Expand Down Expand Up @@ -416,9 +416,9 @@ public partial class ServiceBusSessionReceiver : Azure.Messaging.ServiceBus.Serv
protected ServiceBusSessionReceiver() { }
public string SessionId { get { throw null; } }
public System.DateTimeOffset SessionLockedUntil { get { throw null; } }
public virtual System.Threading.Tasks.Task<byte[]> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.BinaryData> GetSessionStateAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task RenewSessionLockAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(byte[] sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task SetSessionStateAsync(Azure.BinaryData sessionState, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
}
public partial class ServiceBusSessionReceiverOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ Console.WriteLine(receivedMessage.SessionId);

// we can also set arbitrary session state using this receiver
// the state is specific to the session, and not any particular message
await receiver.SetSessionStateAsync(Encoding.UTF8.GetBytes("some state"));
await receiver.SetSessionStateAsync(new BinaryData("some state"));

// the state can be retrieved for the session as well
byte[] state = await receiver.GetSessionStateAsync();
BinaryData state = await receiver.GetSessionStateAsync();
```

### Receive from a specific session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async Task MessageHandler(ProcessSessionMessageEventArgs args)

// we can also set arbitrary session state using this receiver
// the state is specific to the session, and not any particular message
await args.SetSessionStateAsync(Encoding.Default.GetBytes("some state"));
await args.SetSessionStateAsync(new BinaryData("some state"));

// Once we've received the last message, complete the
// task completion source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var state = Encoding.UTF8.GetBytes("some state");
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiver.CompleteMessageAsync(receivedMessage);
await receiver.SetSessionStateAsync(state);
await receiver.SetSessionStateAsync(new BinaryData(state));
ts.Complete();
}
```
Expand Down
57 changes: 25 additions & 32 deletions sdk/servicebus/Azure.Messaging.ServiceBus/src/Amqp/AmqpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,21 @@ private async Task<ReceivingAmqpLink> OpenReceiverLinkAsync(
new DateTime(lockedUntilUtcTicks, DateTimeKind.Utc)
: DateTime.MinValue;

var source = (Source)link.Settings.Source;
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.SessionFilterMissing);
}
var source = (Source)link.Settings.Source;
if (!source.FilterSet.TryGetValue<string>(AmqpClientConstants.SessionFilterName, out var tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.SessionFilterMissing);
}

if (string.IsNullOrWhiteSpace(tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
}
// This will only have changed if sessionId was left blank when constructing the session
// receiver.
SessionId = tempSessionId;
if (string.IsNullOrWhiteSpace(tempSessionId))
{
link.Session.SafeClose();
throw new ServiceBusException(true, Resources.AmqpFieldSessionId);
}
// This will only have changed if sessionId was left blank when constructing the session
// receiver.
SessionId = tempSessionId;
}
ServiceBusEventSource.Log.CreateReceiveLinkComplete(_identifier, SessionId);
link.Closed += OnReceiverLinkClosed;
Expand Down Expand Up @@ -1044,10 +1044,10 @@ internal async Task<DateTimeOffset> RenewSessionLockInternal(TimeSpan timeout)
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as byte array.</returns>
public override async Task<byte[]> GetStateAsync(CancellationToken cancellationToken = default)
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public override async Task<BinaryData> GetStateAsync(CancellationToken cancellationToken = default)
{
byte[] sessionState = null;
BinaryData sessionState = default;
await _retryPolicy.RunOperation(
async (timeout) =>
{
Expand All @@ -1058,7 +1058,7 @@ await _retryPolicy.RunOperation(
return sessionState;
}

internal async Task<byte[]> GetStateInternal(TimeSpan timeout)
internal async Task<BinaryData> GetStateInternal(TimeSpan timeout)
{
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.GetSessionStateOperation, timeout, null);

Expand All @@ -1071,12 +1071,12 @@ internal async Task<byte[]> GetStateInternal(TimeSpan timeout)

var amqpResponseMessage = await ExecuteRequest(timeout, amqpRequestMessage).ConfigureAwait(false);

byte[] sessionState = null;
BinaryData sessionState = default;
if (amqpResponseMessage.StatusCode == AmqpResponseStatusCode.OK)
{
if (amqpResponseMessage.Map[ManagementConstants.Properties.SessionState] != null)
{
sessionState = amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.SessionState).Array;
sessionState = new BinaryData(amqpResponseMessage.GetValue<ArraySegment<byte>>(ManagementConstants.Properties.SessionState).Array ?? Array.Empty<byte>());
}
}
else
Expand All @@ -1091,14 +1091,14 @@ internal async Task<byte[]> GetStateInternal(TimeSpan timeout)
/// Set a custom state on the session which can be later retrieved using <see cref="GetStateAsync"/>
/// </summary>
///
/// <param name="sessionState">A byte array of session state</param>
/// <param name="sessionState">A <see cref="BinaryData"/> of session state</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public override async Task SetStateAsync(
byte[] sessionState,
BinaryData sessionState,
CancellationToken cancellationToken)
{
await _retryPolicy.RunOperation(
Expand All @@ -1113,7 +1113,7 @@ await SetStateInternal(
}

internal async Task SetStateInternal(
byte[] sessionState,
BinaryData sessionState,
TimeSpan timeout)
{
var amqpRequestMessage = AmqpRequestMessage.CreateRequest(ManagementConstants.Operations.SetSessionStateOperation, timeout, null);
Expand All @@ -1125,15 +1125,8 @@ internal async Task SetStateInternal(

amqpRequestMessage.Map[ManagementConstants.Properties.SessionId] = SessionId;

if (sessionState != null)
{
var value = new ArraySegment<byte>(sessionState);
amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = value;
}
else
{
amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = null;
}
var value = new ArraySegment<byte>(sessionState.ToBytes().ToArray());
amqpRequestMessage.Map[ManagementConstants.Properties.SessionState] = value;

var amqpResponseMessage = await ExecuteRequest(timeout, amqpRequestMessage).ConfigureAwait(false);
if (amqpResponseMessage.StatusCode != AmqpResponseStatusCode.OK)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,21 +209,21 @@ public abstract Task<DateTimeOffset> RenewMessageLockAsync(
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as byte array.</returns>
public abstract Task<byte[]> GetStateAsync(CancellationToken cancellationToken);
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public abstract Task<BinaryData> GetStateAsync(CancellationToken cancellationToken);

/// <summary>
/// Set a custom state on the session which can be later retrieved using <see cref="GetStateAsync"/>
/// </summary>
///
/// <param name="sessionState">A byte array of session state</param>
/// <param name="sessionState">A <see cref="BinaryData"/> of session state</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public abstract Task SetStateAsync(
byte[] sessionState,
BinaryData sessionState,
CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,23 @@ public ProcessSessionEventArgs(
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as byte array.</returns>
public virtual async Task<byte[]> GetSessionStateAsync(
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default) =>
await _sessionReceiver.GetSessionStateAsync(cancellationToken).ConfigureAwait(false);

/// <summary>
/// Set a custom state on the session which can be later retrieved using <see cref="GetSessionStateAsync"/>
/// </summary>
///
/// <param name="sessionState">A byte array of session state</param>
/// <param name="sessionState">A <see cref="BinaryData"/> of session state</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task SetSessionStateAsync(
byte[] sessionState,
BinaryData sessionState,
CancellationToken cancellationToken = default) =>
await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public ProcessSessionMessageEventArgs(
}

/// <inheritdoc cref="ServiceBusSessionReceiver.GetSessionStateAsync(CancellationToken)"/>
public virtual async Task<byte[]> GetSessionStateAsync(
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default) =>
await _sessionReceiver.GetSessionStateAsync(cancellationToken).ConfigureAwait(false);

/// <inheritdoc cref="ServiceBusSessionReceiver.SetSessionStateAsync(byte[], CancellationToken)"/>
/// <inheritdoc cref="ServiceBusSessionReceiver.SetSessionStateAsync(BinaryData, CancellationToken)"/>
public virtual async Task SetSessionStateAsync(
byte[] sessionState,
BinaryData sessionState,
CancellationToken cancellationToken = default) =>
await _sessionReceiver.SetSessionStateAsync(sessionState, cancellationToken).ConfigureAwait(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ protected ServiceBusSessionReceiver() : base() { }
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The session state as byte array.</returns>
public virtual async Task<byte[]> GetSessionStateAsync(CancellationToken cancellationToken = default)
/// <returns>The session state as <see cref="BinaryData"/>.</returns>
public virtual async Task<BinaryData> GetSessionStateAsync(CancellationToken cancellationToken = default)
{
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver));
cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
Expand All @@ -106,7 +106,7 @@ public virtual async Task<byte[]> GetSessionStateAsync(CancellationToken cancell
sessionId: SessionId);
scope.Start();

byte[] sessionState = null;
BinaryData sessionState;

try
{
Expand All @@ -127,14 +127,14 @@ public virtual async Task<byte[]> GetSessionStateAsync(CancellationToken cancell
/// Set a custom state on the session which can be later retrieved using <see cref="GetSessionStateAsync"/>
/// </summary>
///
/// <param name="sessionState">A byte array of session state</param>
/// <param name="sessionState">A <see cref="BinaryData"/> of session state</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task SetSessionStateAsync(
byte[] sessionState,
BinaryData sessionState,
CancellationToken cancellationToken = default)
{
Argument.AssertNotDisposed(IsClosed, nameof(ServiceBusSessionReceiver));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Azure.Core.Tests;
using Azure.Messaging.ServiceBus.Diagnostics;
Expand Down Expand Up @@ -180,7 +179,7 @@ public async Task SenderReceiverActivities(bool useSessions)
Assert.AreEqual(DiagnosticProperty.RenewSessionLockActivityName + ".Stop", renewStop.Key);

// set state
var state = Encoding.UTF8.GetBytes("state");
var state = new BinaryData("state");
await sessionReceiver.SetSessionStateAsync(state);
(string Key, object Value, DiagnosticListener) setStateStart = _listener.Events.Dequeue();
Assert.AreEqual(DiagnosticProperty.SetSessionStateActivityName + ".Start", setStateStart.Key);
Expand All @@ -197,7 +196,7 @@ public async Task SenderReceiverActivities(bool useSessions)

// get state
var getState = await sessionReceiver.GetSessionStateAsync();
Assert.AreEqual(state, getState);
Assert.AreEqual(state.ToBytes().ToArray(), getState.ToBytes().ToArray());
(string Key, object Value, DiagnosticListener) getStateStart = _listener.Events.Dequeue();
Assert.AreEqual(DiagnosticProperty.GetSessionStateActivityName + ".Start", getStateStart.Key);
Activity getStateActivity = (Activity)getStateStart.Value;
Expand Down
Loading

0 comments on commit 3548cf9

Please sign in to comment.