I’d use two queues here: one with the chunks to process, and one for the chunks that are complete. You can have any number of worker tasks to process chunks, and you can put a size limit on the first queue to limit how many chunks you prefetch. Use just a single loop to receive the processed chunks, to ensure they are kept ordered (your code already does this).
The trick is to share futures between both queues, one for every chunk to be processed. The worker tasks that do the processing fetch a (chunk, future) pair and resolve the future by setting the POST response as its result. The loop that handles the processed chunks awaits each future in turn, so it only moves on to the next chunk once the current chunk has been fully processed. For this to work you put the chunk together with its future into the first queue for the workers to consume, and the same future into the second queue; the shared futures are what enforce that the chunk results are handled in order.
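To illustrate the pattern in isolation, here is a minimal, self-contained sketch with a sleep and a doubling step standing in for the real POST call (all names and numbers here are made up):

```python
import asyncio
import random

async def main():
    work_queue = asyncio.Queue()      # (item, future) pairs for the workers
    results_queue = asyncio.Queue()   # the same futures, in submission order

    async def worker():
        while True:
            item, future = await work_queue.get()
            await asyncio.sleep(random.random())  # simulate work finishing out of order
            future.set_result(item * 2)

    workers = [asyncio.create_task(worker()) for _ in range(3)]

    for item in range(10):
        future = asyncio.get_running_loop().create_future()
        await work_queue.put((item, future))
        await results_queue.put(future)
    await results_queue.put(None)  # sentinel: nothing more will be queued

    while (future := await results_queue.get()) is not None:
        print(await future)  # 0, 2, 4, ... in order, despite the random delays

    for w in workers:
        w.cancel()

asyncio.run(main())
```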
So, in summary:
- Have two queues:
  - `chunks` holds `(chunk, future)` pairs.
  - `completed` holds futures, the same futures paired with the chunks in the other queue.
- Create “worker” tasks that consume from the `chunks` queue. If you create 5, then 5 chunks will be processed in parallel. Every time a worker has completed processing a chunk, it sets the result on the corresponding future.
- Use a “processed chunks” loop; it takes the next future from the `completed` queue and awaits it. Only when the specific chunk associated with that future has been completed will it produce the result (set by a worker task).
As a rough sketch, it’d look something like this:
```python
chunk_queue = asyncio.Queue()
completed_queue = asyncio.Queue()
WORKER_COUNT = queue_size

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        future = asyncio.Future()
        await chunk_queue.put((ids, future))
        await completed_queue.put(future)
    await completed_queue.put(None)  # sentinel: no more chunks are coming

async def worker():
    while True:
        ids, future = await chunk_queue.get()
        try:
            res = await self.client.post(urllib.parse.quote("entities:fetchdata"), json={
                "entityIds": ids,
                "objects": objects,
                "inbound": {
                    "linkTypeIds": select_outbound,
                    "objects": objects,
                } if select_inbound else {},
                "outbound": {
                    "linkTypeIds": select_inbound,
                    "objects": objects,
                } if select_outbound else {},
            })
            res.raise_for_status()
            future.set_result(res)
        except Exception as e:
            future.set_exception(e)
            return  # stop this worker after a failure

workers = [asyncio.create_task(worker()) for _ in range(WORKER_COUNT)]
chunk_producer = asyncio.create_task(queued_chunks())

try:
    while True:
        future = await completed_queue.get()
        if future is None:
            # all chunks have been processed!
            break
        res = await future
        for item in res.json():
            yield item
finally:
    for w in workers:
        w.cancel()
    await asyncio.wait(workers)
```
If you must limit how many chunks are queued (and not just how many are being processed concurrently), set `maxsize` on the `chunk_queue` to a value greater than `WORKER_COUNT`. Use this to limit memory requirements, for example.
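For instance (the factor of two here is arbitrary):

```python
# Workers drain this queue; queued_chunks() blocks on put() once
# WORKER_COUNT * 2 chunks are waiting, which caps memory use.
chunk_queue = asyncio.Queue(maxsize=WORKER_COUNT * 2)
```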
However, if you were to set `maxsize` to a value equal to `WORKER_COUNT`, you may as well get rid of the worker tasks altogether and instead put the body of the worker loop, as coroutines wrapped in tasks, into the completed results queue. The asyncio `Task` class is a subclass of `Future` that automatically sets the future result when the coroutine it wraps completes. If you are not going to put more chunks into the `chunk_queue` than you have worker tasks, you may as well cut out the middleman and drop the `chunk_queue` altogether. The tasks then go into the completed queue instead of plain futures:
```python
completed_queue = asyncio.Queue(maxsize=queue_size)

async def queued_chunks():
    for ids in chunks(entity_ids, chunk_size):
        task = asyncio.create_task(fetch_task(ids))
        await completed_queue.put(task)
    await completed_queue.put(None)  # sentinel: no more chunks are coming

async def fetch_task(ids):
    res = await self.client.post(
        urllib.parse.quote("entities:fetchdata"),
        json={
            "entityIds": ids,
            "objects": objects,
            "inbound": {
                "linkTypeIds": select_outbound,
                "objects": objects,
            } if select_inbound else {},
            "outbound": {
                "linkTypeIds": select_inbound,
                "objects": objects,
            } if select_outbound else {},
        },
    )
    res.raise_for_status()
    return res

chunk_producer = asyncio.create_task(queued_chunks())

while True:
    task = await completed_queue.get()
    if task is None:
        # all chunks have been processed!
        break
    res = await task
    for item in res.json():
        yield item
```
This version is really close to what you had already; the only difference is that the await on the client POST coroutine and the check of the response status code are moved into a separate coroutine that is run as a task. You could also make the `self.client.post()` coroutine itself the task (so not await on it) and leave checking the response status to the final queue-processing loop. That’s what Pablo’s answer proposed, so I won’t repeat it here.
Note that this version starts each task before putting it in the queue, so the queue size is not the only limit on the number of active tasks: there is also an already-started task waiting for space in the queue on one end (the line `await completed_queue.put(task)` blocks if the queue is full), and another task already taken out by the queue consumer on the other end (fetched by `task = await completed_queue.get()`). If you need to limit the number of active tasks, subtract 2 from the queue `maxsize` to set an upper limit.
Also, because queued tasks can complete while they sit in the queue, there could be up to `maxsize + 1` fewer tasks actually running, yet you can’t start any more until space has been freed in the queue. Because the first approach queues inputs for tasks, it hasn’t got these issues. You could mitigate the problem by using a semaphore rather than a bounded queue size to limit tasks (acquire a slot before starting a task, and release it just before the task returns), as sketched below.
Personally I’d pick my first proposal as it gives you separate control over concurrency and chunk prefetching, without the issues the second approach has.