Skip to main content

Python

Full example for a direct connection to LA4 at 192.168.0.23:5000. A background thread listens for incoming events, a background thread sends periodic heartbeats, and the main thread sends commands.

note

This example assumes LA4 is reachable at a known static IP. If you are not using a direct or static IP setup, you will need to scan the local network to discover LA4's address before connecting. See the network setup section for details.

import socket
import json
import threading
import time
import uuid


class LA4Client:
LA4_IP = "192.168.0.23"
LA4_PORT = 5000

MY_IP = "192.168.0.100" # IP of this machine — LA4 will reply here
MY_PORT = 5000 # Port this machine listens on

HEARTBEAT_FREQUENCY_SECONDS = 3
SEND_DATA_TIMEOUT_SECONDS = 2

SENDER_NAME = "My PC App"
SENDER_VERSION = "0.1.0"

def __init__(self):
self._sender_id = uuid.uuid4().hex[:8]
self._running = True

def run(self) -> None:
threading.Thread(target=self._listener, daemon=True).start()
threading.Thread(target=self._send_heartbeats, daemon=True).start()

# Wait for listener to start before sending commands
time.sleep(0.5)

self.turn_on_model_light()

input("Press Enter to exit...\n")
self._running = False

def _send(self, payload: dict) -> None:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(self.SEND_DATA_TIMEOUT_SECONDS)
s.connect((self.LA4_IP, self.LA4_PORT))
s.sendall(json.dumps(payload).encode("utf-8"))

def _base_payload(self) -> dict:
return {
"id": self._sender_id,
"name": self.SENDER_NAME,
"ip": self.MY_IP,
"port": self.MY_PORT,
"version": self.SENDER_VERSION,
"coordinator": True,
}

def _send_command(self, command: str) -> None:
payload = self._base_payload()
payload["type_"] = "cmd"
payload["command"] = command
self._send(payload)

def turn_on_model_light(self) -> None:
sequence = "LED_FAN,1\\nMODEL_LIGHT,80"
command = (
f'echo "{sequence}" > /home/rigsters/la4_sequence.csv'
" && boardiocli uSeq && boardiocli runSeq"
)
self._send_command(command)

def _send_heartbeats(self) -> None:
while self._running:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(self.SEND_DATA_TIMEOUT_SECONDS)
s.connect((self.LA4_IP, self.LA4_PORT))
heartbeat = self._base_payload()
s.sendall(json.dumps(heartbeat).encode("utf-8"))
except (socket.timeout, ConnectionRefusedError):
pass
except Exception as e:
print(f"Cannot send heartbeat to LA4 - {e}")

time.sleep(self.HEARTBEAT_FREQUENCY_SECONDS)

def _listener(self) -> None:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("0.0.0.0", self.MY_PORT))
server.listen()
print(f"Listening on port {self.MY_PORT}...")

while True:
conn, _ = server.accept()
with conn:
data = b""
while chunk := conn.recv(4096):
data += chunk

if not data:
continue

try:
event = json.loads(data.decode("utf-8"))
except json.JSONDecodeError:
continue

event_type = event.get("type_")
name = event.get("name", "unknown")

if event_type == "output":
print(f"[{name}] output: {event['output']}")
elif event_type == "message":
print(f"[{name}] message: {event['message']}")
elif event_type == "unit_info":
print(
f"[{name}] unit info — firmware: {event['firmware_version']}, "
f"boardio: {event['boardio_version']}, "
f"unit type: {event['unit_type']}"
)


if __name__ == "__main__":
LA4Client().run()