Source code for runtimepy.net.server.app.env.tab.message

"""
A module implementing a channel-environment tab message-handling interface.
"""

# built-in
import logging
from typing import Any, Callable

# internal
from runtimepy.channel import Channel
from runtimepy.message import JsonMessage
from runtimepy.net.server.app.env.tab.base import ChannelEnvironmentTabBase
from runtimepy.net.server.websocket.state import TabState

TabMessageSender = Callable[[JsonMessage], None]


[docs] class ChannelEnvironmentTabMessaging(ChannelEnvironmentTabBase): """A channel-environment tab interface.""" def _setup_callback(self, name: str, state: TabState) -> None: """Register a channel's value-change callback.""" chan = self.command.env.field_or_channel(name) assert isinstance(chan, Channel) or chan is not None prim = chan.raw def callback(_, __) -> None: """Emit a change event to the stream.""" # Render enumerations etc. here instead of trying to do it # in the UI. state.points[name].append( (self.command.env.value(name), prim.last_updated_ns) ) state.primitives[name] = prim state.callbacks[name] = prim.register_callback(callback)
[docs] def handle_shown_state( self, shown: bool, outbox: JsonMessage, send: TabMessageSender, state: TabState, ) -> None: """Handle 'shown' state changing.""" state.shown = shown env = self.command.env # Always sample current values. latest = env.values() if state.shown: # Send missing or changed values. if latest != state.latest_ui_values: to_send = {} for key, value in latest.items(): if ( key not in state.latest_ui_values or state.latest_ui_values[key] != value ): to_send[key] = value send(to_send) # type: ignore # Begin observing channel events for this environment. for name in env.names: self._setup_callback(name, state) else: state.clear_telemetry() # Save current UI state. state.latest_ui_values.update(latest) outbox["handle_shown_state"] = shown
[docs] def handle_init(self, state: TabState) -> None: """Handle tab initialization.""" # Initialize logging. if isinstance(self.logger, logging.Logger): state.add_logger(self.logger) self.logger.debug("Tab initialized.")
[docs] async def handle_message( self, data: dict[str, Any], send: TabMessageSender, state: TabState ) -> JsonMessage: """Handle a message from a tab.""" kind: str = data["kind"] response: JsonMessage = {} # Respond to initialization. if kind == "init": self.handle_init(state) # Handle command-line commands. elif kind == "command": cmd = self.command result = cmd.command(data["value"]) # Limit log spam. self.governed_log( self.log_limiter, "%s: %s", data["value"], result, level=logging.INFO if result else logging.ERROR, ) # Handle tab-event messages. elif kind.startswith("tab"): if "shown" in kind: self.handle_shown_state(True, response, send, state) elif "hidden" in kind: self.handle_shown_state(False, response, send, state) # Log when messages aren't handled. else: self.governed_log( self.log_limiter, "(%s) Message not handled: '%s'.", self.name, data, level=logging.WARNING, ) return response