Source code for asyncio_executor
import asyncio
import functools
import inspect
import threading
from concurrent import futures
from typing import (Union, Callable, Any, Optional)
def _loop_mgr(loop: asyncio.AbstractEventLoop)->None:
"""起一个线程执行事件循环的`run_forever`方法.
[Start up a thread for running the eventloop's `run_forever` method.]
当它被终止时会清理未完结的协程,但不会关闭事件循环
[When it shutdown, all the coroutines will be closed.but the eventloop will not close.]
Params:
loop (asyncio.AbstractEventLoop) : - 事件循环[the eventloop]
"""
if loop.is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
[docs]async def func_executor_coroutine(
func: Any,
loop: Optional[asyncio.AbstractEventLoop]=None)->Any:
"""将函数使用`loop.run_in_executor`包装成协程函数.
[wrap `loop.run_in_executor` as a coroutine function.]
Params:
func (callable) : - 需要使用执行器执行的函数[the function who need the executor to run].
loop (asyncio.AbstractEventLoop) : - 事件循环[the eventloop]
Return:
(Any): - 执行器的执行结果[the result the func retruned ran in the executor]
"""
_loop = loop or asyncio.get_event_loop()
return await _loop.run_in_executor(None, func)
[docs]class AsyncioExecutor(futures.Executor):
"""asyncio执行器,可以执行函数或者协程.
[Asyncio executor who can execute the function or coroutine.]
Attributes:
_shutdown (bool): - 执行器是否终止
[the executor shutdowned or not]
_loop (asyncio.AbstractEventLoop): - 事件循环
[the eventloop]
_thread (threading.Thread): - 执行事件循环上任务的线程
[the thread who runs the tasks in the eventloop]
_func_executor (futures.Executor): - 如果使用执行器执行函数,那么默认使用什么执行器
[which executor will be used by default if must run a function]
"""
def __init__(self, *,
loop: Optional[asyncio.AbstractEventLoop]=None,
func_executor: Optional[futures.Executor]=None)->None:
super().__init__()
self._shutdown = False
self._loop = loop or asyncio.get_event_loop()
self._func_executor = func_executor or futures.ThreadPoolExecutor()
self._loop.set_default_executor(func_executor)
self._thread = threading.Thread(target=_loop_mgr,
args=(self._loop,),
daemon=True)
self._thread.start()
[docs] def submit(self,
fn: Any,
*args: Any, **kwargs: Any)->futures.Future:
"""提交任务.
[submit a task.]
会先检查执行器是否已经关闭或者执行器的事件循环是否还在运行. 如果不是则会抛出一个运行时异常
[It will check if the excutor has already closed or the eventloop is not running.
If yes, will throw a RuntimeError exception.]
Params:
``fn (Union[callable,coroutinefunction])``: - 要执行的函数或者协程函数
[the function or coroutinefunction who need to execute]
``*args/**kwargs`` : - fn的参数
[the function's params]
Return:
(concurrent.futures.Future) : - 丢进loop后的future对象,因为使用的是`run_coroutine_threadsafe`方法,因此返回的是一个线程安全的`concurrent.futures.Future`对象.
[the instance of Future, because of using the method `run_coroutine_threadsafe`,it will return a instance of `concurrent.futures.Future` who is thread safe.]
Raise:
(RuntimeError) : - 当执行器是已经关闭或者执行器的事件循环不在运行时,会抛出运行时异常表明无法执行该操作.
[when the excutor has already closed or the eventloop is not running.]
"""
if self._shutdown:
raise RuntimeError(
'Cannot schedule new futures after shutdown')
if not self._loop.is_running():
raise RuntimeError(
"Loop must be started before any function can "
"be submitted")
if inspect.iscoroutinefunction(fn):
# 如果是协程对象,那么就使用run_coroutine_threadsafe将协程放入事件循环
# `asyncio.run_coroutine_threadsafe`返回一个`concurrent.futures.Future`对象
# 因此需要将其包装一下成为`asyncio.Future`对象
coro = fn(*args, **kwargs)
fu = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fu
else:
# 如果是其他可执行对象,那么就使用run_in_executor将可执行对象委托给执行器放入事件循环
func = functools.partial(fn, *args, **kwargs)
coro = func_executor_coroutine(func)
fu = asyncio.run_coroutine_threadsafe(coro, self._loop)
return fu
[docs] def shutdown(self, wait: bool=True, timeout: int=None)->None:
"""关闭执行器
[Close the executor.]
Params:
wait (bool): - 是否等待线程同步
[if waitting for syncing the thread or not.]
timeout (int): - wait为True时才有效果,设置join的等待时间
[set the waitting time.it will take effect only when param `wait` is `True`.]]
"""
self._loop.call_soon_threadsafe(self._loop.stop)
self._shutdown = True
if wait:
self._thread.join(timeout)