任务池模型¶
任务池本质上是一个生产者消费者模型,我们用它来限制异步任务的并行度.通常异步的IO任务很容易就达到服务端的连接数上限.为了限制连接数,一个常见的方法是使用连接池,但连接池只会在连接都用完后抛出异常,后续的任务并没有办法得到执行.这个任务池模型在限制了并行度的同时会缓存任务,如果并行度不够它会在队列中等待,直到被消耗执行为止.同时由于限制了并行度,这也就有效的降低了服务端的压力.
任务池的种类¶
当前版本有六种任务池类
固定执行器数量的任务池
AioFixedTaskPoolSimple
AioFixedTaskPoolLifo
AioFixedTaskPoolPriority
执行器数量会自动扩张收缩的任务池
AioAutoScaleTaskPoolSimple
AioAutoScaleTaskPoolLifo
AioAutoScaleTaskPoolPriority
如何使用¶
所有的任务池类型对象都支持python的异步上下文管理器协议.
async def test(name):
print(f"{name} start")
for i in range(5):
await asyncio.sleep(1)
result = f"{name} done"
print(result)
return "ok:"+ result
async def main():
async with AioFixedTaskPoolSimple() as task_pool:
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
print("test await blocking submit")
r = await task_pool.submit(test, func_args=["e"])
assert r == "ok:e done"
print("test await blocking submit")
print("scale 3")
await task_pool.scale(3)
print(f"test pool size {task_pool.size}")
任务池也可以处理常规的python任务,只不过需要借助`concurrent.futures.Executor`的实例.
def test(name):
print(f"{name} start")
for i in range(5):
time.sleep(1)
result = f"{name} done"
print(result)
return "ok:" + result
async def main():
executor = concurrent.futures.ProcessPoolExecutor()
async with AioFixedTaskPoolSimple(executor=executor) as task_pool:
async with AioFixedTaskPoolSimple() as task_pool:
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
await asyncio.gather(
task_pool.submit(test, func_args=["c"]),
task_pool.submit(test, func_args=["b"]),
task_pool.submit(test, func_args=["a"]),
task_pool.submit(test, func_args=["d"])
)
当然你也可以使用接口`start`和`close`来管理任务池对象的生命周期.
async def main():
task_pool = AioFixedTaskPoolSimple()
await task_pool.start()
print(f"test pool size {task_pool.size}")
print("test 4 task with pool size 3")
print("test await blocking submit")
r = await task_pool.submit(test, func_args=["e"])
assert r == "ok:e done"
print("test await blocking submit")
print("scale 3")
await task_pool.scale(3)
print(f"test pool size {task_pool.size}")
await task_pool.close()
操作¶
只有3种操作
submit 提交一个任务到任务池
result = await task_pool.submit(test, func_args=["e"])
如果你使用的是优先级任务池,你可以加入一个额外的参数`weight`,这个参数越小就越优先.
result = await priority_task_pool.submit(test, func_args=["e"],weight=1)
scale 伸缩任务池的并行度.可以接受一个负数作为参数,意味降低并行度
await task_pool.scale(-1)
pause 任务池暂停/取消暂停提交任务.我们也可以使用`paused`来查看是否是暂停状态.
await task_pool.pause() print(task_pool.paused)