
[Buffer] implement with customizable storages #239

Merged (1 commit into apple:main on Feb 10, 2023)

Conversation

twittemb
Contributor

@twittemb twittemb commented Nov 26, 2022

Hi

This PR aims to explore an implementation without using actors as discussed here: https://forums.swift.org/t/swift-async-algorithms-buffer/61562

2 buffering algorithms are implemented:

  • one that uses a queue to stack the upstream elements with unbounded/bufferingNewest/bufferingOldest policies
  • one that uses a queue to stack the upstream elements until a limit is reached, suspending the upstream sequence in that case
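The drop policies of the first algorithm can be sketched as a pure function over an array standing in for the internal queue. This is only an illustration; the names (DropPolicy, enqueue) are invented for this example and are not the PR's API.

```swift
// Illustrative sketch of the bounded drop policies described above.
enum DropPolicy {
    case bufferingNewest  // keep the most recent elements, discard the oldest
    case bufferingOldest  // keep the oldest elements, discard new arrivals
}

func enqueue<Element>(
    _ element: Element,
    into buffer: inout [Element],
    limit: Int,
    policy: DropPolicy
) {
    guard buffer.count >= limit else {
        buffer.append(element)
        return
    }
    switch policy {
    case .bufferingNewest:
        buffer.removeFirst()  // make room by dropping the oldest element
        buffer.append(element)
    case .bufferingOldest:
        break                 // buffer is full; the new element is discarded
    }
}
```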

The task that iterates over the upstream sequence is created on the first call to next(), unlike the original implementation, where the task is created when the iterator is instantiated.

The BufferStorage protocol is public and allows implementing a custom algorithm. The concrete implementations are not public (keeping a reference to their functions).

I have added a throughput measurement. I can see a slight, though not decisive, improvement.

I guess there is room for improvement ...

@phausler @FranzBusch

@twittemb twittemb changed the title buffer: implement with custom storages [Buffer[ implement with customizable storages Jan 1, 2023
@twittemb twittemb changed the title [Buffer[ implement with customizable storages [Buffer] implement with customizable storages Jan 1, 2023
@twittemb twittemb force-pushed the feature/buffer branch 3 times, most recently from 2aa7305 to 247274b Compare January 5, 2023 13:59
@twittemb twittemb marked this pull request as ready for review January 5, 2023 14:03
Member

@phausler phausler left a comment


The changes here look quite good to me; this definitely addresses the outstanding concerns about safety and also seems to resolve the long-standing flaky test that occurred with buffering.

Member

@FranzBusch FranzBusch left a comment


Before diving into the actual implementation, I have a broader concern about the API we are going to expose here. With this new implementation we are, in my opinion, exposing some implementation details that are not necessary. More specifically, we are exposing BufferStorage, SuspendingBufferStorage & QueuedBufferStorage.

Just from an API design perspective, I think we should only expose this API:

extension AsyncSequence where Self: Sendable {
    public func buffer(strategy: AsyncBufferSequence.Strategy) -> AsyncBufferSequence<Self> {}
}

public struct AsyncBufferSequence<Base: AsyncSequence & Sendable>: AsyncSequence, Sendable {}

public struct AsyncBufferSequence {
    public struct Strategy {
        public static let unbounded: Strategy
        public static let limited: Strategy
        public static let limitedDroppingLatest: Strategy
        public static let limitedDroppingOldest: Strategy
    } 
}

The above API allows us to evolve the underlying type however we want without exposing any internals. Furthermore, it allows us to add whatever buffering strategy we come up with without breaking API.

@twittemb
Contributor Author

twittemb commented Jan 10, 2023

Before diving into the actual implementation, I have a broader concern about the API we are going to expose here. […] The above API allows us to evolve the underlying type however we want without exposing any internals.

I like the idea of a "unified" strategy. Not sure of the naming, though, to make the distinction between the suspending vs non-suspending storages.

How would you unify the 2 algorithms under the "AsyncBufferSequence" banner? One concrete type per implementation (AsyncSuspendingBufferSequence and AsyncNonSuspendingBufferSequence), and then encapsulate them in a wrapper "AsyncBufferSequence" that just forwards the calls to the implementation?

Also, how would you handle the different Strategies with the struct? Because they have different properties ...

@FranzBusch
Member

I agree that the naming can be bike-shedded. Though I don't think we have to call out in the strategy's name whether it suspends or not. We can do a lot with doc comments here and explain exactly how the various strategies pull from the upstream.

How would you unify the 2 algorithms under the "AsyncBufferSequence" banner? One concrete type per implementation (AsyncSuspendingBufferSequence and AsyncNonSuspendingBufferSequence), and then encapsulate them in a wrapper "AsyncBufferSequence" that just forwards the calls to the implementation?

I think we only need two concrete state machine & storage implementations and then have a private enum in the AsyncBufferSequence that looks like this:

enum Storage {
    case suspending
    case nonSuspending
}

However, I am questioning whether we really need two implementations at all. IMO we should be able to handle this all with a single state machine. We just need to have the same "latching" mechanism in the unstructured Task, where the state machine dictates when to request the next value. In the case where we are non-suspending we just always request the next.

Also, how would you handle the different Strategies with the struct? Because they have different properties ...

I don't know if I understand you correctly, but the spelled-out version of the Strategy type would look like this:

public struct AsyncBufferSequence {
    public struct Strategy {
        internal enum _Strategy {
            case unbounded
            case limited
            ....
        }

        internal var _strategy: _Strategy

        public static let unbounded = Self(_strategy: .unbounded)
        ....
    }
}

@twittemb
Contributor Author

twittemb commented Jan 10, 2023

However, I am questioning whether we really need two implementations at all. IMO we should be able to handle this all with a single state machine. We just need to have the same "latching" mechanism in the unstructured Task, where the state machine dictates when to request the next value. In the case where we are non-suspending we just always request the next.

I'm not even sure we need the "latching" mechanism. In fact, we can see the non-suspending algorithm as a special case of the suspending one.

In the suspending version, the state machine checks whether a suspension is needed before actually suspending (like in the last implementation of the AsyncChannel in my PR). When the strategy is set to "non-suspending", we can buffer the element and never suspend.

[update] @FranzBusch I'll try to implement a unified version of this algorithm by tomorrow.

@tcldr

tcldr commented Jan 10, 2023

In fact we can see the non-suspending algorithm as a special case of the suspending one.

I think that this will emerge as a recurring theme with SAA. 👍

@twittemb
Contributor Author

The above API allows us to evolve the underlying type however we want without exposing any internals. […]

Hi @FranzBusch, I've pushed a unified version. I was not able to implement the Strategy with a struct, as static stored properties are not allowed in a generic type (assuming we keep this type nested in AsyncBufferSequence). Moreover, could you tell me the benefit of having a struct + static properties when it is backed by an enum?

@FranzBusch
Member

Hi @FranzBusch I've pushed a unified version. […]

Ah right, we are in a generic type here. Then we have to move it to a separate top-level type, e.g. AsyncBufferSequenceStrategy. The reason why we need to do this is that adding new enum cases is an API-breaking change, since enums can be exhaustively switched over. That means if you add a new enum case, adopters will get compiler errors when they switch over it. The recommended pattern to avoid this right now is to use a struct + static lets.
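The struct + static lets pattern described here can be sketched as follows. The type and case names are illustrative only and are not necessarily the final API merged in the PR; the point is that the enum stays internal, so new cases (exposed as new static lets) can be added without breaking adopters.

```swift
// Sketch of the struct-backed-by-an-enum pattern for non-exhaustive "enums".
public struct AsyncBufferSequencePolicy: Sendable {
    // Internal, so adopters can never exhaustively switch over it.
    enum _Policy {
        case unbounded
        case bounded(Int)
    }

    let _policy: _Policy

    // Adding a new static let later is an additive, non-breaking change.
    public static let unbounded = Self(_policy: .unbounded)
    public static func bounded(_ limit: Int) -> Self { Self(_policy: .bounded(limit)) }
}
```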

@twittemb
Contributor Author

Ah right we are in a generic type here. Then we gotta move it to a separate top level type, e.g. AsyncBufferSequenceStrategy. […]

I've put the policy at the top level. I kept the Policy terminology because it is already used for AsyncStream, but I'll let you and @phausler give me the appropriate direction.

@twittemb twittemb force-pushed the feature/buffer branch 2 times, most recently from 2508240 to a7b97f6 Compare January 12, 2023 17:45
@twittemb twittemb requested a review from FranzBusch January 12, 2023 17:46
Member

@FranzBusch FranzBusch left a comment


This looks very good already. I think the public API is almost perfect. I left some more comments inline, but I have one larger one. You merged the implementation of the bounded and unbounded buffer into a single state machine now. This results in a bit of friction, since the unbounded case can constantly request new elements from the upstream, whereas the bounded case needs to suspend. With your current implementation the bounded case will request one too many elements from the upstream, since we suspend after calling next() on the upstream. We should correct that and only request as many elements as actually required.

I know we are going in circles a bit here, but I really think we need the two separate Storage and StateMachines that you had previously. I would call them AsyncBoundedBufferStorage/StateMachine and AsyncUnboundedBufferStorage/StateMachine. To keep this code performant I would recommend changing the iterator like this:

  public struct Iterator: AsyncIteratorProtocol {
    private enum Backing {
      case bounded(AsyncBoundedBufferStorage)
      case unbounded(AsyncUnboundedBufferStorage)
    }

    private let backing: Backing

    init(_ policy: AsyncBufferSequencePolicy.Policy) {
      switch policy { ... }
    }

    public mutating func next() async rethrows -> Element? {
      switch self.backing {
      case .bounded(let storage):
        let result = await storage.next()
        return result.rethrow_get()

      case .unbounded(let storage):
        let result = await storage.next()
        return result.rethrow_get()
      }
    }
  }

Importantly here, the Task creation goes into the individual storage implementation since they need to do different things.

Let me know what you think! Happy to discuss this


private enum State {
case buffering(buffer: Deque<Result<Element?, Error>>, suspendedProducer: SuspendedProducer?, suspendedConsumer: SuspendedConsumer?)
case modifying
Member


We can get rid of that state now. The compiler is good enough to optimise this pattern nowadays.

Contributor Author


On this one I'd like your input: I've noticed significantly lower performance without this state. What do you think?

@twittemb
Contributor Author

twittemb commented Jan 16, 2023

This looks very good already. I think the public API is almost perfect. […]

Thanks for the review. It means (for the AsyncBoundedBufferStorage) that we indeed need to "pre-suspend" before requesting the next element (the latching mechanism we already used), right?

I guess we could still unify both implementations, but it might slightly affect the performance of the unbounded algorithm, given the following sequence of events:

  • do we need to suspend before next? (always false for unbounded, true or false for bounded, depending on the buffer)
  • possibly suspend until there is room in the buffer (never happens for unbounded)
  • request next from upstream
  • send the element to the state machine

I'm not trying to unify things at all costs :-), just exploring whether this is a satisfactory solution that could save us a bit of code. What do you think?

@twittemb
Copy link
Contributor Author

This looks very good already. I think the public API is almost perfect. […]

Thanks for the review. […] What do you think?

After giving it a second thought, it's probably not worth trying to keep a single storage for both algorithms… it would make the code more complex to understand because of the conditional behaviour depending on whether it's bounded or unbounded. I'll get back to you soon with an implementation.

@twittemb
Contributor Author

This looks very good already. I think the public API is almost perfect. […]

BTW @FranzBusch I'm still working on it :-) I haven't abandoned it ... I just need to find some time.

@twittemb
Contributor Author

This looks very good already. I think the public API is almost perfect. […]

BTW @FranzBusch I'm still working on it :-) […]

@FranzBusch I've addressed all the comments so far, except the one on the .modifying state, as I noticed an impact on performance.

That being said, I have an issue I can't resolve. From time to time the cancellation with the .unbounded policy is not applied.

You can test the throughput for this policy, and from time to time it will fail because of an unawaited "finished" expectation. I've noticed the same behaviour with the test test_given_a_buffered_with_unbounded_sequence_when_cancelling_consumer_then_the_iteration_finishes_and_the_base_is_cancelled (it will happen if you run it repeatedly). It's like the system is so busy producing/consuming elements and copying the buffer that the cancellation is never caught. It does not happen with the other policies like bufferingLatest and bufferingOldest.

Honestly I can't figure out why ... I need your help on this.

Thanks.

@twittemb twittemb requested a review from FranzBusch January 27, 2023 13:33
@twittemb
Contributor Author

twittemb commented Jan 28, 2023

This looks very good already. I think the public API is almost perfect. […]

@FranzBusch I've addressed all the comments so far […]

@FranzBusch Forget about it ... I found what was wrong: my cancellation management was not 100% correct. It is fixed :-)

@twittemb
Contributor Author

@phausler I guess you'd like to take a look at the new implementation as well, since you approved a previous version.

Member

@FranzBusch FranzBusch left a comment


Looks really great! Thanks for all the work again. I left two last comments, and afterwards we should be good to merge this.

/// - Parameter policy: A policy that drives the behaviour of the ``AsyncBufferSequence``
/// - Returns: An asynchronous sequence that buffers elements up to a given limit.
public func buffer(
policy: AsyncBufferSequence<Self>.Policy
Member


I am wondering if this is the right way to do this. Didn't we last discuss moving the Policy into a standalone top-level type called AsyncBufferSequencePolicy? In the case where you just get a sequence and want to call .buffer(policy: .unlimited) this works great, but if you want to hold the policy somewhere, it is getting hard to spell out. I am inclined to say we should move it to its own top-level type.

Member


I wonder if the policy system is perhaps something we need for other things too. I agree that a buffering type being coupled with its policy seems appropriate (specifically the way it is currently written). But if the policy system is general to any buffering mechanism, then it should be at the top level.

Contributor Author

@twittemb twittemb Feb 2, 2023


I've moved the policy to a top level declaration, please advise.

/// A policy for buffering elements until the limit is reached.
/// Then consumption of the upstream `AsyncSequence` will be paused until elements are consumed from the buffer.
public static func bounded(_ limit: Int) -> Self {
precondition(limit > 0, "The limit should be positive for the buffer operator to be efficient.")
Member


What happens if limit is 0? There are scenarios where that could be useful.

Contributor Author

@twittemb twittemb Feb 2, 2023


Interesting question. Before taking any action I think we have to think about that.

The current behaviour would be:

  • bounded policy: after the first call to next, the consumer will receive the first element and then the producer will suspend until the consumer requests the next element. In the end it's as if the buffer iterator has no effect here.

  • bufferingNewest: after the first call to next, the consumer will receive the first element and then the producer will continue to emit elements. The buffer will keep only the latest emitted element (the previous one is discarded). When a new call to next is made, the penultimate element is consumed.

  • bufferingOldest: after the first call to next, the consumer will receive the first element and then the producer will continue to emit elements. The buffer will never stack elements. We lose all the elements until the consumer requests the next element.

I guess this is not ideal for the last two cases. Perhaps we should also suspend the producer in those cases?

What do you think @FranzBusch @phausler ?

Contributor Author

@twittemb twittemb Feb 3, 2023


@phausler @FranzBusch

I've tried something. Considering that a limit <= 0 should be equivalent to having no buffer, I've introduced a third storageType in the enum. This new case is transparent and just forwards the call to next to the upstream async sequence, as if there were no buffer at all.

I've added some unit tests to make sure the upstream sequence timeline is not altered. I've also added comments in the doc and removed the preconditions.

Let me know what you think.
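The storage selection described above might be sketched like this. The type and function names are invented for this illustration and are not the PR's exact code; it only shows the idea of a third, pass-through case for non-positive limits.

```swift
// Illustrative sketch: choosing a storage for a given limit, including the
// "transparent" (no-buffer) case discussed above.
enum StorageType {
    case transparent          // limit <= 0: forward next() straight to the upstream
    case bounded(limit: Int)  // suspend the producer once the limit is reached
}

func storageType(forLimit limit: Int) -> StorageType {
    // A non-positive limit is treated as equivalent to having no buffer at all,
    // so the upstream sequence's timeline is not altered.
    if limit <= 0 {
        return .transparent
    }
    return .bounded(limit: limit)
}
```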

Member


I think handling 0 with no buffer is fine; however, I still think we should preconditionFailure on <0.

Contributor Author


done.

@twittemb
Contributor Author

twittemb commented Feb 1, 2023

@FranzBusch @phausler thanks for the review, I’ll address the comments tomorrow.

@twittemb twittemb requested a review from FranzBusch February 2, 2023 16:36
@twittemb twittemb force-pushed the feature/buffer branch 2 times, most recently from d64bea8 to 4d346a4 Compare February 6, 2023 12:22
@twittemb twittemb requested review from phausler and FranzBusch and removed request for FranzBusch and phausler February 6, 2023 12:23
Member

@FranzBusch FranzBusch left a comment


Thanks for all the work. Looks great! One last nit, but approved already.

@twittemb
Contributor Author

twittemb commented Feb 8, 2023

Thanks for all the work. Looks great! One last nit but approved already

@phausler is it OK with you?

@phausler
Member

Yea, this looks great! Let's merge it.

@phausler phausler merged commit 9cfed92 into apple:main Feb 10, 2023
@tcldr

tcldr commented Feb 10, 2023

Nice one, @twittemb . Looks great.


5 participants