aio_parallel_tools.aio_task_pool package
Async autoscale task pool with LIFO queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo.AioAutoScaleTaskPoolLifo(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Auto Scale Asynchronous Task Pool Class.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
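The producer-consumer arrangement described above can be illustrated with the standard library alone. The following is a minimal sketch, not this package's implementation: a fixed number of worker coroutines pull jobs from an asyncio.Queue, so no more than that many jobs run at once. The names run_with_workers, worker, and job are hypothetical.

```python
import asyncio


async def run_with_workers(jobs, worker_count=3):
    """Run coroutine functions from `jobs` with at most `worker_count` in flight."""
    queue = asyncio.Queue()
    results = []
    running = 0
    peak = 0

    async def worker():
        nonlocal running, peak
        while True:
            job = await queue.get()
            running += 1
            peak = max(peak, running)  # track the highest concurrency seen
            try:
                results.append(await job())
            finally:
                running -= 1
                queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(worker_count)]
    for job in jobs:          # the producer side: enqueue every job
        queue.put_nowait(job)
    await queue.join()        # wait until every queued job has been processed
    for w in workers:         # workers loop forever, so cancel them explicitly
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results, peak


async def demo():
    async def job():
        await asyncio.sleep(0.01)
        return "done"

    return await run_with_workers([job] * 10, worker_count=3)


results, peak = asyncio.run(demo())
print(len(results), peak)
```

Ten jobs are completed, yet the observed peak concurrency never exceeds the worker count, which is the limit the pool is meant to enforce.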
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo import AioAutoScaleTaskPoolLifo
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioAutoScaleTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")
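The "lifo" in this class's name refers to the order in which waiting tasks are handed out: last in, first out. The sketch below uses the standard library's asyncio.LifoQueue only to show that ordering next to a plain FIFO asyncio.Queue. Whether LifoQMixin wraps this exact class is an assumption, and the helper drain_order is hypothetical.

```python
import asyncio


async def drain_order(queue, items):
    """Put `items` into `queue`, then return them in the order the queue yields them."""
    for item in items:
        queue.put_nowait(item)
    out = []
    while not queue.empty():
        out.append(await queue.get())
    return out


async def demo():
    fifo = await drain_order(asyncio.Queue(), ["a", "b", "c"])
    lifo = await drain_order(asyncio.LifoQueue(), ["a", "b", "c"])
    return fifo, lifo


fifo, lifo = asyncio.run(demo())
print(fifo)  # ['a', 'b', 'c']
print(lifo)  # ['c', 'b', 'a']
```

With a LIFO queue the most recently submitted task is picked up first, which can favor fresh work over a stale backlog.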
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)
Close all workers and pause the task pool.
Parameters
close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.
close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.
safe (bool, optional) – Whether exceptions raised while closing are re-raised or reported as warnings. Defaults to True.
Raises
te – Timed out while closing the workers.
e – Unknown error while closing the workers.
te – Timed out while waiting for the remaining tasks to finish.
e – Unknown error while waiting for the remaining tasks to finish.
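As an illustration of how a timeout and a safe flag like the ones documented for close might interact, here is a minimal standard-library sketch. It is not the package's code: the helper close_step is hypothetical, and the reading that safe=True means "warn and continue" while safe=False means "raise" is an assumption, since the source only says that exceptions are either raised or reported as warnings.

```python
import asyncio
import warnings


async def close_step(coro, timeout, safe, label):
    """Await one shutdown step with a timeout; warn or re-raise on timeout.

    Assumption: safe=True means "warn and continue", safe=False means "raise".
    """
    try:
        await asyncio.wait_for(coro, timeout=timeout)
        return True
    except asyncio.TimeoutError:
        if not safe:
            raise
        warnings.warn(f"{label} timed out after {timeout}s")
        return False


async def demo():
    async def slow_step():
        await asyncio.sleep(1)

    async def fast_step():
        await asyncio.sleep(0)

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        ok_fast = await close_step(fast_step(), 0.5, True, "join remaining tasks")
        ok_slow = await close_step(slow_step(), 0.01, True, "close workers")
    raised = False
    try:
        await close_step(slow_step(), 0.01, False, "close workers")
    except asyncio.TimeoutError:
        raised = True
    return ok_fast, ok_slow, len(caught), raised


ok_fast, ok_slow, warning_count, raised = asyncio.run(demo())
print(ok_fast, ok_slow, warning_count, raised)  # True False 1 True
```

Passing timeout=None to asyncio.wait_for waits indefinitely, which matches a default of None for a worker-closing timeout.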
Async autoscale task pool with priority queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_priority.AioAutoScaleTaskPoolPriority(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Auto Scale Asynchronous Task Pool Class.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_priority import AioAutoScaleTaskPoolPriority
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioAutoScaleTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")
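The scale interface takes a signed number: a positive value adds workers and a negative value removes them. The sketch below shows one way such an interface could be built on asyncio tasks. TinyPool is a hypothetical stand-in, not the package's class, and it removes workers immediately by cancelling them; the package may apply a scale-down at a different time or in a different way, so the sizes printed here should not be read as the package's behavior.

```python
import asyncio


class TinyPool:
    """Hypothetical stand-in used only to illustrate scaling; not the package's class."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._workers = []

    @property
    def size(self):
        return len(self._workers)

    async def _worker(self):
        while True:
            func = await self._queue.get()
            try:
                await func()
            finally:
                self._queue.task_done()

    def scale_nowait(self, num):
        """Add `num` workers when positive, cancel `-num` workers when negative."""
        if num > 0:
            for _ in range(num):
                self._workers.append(asyncio.create_task(self._worker()))
        else:
            for _ in range(min(-num, len(self._workers))):
                self._workers.pop().cancel()
        return self.size


async def demo():
    pool = TinyPool()
    sizes = [pool.scale_nowait(3), pool.scale_nowait(3), pool.scale_nowait(-3)]
    pool.scale_nowait(-pool.size)  # cancel the remaining workers before exiting
    await asyncio.sleep(0)         # give the event loop a turn to process cancellations
    return sizes


sizes = asyncio.run(demo())
print(sizes)  # [3, 6, 3]
```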
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)
Close all workers and pause the task pool.
Parameters
close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.
close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.
safe (bool, optional) – Whether exceptions raised while closing are re-raised or reported as warnings. Defaults to True.
Raises
te – Timed out while closing the workers.
e – Unknown error while closing the workers.
te – Timed out while waiting for the remaining tasks to finish.
e – Unknown error while waiting for the remaining tasks to finish.
Async autoscale task pool with default queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_simple.AioAutoScaleTaskPoolSimple(*, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin.AutoScaleWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Auto Scale Asynchronous Task Pool Class.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_simple import AioAutoScaleTaskPoolSimple
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioAutoScaleTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")
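This page does not describe the rule the autoscale pools use to pick a worker count. Purely to show how parameters named min_size, max_size, and auto_scale_cache_len could fit together, the sketch below keeps a bounded window of recent backlog samples and clamps their average between the two limits. The averaging rule and the function next_size are assumptions made for illustration, not the package's algorithm.

```python
import math
from collections import deque
from typing import Deque, Optional


def next_size(samples: Deque[int], min_size: int = 3, max_size: Optional[int] = None) -> int:
    """Pick a worker count from recent queue-length samples (hypothetical rule)."""
    if not samples:
        return min_size
    target = math.ceil(sum(samples) / len(samples))  # average recent backlog
    target = max(target, min_size)                   # never go below min_size
    if max_size is not None:
        target = min(target, max_size)               # never exceed max_size
    return target


# A bounded window of observations, sized like auto_scale_cache_len's default.
window: Deque[int] = deque(maxlen=20)
print(next_size(window))               # 3: no samples yet, stay at min_size

window.extend([0, 1, 2])
print(next_size(window))               # 3: average backlog of 1 is below min_size

window.extend([12, 14, 16])
print(next_size(window))               # 8: ceil(45 / 6)
print(next_size(window, max_size=5))   # 5: capped by max_size
```

In a running pool, a function like this would presumably be evaluated once per auto_scale_interval, with the difference between the target and the current size passed to the scale interface.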
async close(close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True)
Close all workers and pause the task pool.
Parameters
close_worker_timeout (Union[int, float, None], optional) – Timeout for closing all workers. Defaults to None.
close_pool_timeout (int, optional) – Timeout for waiting on the remaining tasks to finish. Defaults to 3.
safe (bool, optional) – Whether exceptions raised while closing are re-raised or reported as warnings. Defaults to True.
Raises
te – Timed out while closing the workers.
e – Unknown error while closing the workers.
te – Timed out while waiting for the remaining tasks to finish.
e – Unknown error while waiting for the remaining tasks to finish.
Async fixed task pool with LIFO queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_lifo.AioFixedTaskPoolLifo(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin.LifoQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Asynchronous Task Pool Class with lifo queue.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_lifo import AioFixedTaskPoolLifo
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioFixedTaskPoolLifo() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")
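The pause interface and the paused property listed above describe a pool that stops accepting new tasks while tasks already queued stay where they are. A minimal sketch of that gate follows. PausablePool and the NotAvailable exception defined here are local stand-ins written for this illustration; they are not imported from the package, and the package's own behavior may differ in detail.

```python
import asyncio


class NotAvailable(Exception):
    """Local stand-in exception for this sketch."""


class PausablePool:
    """Hypothetical minimal pool front end that only models pause()."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._paused = False

    @property
    def paused(self):
        return self._paused

    @property
    def waiting_tasks_number(self):
        return self._queue.qsize()

    def pause(self):
        self._paused = True

    def submit_nowait(self, task_func):
        if self._paused:
            raise NotAvailable("task pool is paused")
        self._queue.put_nowait(task_func)


async def demo():
    pool = PausablePool()
    pool.submit_nowait(print)          # accepted while the pool is open
    pool.pause()
    try:
        pool.submit_nowait(print)      # rejected after pause()
        rejected = False
    except NotAvailable:
        rejected = True
    return pool.paused, rejected, pool.waiting_tasks_number


paused, rejected, waiting = asyncio.run(demo())
print(paused, rejected, waiting)  # True True 1
```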
Async fixed task pool with priority queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_priority.AioFixedTaskPoolPriority(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.priorityq_mixin.PriorityQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Asynchronous Task Pool Class.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_priority import AioFixedTaskPoolPriority
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioFixedTaskPoolPriority() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"], weight=3)
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")
async submit(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4, blocking: bool = True) → Union[_asyncio.Future, Any]
Submit a task to the task pool.
Parameters
task_func (Callable[[Any], Any]) – The task function that the workers will call.
func_args (List[Any], optional) – The positional arguments for the task function. Defaults to [].
func_kwargs (Dict[str, Any], optional) – The keyword arguments for the task function. Defaults to {}.
weight (int) – The task's weight. Defaults to 4.
blocking (bool, optional) – Whether to wait for the task's result. Defaults to True.
Raises
NotAvailable – The task pool is paused.
Returns
If blocking is True, submit returns the result of the task; otherwise it returns a future that you can await to get the result.
Return type
Union[asyncio.Future, Any]
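The two return modes of submit, and the effect of a weight on ordering, can be sketched with the standard library. TinyPriorityPool below is a hypothetical single-worker stand-in, not the package's implementation. It relies on asyncio.PriorityQueue, which hands out the smallest entry first; whether the package treats a lower or a higher weight as more urgent is not stated on this page, so the ordering shown is a property of this sketch only.

```python
import asyncio
import itertools


class TinyPriorityPool:
    """Hypothetical single-worker pool; not the package's implementation."""

    def __init__(self):
        self._queue = asyncio.PriorityQueue()
        self._counter = itertools.count()  # tie-breaker so equal weights stay FIFO
        self._worker = None

    async def __aenter__(self):
        self._worker = asyncio.create_task(self._run())
        return self

    async def __aexit__(self, *exc_info):
        self._worker.cancel()
        await asyncio.gather(self._worker, return_exceptions=True)

    async def _run(self):
        while True:
            _, _, func, args, fut = await self._queue.get()
            try:
                fut.set_result(await func(*args))
            except Exception as exc:
                fut.set_exception(exc)

    async def submit(self, task_func, *, func_args=(), weight=4, blocking=True):
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((weight, next(self._counter), task_func, list(func_args), fut))
        return await fut if blocking else fut


async def demo():
    order = []

    async def job(name):
        order.append(name)
        return f"ok:{name}"

    async with TinyPriorityPool() as pool:
        # Non-blocking submits return futures; all three are queued before the
        # worker gets a chance to run, so the queue decides the order.
        futs = [
            await pool.submit(job, func_args=[n], weight=w, blocking=False)
            for n, w in [("low", 9), ("high", 1), ("mid", 4)]
        ]
        results = await asyncio.gather(*futs)
        direct = await pool.submit(job, func_args=["e"])  # blocking: returns the result
    return order, results, direct


order, results, direct = asyncio.run(demo())
print(order)    # ['high', 'mid', 'low', 'e']
print(results)  # ['ok:low', 'ok:high', 'ok:mid']
print(direct)   # ok:e
```

The results list follows the order in which the futures were created, while the order list shows the sequence in which the worker actually ran the jobs.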
submit_nowait(task_func: Callable[[Any], Any], *, func_args: List[Any] = [], func_kwargs: Dict[str, Any] = {}, weight: int = 4) → _asyncio.Future
Submit a task to the task pool without waiting.
Parameters
task_func (Callable[[Any], Any]) – The task function that the workers will call.
func_args (List[Any], optional) – The positional arguments for the task function. Defaults to [].
func_kwargs (Dict[str, Any], optional) – The keyword arguments for the task function. Defaults to {}.
weight (int) – The task's weight. Defaults to 4.
Raises
NotAvailable – The task pool is paused or
e – Any other exception.
NotAvailable – The task pool is full and cannot accept any more tasks.
Returns
A future that you can await to get the result.
Return type
asyncio.Future
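The "task pool is full" case can be illustrated with a bounded asyncio.Queue, whose put_nowait raises asyncio.QueueFull once maxsize items are waiting. The sketch below translates that error into a NotAvailable exception. Both the module-level submit_nowait function and the NotAvailable class are local stand-ins for this illustration, and the idea that the package's queue_maxsize is passed straight to the queue is an assumption.

```python
import asyncio


class NotAvailable(Exception):
    """Local stand-in exception for this sketch."""


def submit_nowait(queue, task_func):
    """Queue `task_func` without waiting and return a future for its result."""
    fut = asyncio.get_running_loop().create_future()
    try:
        queue.put_nowait((task_func, fut))
    except asyncio.QueueFull as exc:
        fut.cancel()  # nobody will ever resolve this future
        raise NotAvailable("task pool is full, can not put task any more") from exc
    return fut


async def demo():
    queue = asyncio.Queue(maxsize=2)   # maxsize <= 0 would mean "unbounded"
    accepted = [submit_nowait(queue, print) for _ in range(2)]
    try:
        submit_nowait(queue, print)
        full = False
    except NotAvailable:
        full = True
    for fut in accepted:               # tidy up the pending futures
        fut.cancel()
    return len(accepted), full, queue.qsize()


accepted_count, full, waiting = asyncio.run(demo())
print(accepted_count, full, waiting)  # 2 True 2
```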
Async fixed task pool with default queue
Asynchronous Task Pool Class.
class aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple.AioFixedTaskPoolSimple(*, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures._base.Executor = None, queue: Optional[asyncio.queues.Queue] = None, queue_maxsize: int = 0)
Bases:
aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin.SimpleProducerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin.FixedWorkerManagerMixin,
aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin.SimpleQMixin,
aio_parallel_tools.aio_task_pool.core.task_pool_base.AioTaskPoolBase
Simple Asynchronous Task Pool Class.
Use this pool when you need to limit the maximum number of tasks running in parallel. It is a variant of the producer-consumer model: the pool instance manages a number of consumers as workers. You can change the number of workers with the scale interface, and you, as the producer, send tasks with the submit interface. To stop the pool from accepting new tasks, use the pause interface.
Property:
loop (asyncio.events.AbstractEventLoop): The event loop the pool runs on.
size (int): The number of workers in the pool.
closed (bool): True if the pool has no workers (size 0) and is paused.
paused (bool): True if the pool is paused and cannot accept new tasks; False if it can accept new tasks.
waiting_tasks_number (int): The number of waiting tasks.
max_tasks_number (int): The maximum number of waiting tasks.
Method:
pause (function): Pause the task pool.
scale_nowait (function): Scale the number of workers in the task pool without waiting.
submit_nowait (function): Submit a task to the task pool without waiting.
Asynchronous Method:
start (function): Initialize the workers and open the task pool to accept tasks.
close (function): Close all workers and pause the task pool.
scale (function): Scale the number of workers in the task pool.
submit (function): Submit a task to the task pool.
Example:
>>> import asyncio
>>> from aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple import AioFixedTaskPoolSimple
>>> async def test(name):
...     print(f"{name} start")
...     for i in range(5):
...         await asyncio.sleep(1)
...     result = f"{name} done"
...     print(result)
...     return "ok:" + result
>>> async def main():
...     async with AioFixedTaskPoolSimple() as task_pool:
...         print(f"test pool size {task_pool.size}")
...         print("test 4 task with pool size 3")
...         print("test await blocking submit")
...         r = await task_pool.submit(test, func_args=["e"])
...         assert r == "ok:e done"
...         print("test await blocking submit")
...         print("scale 3")
...         await task_pool.scale(3)
...         print(f"test pool size {task_pool.size}")
...
...         print("scale -3")
...         await task_pool.scale(-3)
...         print(f"test pool size {task_pool.size}")
...         await asyncio.sleep(2)
...         assert task_pool.size == 6
...         print(f"after 2 s test pool size {task_pool.size}")