Task Pool¶
We use task pool to limit the parallelism of asynchronous tasks. Generally, asynchronous io tasks is easy to hit a bottleneck of server. And connection pool usualy can only tell you that connection pool is full. So we need a task pool to limit the parallelism of asynchronous tasks so that server’s io limit will not so easy to reach.
Kinds of task pool¶
Now there are 6 kinds of task pool:
worker number fixed task pool
AioFixedTaskPoolSimpleAioFixedTaskPoolLifoAioFixedTaskPoolPriority
worker number auto scale task pool
AioAutoScaleTaskPoolSimpleAioAutoScaleTaskPoolLifoAioAutoScaleTaskPoolPriority
How to use¶
All of them can use as asynchronous context manager.
async def test(name):
print(f"{name} start")
for i in range(5):
await asyncio.sleep(1)
result = f"{name} done"
print(result)
return "ok:"+ result
async def main():
async with AioFixedTaskPoolSimple() as task_pool:
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
print("test await blocking submit")
r = await task_pool.submit(test, func_args=["e"])
assert r == "ok:e done"
print("test await blocking submit")
print("scale 3")
await task_pool.scale(3)
print(f"test pool size {task_pool.size}")
Task pools can also deal with a normal function, but it will be run in a concurrent.futures.Executor
def test(name):
print(f"{name} start")
for i in range(5):
time.sleep(1)
result = f"{name} done"
print(result)
return "ok:" + result
async def main():
executor = concurrent.futures.ProcessPoolExecutor()
async with AioFixedTaskPoolSimple(executor=executor) as task_pool:
async with AioFixedTaskPoolSimple() as task_pool:
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
await asyncio.gather(
task_pool.submit(test, func_args=["c"]),
task_pool.submit(test, func_args=["b"]),
task_pool.submit(test, func_args=["a"]),
task_pool.submit(test, func_args=["d"])
)
Of course you can just use start and close to manage the task pool’s Life Cycle.
async def main():
task_pool = AioFixedTaskPoolSimple()
await task_pool.start()
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
print("test await blocking submit")
r = await task_pool.submit(test, func_args=["e"])
assert r == "ok:e done"
print("test await blocking submit")
print("scale 3")
await task_pool.scale(3)
print(f"test pool size {task_pool.size}")
await task_pool.close()
Operations¶
There are only 3 Operations:
submitSubmit a task to the task pool.result = await task_pool.submit(test, func_args=["e"])
If you use a Priority task pool, you can submit with a int param
weight, the more small the more fast to be execut.result = await priority_task_pool.submit(test, func_args=["e"],weight=1)
scaleScale the worker’s number to increase/decrease the parallelism.number must be a int,if it’s negative,task pool will decrease the parallelism.await task_pool.scale(-1)
pausePause/unpause the task pool to manange if user can submit task.You can usepausedto check if you can submit tasks.await task_pool.pause() print(task_pool.paused)