任务池模型

任务池本质上是一个生产者消费者模型,我们用它来限制异步任务的并行度.通常异步的IO任务很容易就达到服务端的连接数上限.为了限制连接数,一个常见的方法是使用连接池,但连接池只会在连接都用完后抛出异常,后续的任务并没有办法得到执行.这个任务池模型在限制了并行度的同时会缓存任务,如果并行度不够它会在队列中等待,直到被消耗执行为止.同时由于限制了并行度,这也就有效的降低了服务端的压力.

任务池的种类

当前版本有六种任务池类

  • 固定执行器数量的任务池

    • AioFixedTaskPoolSimple

    • AioFixedTaskPoolLifo

    • AioFixedTaskPoolPriority

  • 执行器数量会自动扩张收缩的任务池

    • AioAutoScaleTaskPoolSimple

    • AioAutoScaleTaskPoolLifo

    • AioAutoScaleTaskPoolPriority

如何使用

所有的任务池类型对象都支持python的异步上下文管理器协议.

async def test(name):
    print(f"{name} start")
    for i in range(5):
        await asyncio.sleep(1)
    result = f"{name} done"
    print(result)
    return "ok:"+ result
async def main():
    async with AioFixedTaskPoolSimple() as task_pool:
        print(f"test pool size {task_pool.size}")
        print("test 4 task with pool size 3")
        print("test await blocking submit")
        r = await task_pool.submit(test, func_args=["e"])
        assert r == "ok:e done"
        print("test await blocking submit")
        print("scale 3")
        await task_pool.scale(3)
        print(f"test pool size {task_pool.size}")

任务池也可以处理常规的python任务,只不过需要借助`concurrent.futures.Executor`的实例.

def test(name):
    print(f"{name} start")
    for i in range(5):
        time.sleep(1)
    result = f"{name} done"
    print(result)
    return "ok:" + result
async def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    async with AioFixedTaskPoolSimple(executor=executor) as task_pool:
        async with AioFixedTaskPoolSimple() as task_pool:
        print(f"test pool size {task_pool.size}")
        print("test 4 task with pool size 3")
        await asyncio.gather(
            task_pool.submit(test, func_args=["c"]),
            task_pool.submit(test, func_args=["b"]),
            task_pool.submit(test, func_args=["a"]),
            task_pool.submit(test, func_args=["d"])
        )

当然你也可以使用接口`start`和`close`来管理任务池对象的生命周期.

async def main():
    task_pool = AioFixedTaskPoolSimple()
    await task_pool.start()
    print(f"test pool size {task_pool.size}")
    print("test 4 task with pool size 3")
    print("test await blocking submit")
    r = await task_pool.submit(test, func_args=["e"])
    assert r == "ok:e done"
    print("test await blocking submit")
    print("scale 3")
    await task_pool.scale(3)
    print(f"test pool size {task_pool.size}")
    await task_pool.close()

操作

只有3种操作

  • submit 提交一个任务到任务池

    result = await task_pool.submit(test, func_args=["e"])
    

    如果你使用的是优先级任务池,你可以加入一个额外的参数`weight`,这个参数越小就越优先.

    result = await priority_task_pool.submit(test, func_args=["e"],weight=1)
    
  • scale 伸缩任务池的并行度.可以接受一个负数作为参数,意味降低并行度

    await task_pool.scale(-1)
    
  • pause 任务池暂停/取消暂停提交任务.我们也可以使用`paused`来查看是否是暂停状态.

    await task_pool.pause()
    print(task_pool.paused)