aio_parallel_tools.aio_task_pool package

Auto-scaling asynchronous task pool backed by a LIFO queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo.AioAutoScaleTaskPoolLifo(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)[source]

Close all workers and pause task submission.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) -- timeout for closing all workers; the pool also waits for all pending tasks to finish. Defaults to `None`.

  • close_pool_timeout (int, optional) -- timeout for waiting for the queue to synchronize. Defaults to 3 s.

  • safe (bool, optional) -- if closing raises an exception, emit a warning instead of raising. Defaults to `True`.

Raises
  • te -- timed out while closing the workers.

  • e -- an unknown error was raised while closing the workers.

  • te -- timed out while waiting for the queue to synchronize.

  • e -- an unknown exception was raised while waiting for the queue to synchronize.
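 
For readers who prefer an explicit lifecycle over the `async with` form used in the example above, here is a minimal sketch combining the documented `start`, `submit`, and `close` interfaces. It reuses the `test` coroutine defined above; the specific timeout values are illustrative assumptions, not recommendations from the library.

>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo import AioAutoScaleTaskPoolLifo
>>> async def manual_lifecycle():
...     pool = AioAutoScaleTaskPoolLifo(min_size=3)
...     await pool.start()  # initialize and start the workers
...     try:
...         result = await pool.submit(test, func_args=["manual"])
...         print(result)
...     finally:
...         # wait up to 5 s for the workers to finish; warn instead of raising on errors
...         await pool.close(close_worker_timeout=5, close_pool_timeout=3, safe=True)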

Auto-scaling asynchronous task pool backed by a priority queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_priority.AioAutoScaleTaskPoolPriority(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)[source]

Close all workers and pause task submission.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) -- timeout for closing all workers; the pool also waits for all pending tasks to finish. Defaults to `None`.

  • close_pool_timeout (int, optional) -- timeout for waiting for the queue to synchronize. Defaults to 3 s.

  • safe (bool, optional) -- if closing raises an exception, emit a warning instead of raising. Defaults to `True`.

Raises
  • te -- timed out while closing the workers.

  • e -- an unknown error was raised while closing the workers.

  • te -- timed out while waiting for the queue to synchronize.

  • e -- an unknown exception was raised while waiting for the queue to synchronize.

Auto-scaling asynchronous task pool backed by the default queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_simple.AioAutoScaleTaskPoolSimple(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)[source]

Close all workers and pause task submission.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) -- timeout for closing all workers; the pool also waits for all pending tasks to finish. Defaults to `None`.

  • close_pool_timeout (int, optional) -- timeout for waiting for the queue to synchronize. Defaults to 3 s.

  • safe (bool, optional) -- if closing raises an exception, emit a warning instead of raising. Defaults to `True`.

Raises
  • te -- timed out while closing the workers.

  • e -- an unknown error was raised while closing the workers.

  • te -- timed out while waiting for the queue to synchronize.

  • e -- an unknown exception was raised while waiting for the queue to synchronize.

Fixed-size asynchronous task pool backed by a LIFO queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_lifo.AioFixedTaskPoolLifo(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Asynchronous Task Pool Class with LIFO queue.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
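 
Because this pool is backed by a LIFO queue, tasks that pile up while every worker is busy are started newest-first. The sketch below illustrates that with `submit_nowait`; it assumes `submit_nowait` accepts the same `task_func`/`func_args` arguments here as documented for the priority pool below (minus `weight`), which this section does not spell out.

>>> async def lifo_order_demo():
...     async with AioFixedTaskPoolLifo(init_size=1) as pool:
...         # with a single worker, queued tasks are picked up in last-in-first-out order
...         futures = [pool.submit_nowait(test, func_args=[str(i)]) for i in range(3)]
...         results = await asyncio.gather(*futures)
...         print(results)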

Fixed-size asynchronous task pool backed by a priority queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_priority.AioFixedTaskPoolPriority(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Asynchronous Task Pool Class with priority queue.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"],weight=3)
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
async submit(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4, blocking: bool = True) → Union[_asyncio.Future, Any][source]

Submit a task to the task pool.

Parameters
  • task_func (Callable[[Any], Any]) -- the task function to be run by a worker.

  • func_args (List[Any], optional) -- positional arguments for the task function. Defaults to [].

  • func_kwargs (Dict[str, Any], optional) -- keyword arguments for the task function. Defaults to {}.

  • weight (int) -- the task's priority weight. Defaults to 4.

  • blocking (bool, optional) -- whether to wait for the task to finish and return its result; if `False`, a future object is returned instead. Defaults to `True`.

Raises

NotAvailable -- the task pool has been paused.

Returns

If `blocking` is `True`, the task's result; otherwise a future object that can be `await`ed to get the result.

Return type

Union[asyncio.Future, Any]
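 
As a small illustration of the `weight` and `blocking` parameters documented above, the sketch below submits two tasks without blocking and awaits their futures afterwards. Which weight value is served first depends on the ordering of the underlying priority queue, which this section does not specify; it reuses the `test` coroutine from the example above.

>>> async def priority_submit_demo():
...     async with AioFixedTaskPoolPriority(init_size=1) as pool:
...         # blocking=False returns a future instead of the task's result
...         fut_a = await pool.submit(test, func_args=["a"], weight=8, blocking=False)
...         fut_b = await pool.submit(test, func_args=["b"], weight=1, blocking=False)
...         print(await fut_a, await fut_b)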

submit_nowait(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4) → _asyncio.Future[source]

Synchronously submit a task to the task pool.

Parameters
  • task_func (Callable[[Any], Any]) -- the task function to be run by a worker.

  • func_args (List[Any], optional) -- positional arguments for the task function. Defaults to [].

  • func_kwargs (Dict[str, Any], optional) -- keyword arguments for the task function. Defaults to {}.

  • weight (int) -- the task's priority weight. Defaults to 4.

Raises
  • NotAvailable -- the task pool has been paused, or

  • e -- some other exception was raised.

  • NotAvailable -- the task pool's queue is full and cannot accept any more tasks.

Returns

A future object that can be `await`ed to get the task's result.

Return type

asyncio.Future
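 
Since `submit_nowait` raises `NotAvailable` when the pool is paused or its queue is full, callers may want to wrap it defensively. The helper below is only a sketch; the import location of `NotAvailable` is not shown in this section, so it catches a generic exception instead.

>>> def try_submit(pool, name):
...     try:
...         return pool.submit_nowait(test, func_args=[name], weight=4)
...     except Exception as exc:  # per the docs: NotAvailable when paused or the queue is full
...         print(f"could not queue {name}: {exc}")
...         return None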

Fixed-size asynchronous task pool backed by the default queue

Asynchronous task pool class

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple.AioFixedTaskPoolSimple(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)[source]

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Simple Asynchronous Task Pool Class.

This pool is useful when you need to limit the maximum number of tasks running in parallel at any one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers, and you can change the number of workers through the scale interface. You, as the producer, send your tasks through the submit interface; to stop accepting new submissions, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): the event loop the tasks run on.

size (int): the current degree of parallelism.

closed (bool): whether the task pool has been closed.

paused (bool): whether the task pool has been paused.

waiting_tasks_number (int): the number of tasks still waiting to run.

max_tasks_number (int): the maximum number of waiting tasks the pool can hold.

Method:

pause (function): pause the task pool so it no longer accepts tasks.

scale_nowait (function): synchronously scale the pool's parallelism; note that a decrease does not take effect immediately.

submit_nowait (function): synchronously submit a task to the pool; note that it does not block waiting for the result but returns a future object instead.

Asynchronous Method:

start (function): initialize and start the task pool.

close (function): close the task pool.

scale (function): scale the task pool.

submit (function): submit a task to the task pool.

Example:

>>> import asyncio
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")
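 
The examples in this package define a `main()` coroutine but do not show how it is driven. One straightforward way to run it (an assumption of these docs, not part of the library itself) is:

>>> asyncio.run(main())  # Python 3.7+; on older versions use loop.run_until_complete(main())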