Source code for runtimepy.net.tcp.scpi
"""
A module implementing an SCPI interface.
"""
# built-in
import asyncio
# internal
from runtimepy.net.arbiter.tcp import TcpConnectionFactory
from runtimepy.net.tcp import TcpConnection
[docs]
class ScpiConnection(TcpConnection):
"""A simple SCPI connection class."""
[docs]
def init(self) -> None:
"""Initialize this instance."""
self.command_lock = asyncio.Lock()
self.message_queue: asyncio.Queue[str] = asyncio.Queue(maxsize=1)
[docs]
async def async_init(self) -> bool:
"""Initialize this instance."""
# Any SCPI device should respond to this query.
return bool(await self.send_command("*IDN", log=True, query=True))
[docs]
async def process_text(self, data: str) -> bool:
"""Process a text frame."""
for item in data.splitlines():
if item:
await self.message_queue.put(item)
return True
[docs]
async def process_binary(self, data: bytes) -> bool:
"""Process a binary frame."""
return await self.process_text(data.decode())
[docs]
async def send_command(
self,
command: str,
response: bool = True,
log: bool = False,
query: bool = False,
timeout: float = 1.0,
) -> str:
"""Send a command."""
result = ""
if query:
command += "?"
async with self.command_lock:
self.send_text(command + "\n")
if response or query:
try:
result = await asyncio.wait_for(
self.message_queue.get(), timeout
)
if log:
self.logger.info("(%s) %s", command, result)
except asyncio.TimeoutError:
self.logger.error(
"Peer didn't respond to '%s'! (timeout: %.2f)",
command,
timeout,
)
return result
[docs]
class ScpiConn(TcpConnectionFactory[ScpiConnection]):
"""A connection factory for SCPI devices."""
kind = ScpiConnection