aio_parallel_tools.aio_task_pool.aio_autoscale_task_pool_lifo 源代码

"""Asynchronous Task Pool Class."""
import asyncio
import concurrent
from typing import Optional, Union
from aio_parallel_tools.aio_task_pool.core.task_pool_base import AioTaskPoolBase
from aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.lifoq_mixin import LifoQMixin
from aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.autoscale_worker_manager_mixin import AutoScaleWorkerManagerMixin
from aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin import SimpleProducerMixin


[文档]class AioAutoScaleTaskPoolLifo(SimpleProducerMixin, AutoScaleWorkerManagerMixin, LifoQMixin, AioTaskPoolBase): """Auto Scale Asynchronous Task Pool Class. this pool is used when you need to limit the max number of parallel tasks at one time. It's a derivative of `Producer Consumer model`. The pool instance will manage a number of consumer as worker. You can scale the worker's number as you wish with the `scale` interface. And you, as the Producer, can send your task with the `submit` interface. If you want to close submit interface, you can use `pause` interface. Property: loop (asyncio.events.AbstractEventLoop):Event loop running on. size (int): The worker pool's size. closed (bool): Check if the worker pool's size is 0 and the worker pool is paused paused (bool): Check if the worker pool is paused. If can accept new tasks,the result is False; else it's True. waiting_tasks_number (int): The number of the waiting tasks. max_tasks_number (int): The maximum number of the waiting tasks. Method: pause (function): Pause the task pool. scale_nowait (function): Scale the number of the task pool's worker without waiting. submit_nowait (function): Submit task to the task pool with no wait. Asynchronous Method: start (function): Initialize workers and open the task pool to accept tasks. close (function): Close all workers and paused the task pool. scale (function): Scale the number of the task pool's worker. submit (function): Submit task to the task pool. Example: >>> import asyncio >>> async def test(name): ... print(f"{name} start") ... for i in range(5): ... await asyncio.sleep(1) ... result = f"{name} done" ... print(result) ... return "ok:"+ result >>> async def main(): ... async with AioAutoScaleTaskPoolLifo() as task_pool: ... print(f"test pool size {task_pool.size}") ... print("test 4 task with pool size 3") ... print("test await blocking submit") ... r = await task_pool.submit(test, func_args=["e"]) ... assert r == "ok:e done" ... print("test await blocking submit") ... print("scale 3") ... await task_pool.scale(3) ... print(f"test pool size {task_pool.size}") ... ... print("scale -3") ... await task_pool.scale(-3) ... print(f"test pool size {task_pool.size}") ... await asyncio.sleep(2) ... assert task_pool.size==6 ... print(f"after 2 s test pool size {task_pool.size}") """ def __init__(self, *, loop: Optional[asyncio.events.AbstractEventLoop] = None, min_size: int = 3, max_size: Optional[int] = None, auto_scale_interval: int = 10, auto_scale_cache_len: int = 20, executor: concurrent.futures.Executor = None, queue: Optional[asyncio.Queue] = None, queue_maxsize: int = 0) -> None: """Initialize task pool. Args: loop (Optional[asyncio.events.AbstractEventLoop], optional): Event loop running on.. Defaults to None. min_size (int, optional): Min size of task pool. Defaults to 3. max_size (int, optional): Max size of task pool. Defaults to min_size+5. auto_scale_interval (int, optional): How often auto scale task run. auto_scale_cache_len (int, optional): Cache length. queue (Optional[asyncio.Queue], optional): Using a exist queue. Defaults to None. queue_maxsize (int, optional): Set the maxsize of a new queue. Defaults to 0. executor (concurrent.futures.Executor, optional): Executor to run synchronous functions. Defaults to None. """ AioTaskPoolBase.__init__(self, loop=loop) LifoQMixin.__init__(self, queue=queue, queue_maxsize=queue_maxsize) AutoScaleWorkerManagerMixin.__init__(self, min_size=min_size, max_size=max_size, auto_scale_interval=auto_scale_interval, auto_scale_cache_len=auto_scale_cache_len, executor=executor) SimpleProducerMixin.__init__(self)
[文档] async def close(self, close_worker_timeout: Union[int, float, None] = None, close_pool_timeout: int = 3, safe: bool = True): """Close all workers and paused the task pool. Args: close_worker_timeout (Union[int, float, None], optional): Timeout for closing all workers. Defaults to None. close_pool_timeout (int, optional): Timeout for join left tasks. Defaults to 3. safe (bool, optional): when getting exceptions, raise it or warning it. Defaults to True. Raises: te: close workers timeout. e: unknown error when closing workers. te: waiting for left tasks done timeout e: unknown error when waiting for left tasks done """ self.close_auto_scale_worker() await self.close_pool(close_worker_timeout=close_worker_timeout, close_pool_timeout=close_pool_timeout, safe=safe)