I have written code that allows me to start fetching the next chunk of data from an API while the previous chunk of data is being processed.

I'd like this to be always fetching up to 5 chunks concurrently at any given moment, but the returned data should always be processed in the correct order even if a request that is last in the queue completes before any other.

How can my code be changed to make this happen?

import asyncio
import urllib.parse
from typing import List, Optional


class MyClient:
    async def fetch_entities(
        self,
        entity_ids: List[int],
        objects: Optional[List[str]],
        select_inbound: Optional[List[str]] = None,
        select_outbound: Optional[List[str]] = None,
        queue_size: int = 5,
        chunk_size: int = 500,
    ):
        """
        Fetch entities in chunks

        While one chunk of data is being processed the next one can
        already be fetched. In other words: Data processing does not
        block data fetching.
        """
        objects = ",".join(objects)
        if select_inbound:
            select_inbound = ",".join(select_inbound)

        if select_outbound:
            select_outbound = ",".join(select_outbound)

        queue = asyncio.Queue(maxsize=queue_size)

        # TODO: I want to be able to fill the queue with requests that are already executing

        async def queued_chunks():
            for ids in chunks(entity_ids, chunk_size):
                res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                    "entityIds": ids,
                    "objects": objects,
                    "inbound": {
                        "linkTypeIds": select_outbound,
                        "objects": objects,
                    } if select_inbound else {},
                    "outbound": {
                        "linkTypeIds": select_inbound,
                        "objects": objects,
                    } if select_outbound else {},
                })
                await queue.put(res)
            await queue.put(None)

        asyncio.create_task(queued_chunks())

        while True:
            res = await queue.get()
            if res is None:
                break
            res.raise_for_status()
            queue.task_done()
            for entity in res.json():
                yield entity
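
(chunks() is not shown above; presumably it is a small helper that slices entity_ids into fixed-size batches, along these lines:)

from typing import Iterator, List

def chunks(items: List[int], size: int) -> Iterator[List[int]]:
    """Yield successive slices of at most `size` items each."""
    for i in range(0, len(items), size):
        yield items[i:i + size]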

1 Answer

I’d use two queues here: one with the chunks to process, and one for the chunks that are complete. You can have any number of worker tasks to process chunks, and you can put a size limit on the first queue to limit how many chunks you prefetch. Use just a single loop to receive the processed chunks, to ensure they are kept ordered (your code already does this).

The trick is to put futures into both queues, one for every chunk to be processed. The worker tasks that do the processing take a (chunk, future) pair and resolve the associated future by setting the POST response as its result. The loop that handles the processed chunks awaits on each future in turn, so it only proceeds to the next chunk once the current chunk has been fully fetched. For this to work you put both the chunk and its corresponding future into the first queue for the workers to process, and put the same future into the second queue; this enforces that the chunk results are consumed in order.

So, in summary:

  • Have two queues:
    1. chunks holds (chunk, future) objects.
    2. completed holds futures: the same futures that were paired with chunks in the other queue.
  • Create “worker” tasks that consume from the chunks queue. If you create 5, then 5 chunks will be processed in parallel. Every time a worker has completed processing a chunk, it sets the result on the corresponding future.
  • Use a “processed chunks” loop; it takes the next future from the completed queue and awaits on it. Only when the specific chunk associated with that future has been completed will it produce the result (set by a worker task).
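
If the future mechanics are unfamiliar: awaiting a future simply suspends until some other task resolves it with set_result(). A minimal standalone illustration (independent of the fetching code):

import asyncio

async def demo():
    future = asyncio.Future()

    async def resolver():
        await asyncio.sleep(0.1)             # simulate a slow POST request
        future.set_result("chunk response")  # resolve the future

    asyncio.create_task(resolver())
    print(await future)                      # suspends until set_result() is called

asyncio.run(demo())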

As a rough sketch, it’d look something like this:

chunk_queue = asyncio.Queue()
completed_queue = asyncio.Queue()
WORKER_COUNT = queue_size

async def queued_chunks():
    # Producer: pair every chunk of ids with a future; the workers receive the
    # pair, and the consumer loop receives the same future to preserve ordering.
    for ids in chunks(entity_ids, chunk_size):
        future = asyncio.Future()
        await chunk_queue.put((ids, future))
        await completed_queue.put(future)
    # Sentinel telling the consumer loop that no more chunks are coming.
    await completed_queue.put(None)

async def worker():
    while True:
        ids, future = await chunk_queue.get()
        try:
            res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                "entityIds": ids,
                "objects": objects,
                "inbound": {
                    "linkTypeIds": select_outbound,
                    "objects": objects,
                } if select_inbound else {},
                "outbound": {
                    "linkTypeIds": select_inbound,
                    "objects": objects,
                } if select_outbound else {},
            })
            res.raise_for_status()
            future.set_result(res)
        except Exception as e:
            # The consumer loop will re-raise this when it awaits the future.
            future.set_exception(e)
            return

workers = [asyncio.create_task(worker()) for _ in range(WORKER_COUNT)]
chunk_producer = asyncio.create_task(queued_chunks())

try:
    while True:
        future = await completed_queue.get()
        if future is None:
            # all chunks have been processed!
            break
        res = await future
        for entity in res.json():
            yield entity

finally:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)

If you must limit how many chunks are queued (and not just how many are being fetched concurrently), set maxsize on chunk_queue (to a value greater than WORKER_COUNT). Use this to limit memory requirements, for example.
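
Concretely, something like this (10 is just an illustrative value):

# Up to WORKER_COUNT chunks being fetched, plus up to 10 prefetched
# (chunk, future) pairs waiting; queued_chunks() blocks on put() once full.
chunk_queue = asyncio.Queue(maxsize=10)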

However, if you were to set the maxsize to a value equal to WORKER_COUNT, you might as well get rid of the worker tasks altogether and instead wrap the body of the worker loop in coroutines run as tasks, putting those tasks into the completed results queue. The asyncio Task class is a subclass of Future that automatically sets the future's result when the coroutine it wraps completes. If you are never going to put more chunks into chunk_queue than you have worker tasks, you can cut out the middleman and drop chunk_queue altogether. The tasks then go into the completed queue instead of plain futures:

completed_queue = asyncio.Queue(maxsize=queue_size)

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        task = asyncio.create_task(fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)

async def fetch_task(ids):
    res = await self.client.post(urllib.parse.quote("entities:fetchdata"),
        json={
            "entityIds": ids,
            "objects": objects,
            "inbound": {
                "linkTypeIds": select_outbound,
                "objects": objects,
            } if select_inbound else {},
            "outbound": {
                "linkTypeIds": select_inbound,
                "objects": objects,
            } if select_outbound else {},
        }
    )
    res.raise_for_status()
    return res

chunk_producer = asyncio.create_task(queued_chunks())

while True:
    task = await completed_queue.get()
    if task is None:
        # all chunks have been processed!
        break
    res = await task
    for entity in res.json():
        yield entity

This version is really close to what you had already; the only difference is that the await on the client POST coroutine and the response status check are moved into a separate coroutine that is run as a task. You could also make the self.client.post() coroutine itself the task (so not await on it) and leave checking the response status to the final queue-processing loop. That's what Pablo's answer proposed, so I won't repeat it here.

Note that this version starts the task before putting it in the queue. The queue is not the only limit on the number of active tasks: there can also be an already-started task waiting for space on one end (the line await completed_queue.put(task) blocks if the queue is full), and another task already taken out by the queue consumer on the other end (fetched by task = await completed_queue.get()). If you need to limit the number of active tasks, subtract 2 from the queue maxsize to set an upper limit.

Also, because tasks can complete while they are still sitting in the queue, there could at any moment be up to maxsize + 1 fewer tasks actively running, yet you can't start any more until space has been freed in the queue. The first approach doesn't have these issues because it queues the inputs for tasks rather than the tasks themselves. You could mitigate the problem by using a semaphore rather than a bounded queue size to limit tasks (acquire a slot before starting a task, and release it just before the task returns).
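
A rough sketch of that semaphore variant, reusing fetch_task() from the second example (the limit of 5 and the unbounded completed_queue are illustrative choices):

sem = asyncio.Semaphore(5)         # at most 5 chunk tasks alive at any time
completed_queue = asyncio.Queue()  # ordering only; the semaphore does the limiting

async def limited_fetch_task(ids):
    try:
        return await fetch_task(ids)
    finally:
        sem.release()              # free the slot just before the task finishes

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        await sem.acquire()        # wait for a free slot before starting a task
        task = asyncio.create_task(limited_fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)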

Personally I’d pick my first proposal as it gives you separate control over concurrency and chunk prefetching, without the issues the second approach has.
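
Whichever variant you choose, fetch_entities stays an async generator, so the call site doesn't change. A hypothetical usage sketch (the MyClient construction, handle(), and the object names are placeholders):

import asyncio

async def main():
    client = MyClient()  # placeholder: construct your client however you normally do
    async for entity in client.fetch_entities(
        entity_ids=list(range(1, 2001)),
        objects=["name", "status"],  # illustrative object names
        queue_size=5,                # up to 5 chunks in flight
        chunk_size=500,
    ):
        handle(entity)               # placeholder for your per-entity processing

asyncio.run(main())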

