aio_parallel_tools.aio_actor.mixins.task_mixin 源代码
import warnings
import asyncio
from aio_parallel_tools.aio_actor.exception_and_warning import ActorTimeoutWarning
from aio_parallel_tools.aio_actor.signal import ActorExit
[文档]class TaskMixin:
def __init__(self, rev_timeout: int):
self._rev_timeout = rev_timeout
self._task = None
self._running = False
self._actor_fut = None
@property
def running(self):
return self._running
@property
def task(self):
return self._task
async def _run(self):
await self.before_every_loop()
self._running = True
while self.running:
await self.before_every_loop()
try:
if self._rev_timeout is None:
message = await self.inbox.get()
else:
message = await asyncio.wait_for(self.inbox.get(), timeout=self._rev_timeout)
except asyncio.TimeoutError:
await self.handle_rev_timeout()
else:
try:
message = await self.before_deal_rev(message)
if message is ActorExit:
warnings.warn("actor closed")
self._running = False
self.befor_actor_colse()
# asyncio.current_task().cancel()
break
elif isinstance(message, Exception):
self._running = False
self.befor_actor_colse()
raise message
else:
try:
result = await self.receive(message)
except Exception as e:
warnings.warn(f"actor {self.__class__.__name__} receive {message} error: {e}")
try:
await self.after_deal_rev(message, e)
except Exception as e:
warnings.warn(f"actor {self.__class__.__name__} after deal rev error: {e}")
else:
try:
await self.after_deal_rev(message, result)
except Exception as e:
warnings.warn(f"actor {self.__class__.__name__} after deal rev error: {e}")
finally:
self.inbox.task_done()
finally:
await self.after_every_loop()
[文档] async def handle_rev_timeout(self):
warnings.warn(f"actor {self.__class__.__name__} rev message timeout", ActorTimeoutWarning)
[文档] def task_done_callback(self, fut):
try:
result = fut.result()
except Exception as e:
self._actor_fut.set_result(False)
else:
self._actor_fut.set_result(True)
[文档] def start_task(self):
if not self.running:
task = asyncio.create_task(self._run())
self._actor_fut = self.loop.create_future()
task.add_done_callback(self.after_actor_close)
task.add_done_callback(self.task_done_callback)
self._task = task
[文档] async def close_task(self):
await self._actor_fut