Get N items from Channel #562

Open
dstufft opened this issue Jul 20, 2018 · 7 comments

dstufft commented Jul 20, 2018

It would be super useful to be able to get multiple items from a trio.Queue object at once, ideally with a few options to tune if you want to get exactly N items (and block until N items happen), or if you're happy with less than that (and if you block, something with timeouts to be able to get what you've gotten so far).


njsmith commented Jul 20, 2018

Hey Donald! It's certainly doable, but... could you back up and give some context on what you're trying to do and how you concluded that this will help do it?


dstufft commented Jul 20, 2018

So I'm basically consuming events that are fed from one system, doing some processing on them, then sending them off to another system. The problem is that when I send them to this other system, I need to send them in batches (the number of incoming events is reasonably significant, and the other system doesn't want that many single API calls; it wants them batched up).

So I have one task that accepts incoming events, processes them, and throws them into a trio.Queue, and then other tasks pull items off of that queue until they hit the batch size and then trigger the other API call.

I can do something like:

async def sender(q):
    to_send = []
    while True:
        with trio.move_on_after(30):
            to_send.append(await q.get())
            if len(to_send) < 10000:
                continue

        # TODO: Do the actual sending logic
        to_send = []

That works, though it introduces a second buffer (at least this second buffer is still bounded), but the big problem with it is that if you have multiple instances of sender running at once, all acting on the same trio.Queue, you end up buffering up to N items per task, instead of just N in total.

Another option to get around that is, instead of having the sending tasks process the trio.Queue directly, to have a single collector pulling single items off of the queue and locally buffering them until it reaches the maximum batch size it wants (or enough time passes), and then sending the whole batch into a second trio.Queue. That also works, but I think it would be a lot nicer if this could be baked into trio itself, to enable something like:

async def sender(q):
    while True:
        with trio.move_on_after(30):
            to_send = await q.get_batch(10000)
            # TODO: Do the actual sending logic.

I think that provides a much cleaner API than the other options provide.
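
(For reference, the single-collector variant I described above would be roughly something like the following -- untested, and BATCH_SIZE / BATCH_TIMEOUT are made-up placeholders:)

async def collector(item_q, batch_q):
    while True:
        batch = []
        with trio.move_on_after(BATCH_TIMEOUT):
            while len(batch) < BATCH_SIZE:
                batch.append(await item_q.get())
        if batch:
            # batch_q is a second, bounded trio.Queue holding whole batches
            await batch_q.put(batch)

async def sender(batch_q):
    while True:
        to_send = await batch_q.get()
        # TODO: Do the actual sending logic.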


smurfix commented Jul 20, 2018

You can solve the problem easily if you move the batching logic to the put side of things.

import trio

class BatchedQueue:
    def __init__(self, batch_size, queue_size):
        self._size = batch_size
        self._queue = trio.Queue(queue_size)  # holds whole batches
        self._batch = []

    async def put(self, item):
        self._batch.append(item)
        if len(self._batch) < self._size:
            # Not a full batch yet: still execute a (cancel-shielded)
            # checkpoint so put() always yields to the scheduler.
            await trio.hazmat.cancel_shielded_checkpoint()
            return
        await self.flush()

    async def flush(self):
        if not self._batch:
            return
        batch, self._batch = self._batch, []
        await self._queue.put(batch)

    async def get(self):
        return await self._queue.get()

Adding a task that queues partial batches after a timeout is left as an exercise to the reader. ;-)
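
(If anyone actually wants to do the exercise, a rough version is below -- add it as a method on BatchedQueue and run it as its own task; the 30-second interval is just an example:)

    async def flush_periodically(self, interval=30):
        # Start this in its own task, e.g.
        # nursery.start_soon(q.flush_periodically); it pushes out whatever
        # partial batch has accumulated every `interval` seconds.
        while True:
            await trio.sleep(interval)
            await self.flush()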


njsmith commented Jul 21, 2018

@dstufft Okay, so... let's see if I have this right. You have a bunch of receivers who take in events from some external sources (a bunch of TCP connections). You want to batch up these events, and then once every X seconds or Y events – whichever comes first – you want to submit a whole batch to some other service.

The simplest way to do this would be:

async def sender(q):
    while True:
        batch = []
        # Note that we want to put a limit on the whole batch collection
        # process, so we put the loop inside move_on_after
        with move_on_after(X):
            while len(batch) < Y:
                batch.append(await q.get())

        await send_batch(batch)

You also asked about what happens if you have multiple senders running. One way would be to use a second queue, like you noticed. Another option that you might or might not find simpler would be to have a trio.Lock:

async def sender(q, sender_lock):
    while True:
        async with sender_lock:
            batch = []
            with move_on_after(X):
                while len(batch) < Y:
                    batch.append(await q.get())

            await send_batch(batch)

So only one sender can "take charge" of collecting + sending a batch at a time.

But let's back up a moment. I'm assuming the reason you want multiple senders is because you want to increase robustness. I think there are two things we would want to watch out for: something going wonky with the send_batch call (e.g., the outgoing request stalling out), or the receivers piling up data faster than the senders can forward it onwards (since from what I understand of your actual problem, you don't have any way to apply backpressure to the upstream event source).

Having multiple senders running could help if send_batch stalls out... but if it's being really flaky, and you have a fixed number of senders, then eventually they'll all get stalled, which defeats the purpose.

So maybe you want something like:

# A robust version of send_batch: we can kick this off, and then it will take care
# of itself, including making sure not to stall out forever
async def send_batch(batch):
    # let's do some retries in case the first try fails
    for _ in range(3):
        try:
            # And if an attempt stalls, treat that as a failure and go to the retry path
            with fail_after(10):
                ...
        except ...:
            ...
        else:
            # success!
            return
    # ...I guess all the tries failed. What should we do with the data?
    # log something and drop it on the floor, I guess?

async def sender(q):
    async with open_nursery() as nursery:
        while True:
            batch = []
            with move_on_after(X):
                while len(batch) < Y:
                    batch.append(await q.get())

            nursery.start_soon(send_batch, batch)

So here we might end up with several calls to send_batch running at the same time, but so long as your data only arrives at N items/second, we'll only fill up a batch (N/Y) times/second, and send_batch as written above can only run for a max of 3 × 10 = 30 seconds, so we'll never have more than 30N/Y simultaneous calls to send_batch running at a time – as long as we choose Y appropriately we shouldn't be in any danger of running away and eating all the memory or anything.
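
(To make that concrete with made-up numbers: if events arrive at N = 5,000/second and Y = 10,000, a batch fills at most 0.5 times per second, so with the 30-second cap that's at most about 15 send_batch calls in flight at once.)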

Given that the actual goal of this batching is to avoid overloading the service you're ultimately sending it to, I wonder if it would be better to drop the maximum batch size limit entirely, and just say "we'll send out a request every X seconds, with whatever we've gathered". That slightly increases how much buffering you're doing, but presumably you can hold, like, 1 or 5 or 10 seconds worth of data in memory, and losing 1 or 5 or 10 seconds of data to a server crash isn't appreciably worse than losing 0.5 seconds or whatever it would be with the len(batch) limit.

If you do it like that, then you can make this even simpler -- you don't even need a Queue, you can just use a plain list, like:

async def receiver(data_to_send):
    while True:
        data_to_send.append(await get_another_item())

async def sender(data_to_send):
    async with open_nursery() as nursery:
        while True:
            await sleep(X)
            batch = data_to_send.copy()
            data_to_send.clear()
            nursery.start_soon(send_batch, batch)

async def main():
    async with open_nursery() as nursery:
        data_to_send = []
        nursery.start_soon(receiver, data_to_send)
        nursery.start_soon(sender, data_to_send)

What do you think?

(And returning to the original topic of this issue: I don't think any of my suggestions here would actually benefit from a Queue.get_batch method?)


dstufft commented Jul 21, 2018

Well, the Y in my particular case is more or less chosen for me (the recommended batch size is no larger than 500, with a hard maximum of 10,000), so the final option doesn't work for me, but the one before it will.

The primary benefit I saw in the Queue.get_batch method is that it keeps all of the decisions about bounding the buffered items in one place: the Queue instance. Splitting it out as above (which is what I'm doing right now) means that I now have three places to tune in order to control how much buffering my application does:

  • The buffering that the Queue itself is doing.
  • The number of sender() instances (since each one of those has its own batch queue/list).
  • The size of the batch queue/list.

Compare this to something like trio.Queue(10000, batched=500), where that is all controlled in a single place (store no more than 10,000 items, with a batch size of 500). Though maybe it'd be better spelled trio.BatchedQueue() or something.

Ultimately though, it's the same sort of thing (I think) that led trio to have cancel scopes instead of individual timeouts on particular function calls: if you want to buffer at most N items, you have to decide how you're going to split that across all of the various queues/lists, just like with timeouts you'd have to decide how to divvy up your "timeout budget" across multiple calls.
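
(Purely hypothetical spelling, just to make the "one place to tune" point concrete -- this isn't an API trio has today:)

q = trio.Queue(10000, batched=500)  # hold at most 10,000 items, hand them out up to 500 at a time

async def sender(q):
    while True:
        with trio.move_on_after(30):
            batch = await q.get_batch()  # up to 500 items
            # TODO: Do the actual sending logic.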


njsmith commented Jul 21, 2018

@dstufft Ah, you're totally right of course, I was getting thrown by my own forays into batching-up-logs-to-submit-to-long-term-storage, where for the service I was targeting the main limit was requests/second, not items/request. So my intuition was wrong. But yeah, I was curious so I looked at the BigQuery docs (cough assuming hypothetically that this unnamed project might want to submit things to BigQuery), and I see the same thing you do: they don't care at all about how many insert requests/second you do, as long as each one isn't too big.

Do you want some kind of "if we haven't put together a full batch after X seconds, then go ahead and submit a partial batch" logic? I'm trying to figure out what exactly the Queue.get_batch semantics would be, and the partial batch case seems tricky.

oremanj changed the title from "Get N items from Queue" to "Get N items from Channel" on May 1, 2019

oremanj commented May 1, 2019

Updating the name to reflect the change to use Channels. Not sure whether this is still feasible to add; keeping it open for now as a marker to think about it.
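
(For reference, with the current channel API the closest easy approximation is to block for the first item and then drain whatever is already buffered -- a sketch, not a proposed API:)

async def receive_batch(receive_channel, max_items):
    # Wait for at least one item...
    batch = [await receive_channel.receive()]
    # ...then grab anything else that's immediately available, up to max_items.
    while len(batch) < max_items:
        try:
            batch.append(receive_channel.receive_nowait())
        except trio.WouldBlock:
            break
    return batch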
