aio_parallel_tools.aio_task_pool.aio_fixed_task_pool_simple 源代码

"""Asynchronous Task Pool Class."""
import asyncio
import concurrent
from typing import Optional
from aio_parallel_tools.aio_task_pool.core.task_pool_base import AioTaskPoolBase
from aio_parallel_tools.aio_task_pool.core.mixins.queue_mixin.simpleq_mixin import SimpleQMixin
from aio_parallel_tools.aio_task_pool.core.mixins.worker_manager_mixin.fix_worker_manager_mixin import FixedWorkerManagerMixin
from aio_parallel_tools.aio_task_pool.core.mixins.producer_mixin.simple_producer_mixin import SimpleProducerMixin


[文档]class AioFixedTaskPoolSimple(SimpleProducerMixin, FixedWorkerManagerMixin, SimpleQMixin, AioTaskPoolBase): """Simple Asynchronous Task Pool Class. this pool is used when you need to limit the max number of parallel tasks at one time. It's a derivative of `Producer Consumer model`. The pool instance will manage a number of consumer as worker. You can scale the worker's number as you wish with the `scale` interface. And you, as the Producer, can send your task with the `submit` interface. If you want to close submit interface, you can use `pause` interface. Property: loop (asyncio.events.AbstractEventLoop):Event loop running on. size (int): The worker pool's size. closed (bool): Check if the worker pool's size is 0 and the worker pool is paused paused (bool): Check if the worker pool is paused. If can accept new tasks,the result is False; else it's True. waiting_tasks_number (int): The number of the waiting tasks. max_tasks_number (int): The maximum number of the waiting tasks. Method: pause (function): Pause the task pool. scale_nowait (function): Scale the number of the task pool's worker without waiting. submit_nowait (function): Submit task to the task pool with no wait. Asynchronous Method: start (function): Initialize workers and open the task pool to accept tasks. close (function): Close all workers and paused the task pool. scale (function): Scale the number of the task pool's worker. submit (function): Submit task to the task pool. Example: >>> import asyncio >>> async def test(name): ... print(f"{name} start") ... for i in range(5): ... await asyncio.sleep(1) ... result = f"{name} done" ... print(result) ... return "ok:"+ result >>> async def main(): ... async with AioFixedTaskPoolSimple() as task_pool: ... print(f"test pool size {task_pool.size}") ... print("test 4 task with pool size 3") ... print("test await blocking submit") ... r = await task_pool.submit(test, func_args=["e"]) ... assert r == "ok:e done" ... print("test await blocking submit") ... print("scale 3") ... await task_pool.scale(3) ... print(f"test pool size {task_pool.size}") ... ... print("scale -3") ... await task_pool.scale(-3) ... print(f"test pool size {task_pool.size}") ... await asyncio.sleep(2) ... assert task_pool.size==6 ... print(f"after 2 s test pool size {task_pool.size}") """ def __init__(self, *, init_size: int = 3, loop: Optional[asyncio.events.AbstractEventLoop] = None, executor: concurrent.futures.Executor = None, queue: Optional[asyncio.Queue] = None, queue_maxsize: int = 0) -> None: """Initialize task pool. Args: init_size (int, optional): Set the binginning size of task pool. Defaults to 3. loop (Optional[asyncio.events.AbstractEventLoop], optional): Event loop running on.. Defaults to None. queue (Optional[asyncio.Queue], optional): Using a exist queue. Defaults to None. queue_maxsize (int, optional): Set the maxsize of a new queue. Defaults to 0. executor (concurrent.futures.Executor, optional): Executor to run synchronous functions. Defaults to None. """ AioTaskPoolBase.__init__(self, loop=loop) SimpleQMixin.__init__(self, queue=queue, queue_maxsize=queue_maxsize) FixedWorkerManagerMixin.__init__(self, init_size=init_size, executor=executor) SimpleProducerMixin.__init__(self)