Source code for interactive_python.state
import collections
import asyncio
from pyee import EventEmitter
from .connection import Connection
from .discovery import Discovery
from .scene import Scene
[docs]class State(EventEmitter):
"""State is the state container for a single interactive session.
It should usually be created via the static
:func:`~interactive_python.State.connect` method::
connection = State.connect(
project_version_id=my_version_id,
authorization="Bearer " + oauth_token)
The Scene is a pyee.EventEmitter. When calls come down, they're always
emitted on the State by their method name. So, for instance, you can
listen to "onSceneCreate" or "onParticipantJoin" on the scene::
def greet(call):
for participant in call.data['participants']:
print('Welcome {}!', participant['username'])
scene.on('onParticipantJoin', greet)
The state can work in two modes for handling delivery of events and updates.
You can use `pump()` calls synchronously within your game loop to apply
updates that have been queued. Alternately, you can call ``pump_async()`` to
signal to that state that you want updates delivered asynchronously, as soon
as they come in. For example::
# Async delivery. `giveInput` is emitted as soon as any input comes in.
state.on('giveInput', lambda call: do_the_thing(call))
state.pump_async()
# Sync delivery. `giveInput` is emitted only during calls to pump()
state.on('giveInput', lambda call: do_the_thing(call))
while True:
my_game_loop.tick()
state.pump()
# You can also read queues of changes from pump(), if you prefer
# to dispatch changes manually:
# for call in pump(): ...
In both modes, all incoming call are emitted as events on the State
instance.
:param connection: The websocket connection to interactive.
:type connection: Connection
"""
def __init__(self, connection):
super(State, self).__init__()
self._scenes = {'default': Scene('default')}
self.connection = connection
self._enable_event_queue = True
self._event_queue = collections.deque()
self._scenes['default']._attach_connection(self.connection)
self.on('onSceneCreate', self._on_scene_create_or_update)
self.on('onSceneUpdate', self._on_scene_create_or_update)
self.on('onSceneDelete', self._on_scene_delete)
self.on('onControlCreate', self._on_control_update_or_create)
self.on('onControlUpdate', self._on_control_update_or_create)
self.on('onControlDelete', self._on_control_delete)
self.on('giveInput', self._give_input)
[docs] def scene(self, name):
"""
Looks up an existing scene by ID. It returns None if the scene does
not exist.
:param name: The name of the scene to look up
:type name: str
:return:
:rtype: Scene
"""
return self._scenes.get(name, None)
[docs] def pump_async(self, loop=asyncio.get_event_loop()):
"""
Starts a pump() process working in the background. Events will be
dispatched asynchronously.
Returns a future that can be used for cancelling the pump, if desired.
Otherwise the pump will automatically stop once
the connection is closed.
:rtype: asyncio.Future
"""
self._enable_event_queue = False
async def run():
try:
while await self.connection.has_packet():
self.pump()
except asyncio.CancelledError:
self._enable_event_queue = True
except Exception as e:
self.emit('error', e)
return asyncio.ensure_future(run(), loop=loop)
[docs] def pump(self):
"""
pump causes the state to read any updates it has queued up. This
should usually be called at the start of any game loop where you're
going to be doing processing of Interactive events.
Any events that have not been read when pump() is called are discarded.
Alternately, you can call pump_async() to have delivery handled for you
without manual input.
:rtype: Iterator of Calls
"""
self._event_queue.clear()
while True:
call = self.connection.get_packet()
if call is None:
return
self.emit(call.name, call)
if self._enable_event_queue:
self._event_queue.append(call)
return self._event_queue
[docs] async def create_scenes(self, *scenes):
"""
Can be called with one or more Scenes to add them to Interactive.
:param scenes: list of scenes to create
:type scenes: Scene
"""
for scene in scenes:
self._scenes[scene.id] = scene
scene._attach_connection(self)
return await self.connection.call(
'createScenes', [s._resolve_all() for s in scenes])
[docs] async def set_ready(self, is_ready=True):
"""
Marks the interactive integration as being ready-to-go. Must be called
before controls will appear.
:param is_ready: True or False to allow input
:rtype: Reply
"""
return await self.connection.call('ready', {'isReady': is_ready})
def _give_input(self, call):
control_id = call.data['input']['controlID']
for scene in self._scenes.values():
if control_id in scene.controls:
scene.controls[control_id]._give_input(call)
break
def _on_scene_delete(self, call):
if call.data['sceneID'] not in self._scenes:
return
self.scenes[call.data['sceneID']].delete(call)
del self.scenes[call.data['sceneID']]
def _on_scene_create_or_update(self, call):
for scene in call.data.scenes:
if scene['sceneID'] not in self._scenes:
self._scenes[scene['sceneID']] = Scene(self, scene['sceneID'])
self._scenes[scene['sceneID']]._apply_changes(scene, call)
def _on_control_delete(self, call):
if call.data['sceneID'] in self._scenes:
self._scenes[call.data['sceneID']]._on_control_delete(call)
def _on_control_update_or_create(self, call):
if call.data['sceneID'] in self._scenes:
self._scenes[call.data['sceneID']].\
_on_control_update_or_create(call)
[docs] @staticmethod
async def connect(discovery=Discovery(), **kwargs):
"""
Creates a new interactive connection. Most arguments will be passed
through into the Connection constructor.
:param discovery:
:type discovery: Discovery
:param kwargs:
:rtype: State
"""
if 'address' not in kwargs:
kwargs['address'] = await discovery.find()
connection = Connection(**kwargs)
await connection.connect()
return State(connection)