aio_parallel_tools.aio_actor.actor_basic 源代码
"""Async Actor Tool."""
import warnings
import uuid
import asyncio
from typing import Optional
from aio_parallel_tools.aio_actor.mixins.hooks_mixin import HooksMixin
from aio_parallel_tools.aio_actor.mixins.id_mixin import IdentifyMixin
from aio_parallel_tools.aio_actor.mixins.inbox_mixin import InboxMixin
from aio_parallel_tools.aio_actor.mixins.task_mixin import TaskMixin
from aio_parallel_tools.aio_actor.mixins.loop_mixin import LoopMixin
from aio_parallel_tools.aio_actor.mixins.manage_mixin import ManageMixin
from aio_parallel_tools.aio_actor.actor_abc import ActorABC
from aio_parallel_tools.aio_actor.actor_manager import ActorManagerRegister
from aio_parallel_tools.aio_actor.exception_and_warning import InboxNearllyFullWarning
from aio_parallel_tools.aio_actor.signal import ActorExit
[文档]class AioActor(ManageMixin, InboxMixin, TaskMixin, HooksMixin, IdentifyMixin, LoopMixin, ActorABC, metaclass=ActorManagerRegister):
"""Base Async Actor class.
To use the base class,we should create a sub class and write a implement of async method `receive`.
Usage:
>>> class Pinger(AioActor):
... async def receive(self, message):
... print(message)
... try:
... await ActorManager.get_actor("Ponger").Send('ping')
... except Exception as e:
... print(f"receive run error {e}")
... finally:
... await asyncio.sleep(0.5)
>>> class Ponger(AioActor):
... async def receive(self, message):
... print(message)
... try:
... await ActorManager.get_actor("Pinger").Send('pong')
... except Exception as e:
... print(f"receive run error {e}")
... finally:
... await asyncio.sleep(0.5)
>>> async def main():
Pinger.Start(num=3)
Ponger.Start(num=3)
await asyncio.sleep(1)
for i in Pinger.Members:
print("****************")
print(i.aid)
print(i.available)
print(i.running)
print(i.paused)
print("****************")
await Pinger.Send("start")
await asyncio.sleep(10)
await Pinger.Close(num=3)
await Ponger.Close(num=3)
"""
def __init__(self, inbox_maxsize: int = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, rev_timeout: Optional[int] = None):
ActorABC.__init__(self)
LoopMixin.__init__(self, loop=loop)
ManageMixin.__init__(self)
IdentifyMixin.__init__(self)
HooksMixin.__init__(self)
TaskMixin.__init__(self, rev_timeout=rev_timeout)
InboxMixin.__init__(self, inbox_maxsize=inbox_maxsize)
@property
def available(self):
if self.task is None:
return False
if self.task.done():
return False
if self.running is False:
return False
if self.inbox.full():
return False
if self.paused:
return False
if self.inbox_maxsize > 3 and self.inbox_maxsize > self.inbox.qsize() >= int(self.inbox_maxsize * 0.8):
warnings.warn(f"inbox {self.aid} nearly full", InboxNearllyFullWarning)
return True
[文档] async def close(self, timeout: Optional[int] = None):
await self.send(ActorExit, timeout=timeout)
self.close_accept()
await self.close_task()
[文档] def close_nowait(self):
self.send_nowait(ActorExit)
[文档] def start(self):
self.before_actor_start()
self.start_accept()
self.start_task()
self.after_actor_start()