aio_parallel_tools.aio_actor package

Actor API

Asynchronous Actor Abstract Base Class.

class aio_parallel_tools.aio_actor.actor_abc.ActorABC[source]

Bases: object

Asynchronous Actor Abstract Base Class.

abstract classmethod AvailableScope() → List[Any][source]

Get the available actor instances.

Returns

List of the available actor instances.

Return type

List[Any]

abstract classmethod BestToSendScope(num: int = None) → List[Any][source]

Get a number of the actor instances best suited to receive a message.

Returns

The requested number of best-to-send actor instances.

Return type

List[Any]
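The scope methods can be pictured as filters over a class-level collection of instances. The following is a minimal sketch, not the library's implementation: `SketchActor`, its list-based `Members` registry, and the "emptiest inbox first" ranking are all assumptions made for illustration.

```python
from typing import Any, List, Optional


class SketchActor:
    """Hypothetical stand-in for an actor; not the library's implementation."""

    Members: List["SketchActor"] = []  # class-level registry (assumed layout)

    def __init__(self, aid: str, available: bool, inbox_size: int) -> None:
        self.aid = aid
        self.available = available
        self.inbox_size = inbox_size
        type(self).Members.append(self)

    @classmethod
    def AvailableScope(cls) -> List[Any]:
        # Keep only the instances that can currently accept messages.
        return [a for a in cls.Members if a.available]

    @classmethod
    def BestToSendScope(cls, num: Optional[int] = None) -> List[Any]:
        # Assumption: "best to send" means the emptiest inbox first.
        ranked = sorted(cls.AvailableScope(), key=lambda a: a.inbox_size)
        return ranked if num is None else ranked[:num]
```

Under this assumed ranking, `BestToSendScope(1)` returns a one-element list holding the available actor with the fewest queued messages.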

abstract classmethod Clean()[source]

Clean up all actors that are not available.

abstract classmethod Close(num: int)[source]

Close a number of available actors.

Parameters

num (int) – The number of available actors to close.

abstract classmethod FindById(aid: str) → Any[source]

Find an actor instance by id.

Parameters

aid (str) – The actor id, as a string.

Returns

An actor instance.

Return type

Any

abstract classmethod NotAvailableScope() → List[Any][source]

Get the actor instances that are not available.

Returns

List of the actor instances that are not available.

Return type

List[Any]

abstract classmethod NotPausedScope() → List[Any][source]

Get the actor instances that are not paused.

Returns

List of the actor instances that are not paused.

Return type

List[Any]

abstract classmethod NotRunningScope() → List[Any][source]

Get the actor instances that are not running.

Returns

List of the actor instances that are not running.

Return type

List[Any]

abstract classmethod PausedScope() → List[Any][source]

Get the paused actor instances.

Returns

List of the paused actor instances.

Return type

List[Any]

abstract classmethod Publish(message: Any, timeout: int)[source]

Send a message to all available actors.

Parameters
  • message (Any) – Message to send to the actor.

  • timeout (int) – Timeout of the sending action.

abstract classmethod Restart(num: int)[source]

Restart a number of actors that are not available.

Parameters

num (int) – The number of actors to restart.

abstract classmethod RunningScope() → List[Any][source]

Get the running actor instances.

Returns

List of the running actor instances.

Return type

List[Any]

abstract classmethod Send(message: Any, timeout: int)[source]

Send a message to the most available actor.

Parameters
  • message (Any) – Message to send to the actor.

  • timeout (int) – Timeout of the sending action.

abstract classmethod SendById(aid: str, message: Any, timeout: int)[source]

Find an actor instance by id and send it a message.

Parameters
  • aid (str) – The actor id, as a string.

  • message (Any) – Message to send to the actor.

  • timeout (int) – Timeout of the sending action.

abstract classmethod SendRandom(message: Any, timeout)[source]

Send a message to a random available actor.

Parameters
  • message (Any) – Message to send to the actor.

  • timeout (int) – Timeout of the sending action.
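The three class-level sending strategies (Publish, Send, SendRandom) differ only in how the target is chosen. The sketch below illustrates that difference with plain `asyncio.Queue` inboxes. `Mailbox`, `publish`, `send_best`, and `send_random` are hypothetical names, and treating "most available" as "shortest inbox" is an assumption rather than documented behaviour.

```python
import asyncio
import random
from typing import Any, List


class Mailbox:
    """Hypothetical actor stand-in that only owns an inbox queue."""

    def __init__(self, aid: str) -> None:
        self.aid = aid
        self.inbox: asyncio.Queue = asyncio.Queue()

    async def send(self, message: Any, timeout: float) -> None:
        # Bound the time spent waiting for room in the inbox.
        await asyncio.wait_for(self.inbox.put(message), timeout)


async def publish(actors: List[Mailbox], message: Any, timeout: float) -> None:
    # Publish: deliver the message to every actor in the list.
    await asyncio.gather(*(a.send(message, timeout) for a in actors))


async def send_best(actors: List[Mailbox], message: Any, timeout: float) -> Mailbox:
    # Send: pick the actor with the shortest inbox (an assumed heuristic).
    target = min(actors, key=lambda a: a.inbox.qsize())
    await target.send(message, timeout)
    return target


async def send_random(actors: List[Mailbox], message: Any, timeout: float) -> Mailbox:
    # SendRandom: pick one actor uniformly at random.
    target = random.choice(actors)
    await target.send(message, timeout)
    return target
```

All three helpers expect to be called from inside a running event loop, with a list that already contains only available actors.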

abstract classmethod Start(num: int, inbox_maxsize: int = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, rev_timeout: Optional[int] = None)[source]

Create and start a number of actors.

Parameters
  • num (int) – The number of actors to create and start.

  • inbox_maxsize (int, optional) – Maximum size of the inbox. Defaults to 0.

  • loop (Optional[asyncio.events.AbstractEventLoop], optional) – Event loop on which the actors run. Defaults to None.

  • rev_timeout (Optional[int], optional) – Timeout for waiting on the receive function. Defaults to None.
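To make the roles of `num`, `inbox_maxsize`, and `rev_timeout` concrete, here is a rough sketch built only on the standard library. It assumes each actor owns an `asyncio.Queue` and a task that reads from it. The `worker` and `start_workers` helpers, the `None` stop sentinel, and the "stop on timeout" policy are illustrative assumptions, not the library's behaviour.

```python
import asyncio
from typing import List, Optional


async def worker(inbox: asyncio.Queue, handled: List[str],
                 rev_timeout: Optional[float]) -> None:
    # Pull messages until a None sentinel arrives or the wait times out.
    while True:
        try:
            message = await asyncio.wait_for(inbox.get(), rev_timeout)
        except asyncio.TimeoutError:
            break  # Assumed policy: stop when no message arrives in time.
        if message is None:
            break
        handled.append(message)


async def start_workers(num: int, inbox_maxsize: int = 0,
                        rev_timeout: Optional[float] = None):
    # maxsize=0 makes an asyncio.Queue unbounded; the sketch assumes the
    # documented default of 0 carries the same meaning.
    handled: List[str] = []
    inboxes = [asyncio.Queue(maxsize=inbox_maxsize) for _ in range(num)]
    tasks = [asyncio.create_task(worker(q, handled, rev_timeout)) for q in inboxes]
    return inboxes, tasks, handled
```

With `rev_timeout=None`, `asyncio.wait_for` waits indefinitely, so a worker in this sketch only stops when it receives the sentinel.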

abstract property aid

Get the id of the actor instance.

Returns

The id of the actor instance.

Return type

str

abstract property available

Check if the actor instance is available.

Returns

True if the actor instance is available.

Return type

bool

abstract async close(timeout: Optional[int] = None)[source]

Close the actor instance.

Parameters

timeout (Optional[int], optional) – Timeout of the closing action. Defaults to None.
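One common way to implement a close with a timeout is to ask the task to stop and then wait a bounded time for it to finish. The sketch below shows that pattern with the standard library only. `close_task` and the `None` stop sentinel are hypothetical, and nothing here is taken from the library's source.

```python
import asyncio
from typing import Optional


async def close_task(task: asyncio.Task, inbox: asyncio.Queue,
                     timeout: Optional[float] = None) -> bool:
    """Ask a worker to stop; return True if it stopped on its own."""
    await inbox.put(None)  # Hypothetical stop sentinel.
    try:
        await asyncio.wait_for(task, timeout)
        return True
    except asyncio.TimeoutError:
        # wait_for has already cancelled the task at this point.
        return False
```

A caller can use the boolean result to tell a clean shutdown from a forced one.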

abstract property inbox_maxsize

Max size of the actor instance’s message box.

Returns

The max size of the actor instance’s message box.

Return type

int

abstract property inbox_size

Size of the actor instance’s message box.

Returns

Size of the actor instance’s message box.

Return type

int

abstract property loop

The event loop the tasks run on.

Returns

The event loop the tasks run on.

Return type

asyncio.events.AbstractEventLoop

abstract property paused

Check if the actor instance is paused.

Returns

True if the actor instance is paused.

Return type

bool

abstract async receive(message)[source]

Handle a received message. Define this method in your subclass.

Parameters

message (Any) – The message received by the actor.

abstract property running

Check if the actor instance is running.

Returns

True if the actor instance is running.

Return type

bool

abstract async send(message, timeout=None)[source]

Send a message to the actor instance.

Parameters
  • message (Any) – Message to send to the actor.

  • timeout (int) – Timeout of the sending action.

abstract send_nowait(message)[source]

Send a message to the actor instance with no wait.

Parameters

message (Any) – Message to send to the actor.
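The difference between `send` and `send_nowait` mirrors the difference between `put` and `put_nowait` on an `asyncio.Queue`. The sketch below demonstrates that distinction on a bare queue. It assumes the inbox behaves like a bounded `asyncio.Queue`, which the documentation does not state, and `demo_send_modes` is a hypothetical helper.

```python
import asyncio


async def demo_send_modes() -> dict:
    inbox: asyncio.Queue = asyncio.Queue(maxsize=1)
    results = {}

    inbox.put_nowait("first")  # Succeeds: the inbox has room.
    try:
        inbox.put_nowait("second")  # Fails immediately: the inbox is full.
    except asyncio.QueueFull:
        results["nowait"] = "rejected"

    try:
        # Waits for room, but gives up after the timeout.
        await asyncio.wait_for(inbox.put("third"), timeout=0.05)
    except asyncio.TimeoutError:
        results["timed"] = "timed out"

    results["size"] = inbox.qsize()
    return results
```

In both failure cases the message is not enqueued, so the caller decides whether to retry, drop it, or route it elsewhere.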

abstract start()[source]

Start the actor instance.

abstract property task

Task running inside the actor instance.

Returns

The task running inside the actor instance.

Return type

asyncio.Task

Base Class

Async Actor Tool.

class aio_parallel_tools.aio_actor.actor_basic.AioActor(inbox_maxsize: int = 0, loop: Optional[asyncio.events.AbstractEventLoop] = None, rev_timeout: Optional[int] = None)[source]

Bases: aio_parallel_tools.aio_actor.mixins.manage_mixin.ManageMixin, aio_parallel_tools.aio_actor.mixins.inbox_mixin.InboxMixin, aio_parallel_tools.aio_actor.mixins.task_mixin.TaskMixin, aio_parallel_tools.aio_actor.mixins.hooks_mixin.HooksMixin, aio_parallel_tools.aio_actor.mixins.id_mixin.IdentifyMixin, aio_parallel_tools.aio_actor.mixins.loop_mixin.LoopMixin, aio_parallel_tools.aio_actor.actor_abc.ActorABC

Base Async Actor class.

To use the base class, create a subclass and implement the async method receive.

Usage:

>>> import asyncio
>>> from aio_parallel_tools.aio_actor.actor_basic import AioActor
>>> class Pinger(AioActor):
...     async def receive(self, message):
...         print(message)
...         try:
...             # Forward a 'ping' to the Ponger actors.
...             await ActorManager.get_actor("Ponger").Send('ping')
...         except Exception as e:
...             print(f"receive run error {e}")
...         finally:
...             await asyncio.sleep(0.5)
>>> class Ponger(AioActor):
...     async def receive(self, message):
...         print(message)
...         try:
...             # Answer the Pinger actors with a 'pong'.
...             await ActorManager.get_actor("Pinger").Send('pong')
...         except Exception as e:
...             print(f"receive run error {e}")
...         finally:
...             await asyncio.sleep(0.5)
>>> async def main():
...     Pinger.Start(num=3)
...     Ponger.Start(num=3)
...     await asyncio.sleep(1)
...     # Print the state of every Pinger instance.
...     for i in Pinger.Members:
...         print("****************")
...         print(i.aid)
...         print(i.available)
...         print(i.running)
...         print(i.paused)
...         print("****************")
...     await Pinger.Send("start")
...     await asyncio.sleep(10)
...     await Pinger.Close(num=3)
...     await Ponger.Close(num=3)
Members = {}

property available

Check if the actor instance is available.

Returns

True if the actor instance is available.

Return type

bool

async close(timeout: Optional[int] = None)[source]

Close the actor instance.

Parameters

timeout (Optional[int], optional) – Timeout of the closing action. Defaults to None.

close_nowait()[source]

start()[source]

Start the actor instance.

Actor Manager

Functions for managing Actors.

class aio_parallel_tools.aio_actor.actor_manager.ActorManagerRegister[source]

Bases: type

Metaclass that registers subclasses for management.

aio_parallel_tools.aio_actor.actor_manager.get_actor(actor_name: str) → Optional[aio_parallel_tools.aio_actor.actor_abc.ActorABC][source]

Get actor class by name.

Parameters

actor_name (str) – actor class’s name

Returns

actor class

Return type

Optional[ActorABC]

aio_parallel_tools.aio_actor.actor_manager.has_actor() → List[str][source]

Get all actor names.

Returns

All actor names.

Return type

List[str]
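A registering metaclass combined with name-based lookup functions can be sketched as follows. This is an illustration of the general pattern under stated assumptions, not the module's actual code: `RegisterMeta`, `_REGISTRY`, and `BaseActor` are hypothetical names, and skipping the root class during registration is an assumed design choice.

```python
from typing import Dict, List, Optional

_REGISTRY: Dict[str, type] = {}  # hypothetical module-level store


class RegisterMeta(type):
    """Sketch of a registering metaclass; names here are illustrative."""

    def __new__(mcs, name, bases, namespace):
        cls = super().__new__(mcs, name, bases, namespace)
        if bases:  # Skip the root base class itself.
            _REGISTRY[name] = cls
        return cls


def get_actor(actor_name: str) -> Optional[type]:
    # Return the registered class, or None when the name is unknown.
    return _REGISTRY.get(actor_name)


def has_actor() -> List[str]:
    # List every registered class name.
    return list(_REGISTRY)


class BaseActor(metaclass=RegisterMeta):
    pass


class Pinger(BaseActor):
    pass
```

Because registration happens when the class statement runs, a subclass becomes discoverable by name as soon as its module is imported.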