aio_parallel_tools.aio_task_pool package

Async autoscale task pool with lifo queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo.AioAutoScaleTaskPoolLifo(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
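The producer-consumer arrangement described above can be sketched with the standard library alone. This is a minimal illustration, not the library's implementation: `worker`, `run_limited`, and `job` are hypothetical names, and a plain asyncio.Queue stands in for the pool's queue. It shows that with three workers, at most three jobs run at once.

```python
import asyncio


async def worker(queue: asyncio.Queue) -> None:
    """Consumer: pull (coroutine function, args, future) items until cancelled."""
    while True:
        func, args, fut = await queue.get()
        try:
            fut.set_result(await func(*args))
        except Exception as exc:  # hand task errors back to the submitter
            fut.set_exception(exc)
        finally:
            queue.task_done()


async def run_limited(n_workers: int, n_tasks: int) -> int:
    """Run n_tasks jobs on n_workers consumers; return the peak concurrency seen."""
    queue: asyncio.Queue = asyncio.Queue()
    running = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i

    workers = [asyncio.create_task(worker(queue)) for _ in range(n_workers)]
    loop = asyncio.get_running_loop()
    futures = []
    for i in range(n_tasks):  # the producer side: enqueue work, keep a future per task
        fut = loop.create_future()
        queue.put_nowait((job, (i,), fut))
        futures.append(fut)
    results = await asyncio.gather(*futures)
    for w in workers:  # all tasks are done, so the workers are idle and can be cancelled
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    assert results == list(range(n_tasks))
    return peak


peak = asyncio.run(run_limited(n_workers=3, n_tasks=10))
print(peak)
```

The number of worker tasks, not the number of submitted jobs, bounds the concurrency, which is the property the pool classes in this package are built around.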

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

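>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo import AioAutoScaleTaskPoolLifo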
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")

async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)

Close all workers and pause the task pool.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.

  • close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.

  • safe (bool, optional) – Controls whether an exception caught while closing is raised or reported as a warning. Defaults to True.

Raises
  • te – Timed out while closing the workers.

  • e – Unknown error while closing the workers.

  • te – Timed out while waiting for the remaining tasks to finish.

  • e – Unknown error while waiting for the remaining tasks to finish.
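
The two timeouts and the safe flag can be pictured with a small stdlib-only sketch. Everything here is an assumption made for illustration: the helper names `close_pool` and `guarded`, the order of the two phases, and the reading of safe=True as "warn instead of raise" are not confirmed by this reference.

```python
import asyncio
import warnings


async def close_pool(queue, workers, *, close_worker_timeout=None,
                     close_pool_timeout=3, safe=True):
    """Drain the queue, then stop the workers; each phase has its own timeout."""

    async def guarded(awaitable, timeout, what):
        try:
            await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError:
            if not safe:  # assumption: safe=False re-raises, safe=True only warns
                raise
            warnings.warn(f"timed out while {what}")

    # Phase 1 (assumed order): wait for tasks that are still in the queue.
    await guarded(queue.join(), close_pool_timeout, "waiting for remaining tasks")
    # Phase 2: cancel the workers and wait for them to finish.
    for w in workers:
        w.cancel()
    await guarded(asyncio.gather(*workers, return_exceptions=True),
                  close_worker_timeout, "closing workers")


async def demo(safe: bool) -> str:
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait("never consumed")  # no worker exists, so join() cannot finish
    try:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            await close_pool(queue, [], close_pool_timeout=0.05, safe=safe)
        n = sum("timed out" in str(w.message) for w in caught)
        return f"warned: {n}"
    except asyncio.TimeoutError:
        return "raised"


outcomes = [asyncio.run(demo(True)), asyncio.run(demo(False))]
print(outcomes)
```

With safe=True the timeout surfaces as a warning and closing continues; with safe=False the same timeout propagates to the caller.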

Async autoscale task pool with priority queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_priority.AioAutoScaleTaskPoolPriority(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
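This class takes its tasks from a priority queue. As a rough illustration of what that ordering means, the sketch below uses the standard library's asyncio.PriorityQueue, which hands out the entry with the smallest number first. The helper name `drain_in_priority_order` and the tie-breaking counter are assumptions for this example; how the library maps its own priorities to execution order is not stated in this section.

```python
import asyncio
import itertools


async def drain_in_priority_order(entries):
    """Queue (priority, name) pairs and return names in the order a worker would get them."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker keeps equal priorities in insertion order
    for priority, name in entries:
        queue.put_nowait((priority, next(counter), name))
    order = []
    while not queue.empty():
        _, _, name = await queue.get()
        order.append(name)
        queue.task_done()
    return order


order = asyncio.run(
    drain_in_priority_order([(4, "a"), (1, "b"), (4, "c"), (2, "d")])
)
print(order)
```

The counter also avoids comparing the payloads themselves when two priorities are equal, which matters once the payload is a callable rather than a string.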

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

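>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_priority import AioAutoScaleTaskPoolPriority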
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")

async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)

Close all workers and pause the task pool.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.

  • close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.

  • safe (bool, optional) – Controls whether an exception caught while closing is raised or reported as a warning. Defaults to True.

Raises
  • te – Timed out while closing the workers.

  • e – Unknown error while closing the workers.

  • te – Timed out while waiting for the remaining tasks to finish.

  • e – Unknown error while waiting for the remaining tasks to finish.

Async autoscale task pool with default queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_simple.AioAutoScaleTaskPoolSimple(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Auto Scale Asynchronous Task Pool Class.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

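>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_simple import AioAutoScaleTaskPoolSimple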
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioAutoScaleTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")

async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)

Close all workers and pause the task pool.

Parameters
  • close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.

  • close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.

  • safe (bool, optional) – Controls whether an exception caught while closing is raised or reported as a warning. Defaults to True.

Raises
  • te – Timed out while closing the workers.

  • e – Unknown error while closing the workers.

  • te – Timed out while waiting for the remaining tasks to finish.

  • e – Unknown error while waiting for the remaining tasks to finish.

Async fixed task pool with lifo queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_lifo.AioFixedTaskPoolLifo(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Asynchronous Task Pool Class with lifo queue.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
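What a LIFO queue changes is the order in which waiting tasks reach the workers. The sketch below compares the standard library's asyncio.Queue and asyncio.LifoQueue; the helper `drain` and the task labels are hypothetical, and the snippet does not use the pool class itself.

```python
import asyncio


async def drain(queue_cls, items):
    """Put all items into a fresh queue of the given class, then read them back."""
    queue = queue_cls()
    for item in items:
        queue.put_nowait(item)
    out = []
    while not queue.empty():
        out.append(await queue.get())
    return out


tasks = ["t1", "t2", "t3"]
fifo_order = asyncio.run(drain(asyncio.Queue, tasks))
lifo_order = asyncio.run(drain(asyncio.LifoQueue, tasks))
print(fifo_order, lifo_order)
```

With a LIFO queue the most recently submitted waiting task is taken first, so older tasks can wait for a long time while new ones keep arriving.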

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_lifo import AioFixedTaskPoolLifo
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")

Async fixed task pool with priority queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_priority.AioFixedTaskPoolPriority(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Asynchronous Task Pool Class.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

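>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_priority import AioFixedTaskPoolPriority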
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"],weight=3)
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")

async submit(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4, blocking: bool = True) → Union[_asyncio.Future, Any]

Submit task to the task pool.

Parameters
  • task_func (Callable[[Any], Any]) – The task function which will be called by the workers.

  • func_args (List[Any], optional) – The positional parameters for the task function. Defaults to [].

  • func_kwargs (Dict[str, Any], optional) – The keyword parameters for the task function. Defaults to {}.

  • weight (int) – Task’s weight. Defaults to 4.

  • blocking (bool, optional) – Whether to wait for the task’s result. Defaults to True.

Raises

NotAvailable – The task pool is paused.

Returns

If blocking is True, submit returns the task’s result; otherwise it returns a future that you can await to get the result.

Return type

Union[asyncio.Future, Any]
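
The difference between the two return modes can be shown with a stdlib-only sketch. The `worker`, `submit`, and `double` functions below are hypothetical stand-ins written for this example (weight handling is left out); they only mirror the documented behaviour that blocking=True yields the result and blocking=False yields a future.

```python
import asyncio


async def worker(queue):
    """Run queued (func, args, kwargs, future) items and publish each result."""
    while True:
        func, args, kwargs, fut = await queue.get()
        try:
            fut.set_result(await func(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)
        finally:
            queue.task_done()


async def submit(queue, task_func, *, func_args=(), func_kwargs=None, blocking=True):
    """Queue a task; return its result if blocking, otherwise the pending future."""
    fut = asyncio.get_running_loop().create_future()
    await queue.put((task_func, tuple(func_args), dict(func_kwargs or {}), fut))
    if blocking:
        return await fut
    return fut


async def double(x):
    await asyncio.sleep(0.01)
    return 2 * x


async def main():
    queue = asyncio.Queue()
    w = asyncio.create_task(worker(queue))
    direct = await submit(queue, double, func_args=[21])              # the result itself
    fut = await submit(queue, double, func_args=[4], blocking=False)  # a future
    was_future = isinstance(fut, asyncio.Future)
    later = await fut
    w.cancel()
    await asyncio.gather(w, return_exceptions=True)
    return direct, was_future, later


submit_demo = asyncio.run(main())
print(submit_demo)
```

The non-blocking form is useful when several tasks should be queued first and their futures gathered afterwards.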

submit_nowait(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4) → _asyncio.Future

Submit task to the task pool with no wait.

Parameters
  • task_func (Callable[[Any], Any]) – The task function which will be called by the workers.

  • func_args (List[Any], optional) – The positional parameters for the task function. Defaults to [].

  • func_kwargs (Dict[str, Any], optional) – The keyword parameters for the task function. Defaults to {}.

  • weight (int) – Task’s weight. Defaults to 4.

Raises
  • NotAvailable – The task pool is paused.

  • e – Any other exception.

  • NotAvailable – The task pool is full and cannot accept more tasks.

Returns

A future that you can await to get the result.

Return type

asyncio.Future
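
The two NotAvailable cases listed above can be sketched as follows. The `NotAvailable` class defined here is a local stand-in so the snippet is self-contained, and the `submit_nowait` function with its `paused` argument is a hypothetical simplification (no weight, no keyword arguments), not the library's code.

```python
import asyncio


class NotAvailable(Exception):
    """Local stand-in for the library's exception, defined only for this sketch."""


def submit_nowait(queue, paused, task_func, *, func_args=()):
    """Queue a task without waiting; fail fast if the pool is paused or full."""
    if paused:
        raise NotAvailable("task pool is paused")
    fut = asyncio.get_running_loop().create_future()
    try:
        queue.put_nowait((task_func, tuple(func_args), fut))
    except asyncio.QueueFull as exc:
        raise NotAvailable("task pool is full") from exc
    return fut


async def main():
    async def noop():
        return None

    queue = asyncio.Queue(maxsize=1)  # room for exactly one waiting task
    events = []
    fut = submit_nowait(queue, False, noop)
    events.append("queued")
    for paused in (False, True):
        try:
            submit_nowait(queue, paused, noop)
        except NotAvailable as exc:
            events.append(str(exc))
    fut.cancel()  # nothing consumes the queue in this demo
    return events


events = asyncio.run(main())
print(events)
```

Because nothing is awaited, the caller learns immediately whether the task was accepted, which is the point of a no-wait submit on a bounded queue.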

Async fixed task pool with default queue

Asynchronous Task Pool Class.

class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple.AioFixedTaskPoolSimple(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)

Bases: aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin, aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin, aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin, aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase

Simple Asynchronous Task Pool Class.

This pool is for cases where you need to limit how many tasks run in parallel at one time. It is a variant of the producer-consumer model: the pool instance manages a number of consumers, which act as workers. You can change the number of workers with the scale interface. You, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
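One way to picture scaling is as adding or cancelling worker tasks. The `TinyPool` class below is a hypothetical stand-in written for this illustration; it models only the worker bookkeeping, and the library may apply a scale request differently (for example, by deferring the removal of busy workers).

```python
import asyncio


class TinyPool:
    """Hypothetical stand-in for a task pool; only worker bookkeeping is modelled."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self.workers: list = []

    @property
    def size(self) -> int:
        return len(self.workers)

    async def _worker(self) -> None:
        while True:
            job = await self.queue.get()
            try:
                await job()
            finally:
                self.queue.task_done()

    async def scale(self, num: int) -> int:
        """Add num workers when num > 0; remove up to -num workers when num < 0."""
        if num > 0:
            self.workers.extend(
                asyncio.create_task(self._worker()) for _ in range(num)
            )
        elif num < 0:
            count = min(-num, len(self.workers))
            stopping = [self.workers.pop() for _ in range(count)]
            for w in stopping:
                w.cancel()  # idle workers stop as soon as they are cancelled
            await asyncio.gather(*stopping, return_exceptions=True)
        return self.size


async def main() -> list:
    pool = TinyPool()
    sizes = [await pool.scale(3)]       # start with three workers
    sizes.append(await pool.scale(3))   # grow by three
    sizes.append(await pool.scale(-3))  # shrink by three
    await pool.scale(-pool.size)        # stop the rest before the loop closes
    return sizes


sizes = asyncio.run(main())
print(sizes)
```

Cancelling a worker in the middle of a job would drop that job, so a real pool needs a gentler shutdown signal than this sketch uses.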

Property:

loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.

size (int): The worker pool’s size.

closed (bool): True if the worker pool’s size is 0 and the pool is paused.

paused (bool): True if the pool is paused; False if it can accept new tasks.

waiting_tasks_number (int): The number of waiting tasks.

max_tasks_number (int): The maximum number of waiting tasks.

Method:

pause (function): Pause the task pool.

scale_nowait (function): Scale the number of workers in the task pool without waiting.

submit_nowait (function): Submit a task to the task pool without waiting.

Asynchronous Method:

start (function): Initialize the workers and open the task pool to accept tasks.

close (function): Close all workers and pause the task pool.

scale (function): Scale the number of workers in the task pool.

submit (function): Submit a task to the task pool.

Example:

>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple import AioFixedTaskPoolSimple
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:"+ result
>>> async def main():
...     async with AioFixedTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size==6
...         print(f"after 2 s test pool size {task_pool.size}")