aio_parallel_tools.aio_actor package

Actor API

异步Actor抽象基类

class aio_parallel_tools.aio_actor.actor_abc.ActorABC[源代码]

基类:object

异步Actor抽象基类

abstract classmethod AvailableScope() → List[Any][源代码]

获取到可用的actor实例

返回

可用的actor实例列表

返回类型

List[Any]

abstract classmethod BestToSendScope(num: int = None) → List[Any][源代码]

获取一定数量的最适合被发送消息的actor实例

返回

一定数量的最适合被发送消息的actor实例列表

返回类型

List[Any]

abstract classmethod Clean()[源代码]

清理不可用的actor实例

abstract classmethod Close(num: int)[源代码]

关闭一定数量的可用actor

参数

num (int) -- 关闭一定数量的可用actor

abstract classmethod FindById(aid: str) → Any[源代码]

通过id号查找到actor实例

参数

aid (str) -- str类型的actorid号

返回

一个actor实例

返回类型

Any

abstract classmethod NotAvailableScope() → List[Any][源代码]

获取不可用的actor实例

返回

不可用的actor实例列表

返回类型

List[Any]

abstract classmethod NotPausedScope() → List[Any][源代码]

获取未被暂停的actor实例

返回

未被暂停的actor实例列表

返回类型

List[Any]

abstract classmethod NotRunningScope() → List[Any][源代码]

获取不再执行状态的actor实例

返回

不再执行状态的actor实例列表

返回类型

List[Any]

abstract classmethod PausedScope() → List[Any][源代码]

获取被暂停的actor实例

返回

被暂停的actor实例列表

返回类型

List[Any]

abstract classmethod Publish(message: Any, timeout: int)[源代码]

发送消息给所有可用的actor实例

参数
  • message (Any) -- 发送给actor实例的消息

  • timeout (int) -- 发送操作的过期时间

abstract classmethod Restart(num: int)[源代码]

重启一定数量的不可用actor实例

参数

num (int) -- 要被重启的actor实例数量

abstract classmethod RunningScope() → List[Any][源代码]

获取执行状态的actor实例

返回

不再执行状态的actor实例列表

返回类型

List[Any]

abstract classmethod Send(message: Any, timeout: int)[源代码]

发送消息给最适合接收的actor实例

参数
  • message (Any) -- 发送给actor实例的消息

  • timeout (int) -- 发送操作的过期时间

abstract classmethod SendById(aid: str, message: Any, timeout: int)[源代码]

发送消息给指定id的actor实例

参数
  • aid (str) -- str类型的actorid号

  • message (Any) -- 发送给actor实例的消息

  • timeout (int) -- 发送操作的过期时间

abstract classmethod SendRandom(message: Any, timeout)[源代码]

发送消息给随机一个可用的actor实例

参数
  • message (Any) -- 发送给actor实例的消息

  • timeout (int) -- 发送操作的过期时间

abstract classmethod Start(num: int, inbox_maxsize: int = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, rev_timeout: Optional[int] = None)[源代码]

创建并启动一定数量的actor实例

参数
  • num (int) -- 要创建并启动的actor实例数量

  • inbox_maxsize (int, optional) -- actor实例消息邮箱的大小,默认为0意味无限大.

  • loop (Optional[asyncio.events.AbstractEventLoop], optional) -- actor实例执行所在的事件循环

  • rev_timeout ([int], optional) -- recive方法的执行超时时长.默认为None,意味不会超时.

abstract property aid

获取actor实例的id

返回

actor实例的id

返回类型

bool

abstract property available

检查actor实例是否可用.

返回

actor实例是否可用

返回类型

bool

abstract async close(timeout: Optional[int] = None)[源代码]

关闭actor实例

参数

timeout (Optional[int], optional) -- 关闭操作的超时时间.

abstract property inbox_maxsize

actor对象内部的消息邮箱的最大容量

返回

actor对象内部的消息邮箱的最大容量

返回类型

int

abstract property inbox_size

actor对象内部的消息邮箱的当前容量

返回

actor对象内部的消息邮箱的当前容量

返回类型

int

abstract property loop

任务执行所在的事件循环

返回

任务执行所在的事件循环

返回类型

asyncio.events.AbstractEventLoop

abstract property paused

检查actor实例是否被暂停

返回

actor实例是否被暂停

返回类型

bool

abstract async receive(message)[源代码]

Actor的执行逻辑.需要被子类继承实现.

参数

message (Any) -- 发送给actor实例的消息

abstract property running

检查actor实例是否在执行状态

返回

actor实例是否在执行状态

返回类型

bool

abstract async send(message, timeout=None)[源代码]

向实例发送消息

参数
  • message (Any) -- 发送给actor实例的消息

  • timeout (int) -- 发送操作的过期时间

abstract send_nowait(message)[源代码]

使用同步接口向实例发送消息

参数

message (Any) -- 发送给actor实例的消息

abstract start()[源代码]

启动actor实例

abstract property task

actor实例内部执行的任务

返回

actor实例内部执行的任务

返回类型

asyncio.Task

Base Class

Async Actor Tool.

class aio_parallel_tools.aio_actor.actor_basic.AioActor(inbox_maxsize: int = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, rev_timeout: Optional[int] = None)[源代码]

基类:aio_parallel_tools.aio_actor.mixins.manage_mixin.ManageMixin, aio_parallel_tools.aio_actor.mixins.inbox_mixin.InboxMixin, aio_parallel_tools.aio_actor.mixins.task_mixin.TaskMixin, aio_parallel_tools.aio_actor.mixins.hooks_mixin.HooksMixin, aio_parallel_tools.aio_actor.mixins.id_mixin.IdentifyMixin, aio_parallel_tools.aio_actor.mixins.loop_mixin.LoopMixin, aio_parallel_tools.aio_actor.actor_abc.ActorABC

Base Async Actor class.

要使用这个基类,我们需要创建一个子类并实现其中的异步方法`receive`

Usage:

>>> class Pinger(AioActor):
...     async def receive(self, message):
...         print(message)
...         try:
...             await ActorManager.get_actor("Ponger").Send('ping')
...         except Exception as e:
...             print(f"receive run error {e}")
...         finally:
...             await asyncio.sleep(0.5)
>>> class Ponger(AioActor):
...     async def receive(self, message):
...     print(message)
...     try:
...         await ActorManager.get_actor("Pinger").Send('pong')
...     except Exception as e:
...         print(f"receive run error {e}")
...     finally:
...         await asyncio.sleep(0.5)
>>> async def main():
        Pinger.Start(num=3)
        Ponger.Start(num=3)
        await asyncio.sleep(1)
        for i in Pinger.Members:
            print("****************")
            print(i.aid)
            print(i.available)
            print(i.running)
            print(i.paused)
            print("****************")
        await Pinger.Send("start")
        await asyncio.sleep(10)
        await Pinger.Close(num=3)
        await Ponger.Close(num=3)
Members = {}
property available

检查actor实例是否可用.

返回

actor实例是否可用

返回类型

bool

async close(timeout: Optional[int] = None)[源代码]

关闭actor实例

参数

timeout (Optional[int], optional) -- 关闭操作的超时时间.

close_nowait()[源代码]
start()[源代码]

启动actor实例

Actor Manager

Functions for managing Actors.

class aio_parallel_tools.aio_actor.actor_manager.ActorManagerRegister[源代码]

基类:type

用于注册Actor类的元类

aio_parallel_tools.aio_actor.actor_manager.get_actor(actor_name: str) → Optional[aio_parallel_tools.aio_actor.actor_abc.ActorABC][源代码]

通过类名获取Actor的子类

参数

actor_name (str) -- Actor的子类类名

返回

actor class

返回类型

Optional[ActorABC]

aio_parallel_tools.aio_actor.actor_manager.has_actor() → List[str][源代码]

获取所有的actor子类名

返回

所有的actor子类名

返回类型

List[str]