Skip to content

Instantly share code, notes, and snippets.

@fxprime
Created April 27, 2026 01:27
Show Gist options
  • Select an option

  • Save fxprime/ff5bb8c3756b9b4f2407158f2e592b60 to your computer and use it in GitHub Desktop.

Select an option

Save fxprime/ff5bb8c3756b9b4f2407158f2e592b60 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
RS-LDO-N01-1 Fluorescence Dissolved Oxygen Sensor
Raspberry Pi — USB to RS485 reader
Default: address 0x01, baud 4800
"""
import serial
import struct
import time
# ── Config ────────────────────────────────────────────────────────────────────
PORT = "/dev/ttyUSB0" # change to /dev/ttyUSB1 etc. if needed
BAUD = 4800
ADDRESS = 0x01
TIMEOUT = 2.0 # seconds
INTERVAL = 2.0 # polling interval
# ─────────────────────────────────────────────────────────────────────────────
def crc16(data: bytes) -> int:
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
def build_read_frame(address: int, start_reg: int, count: int) -> bytes:
frame = struct.pack(">BBHH", address, 0x03, start_reg, count)
crc = crc16(frame)
frame += struct.pack("<H", crc) # CRC is little-endian (low byte first)
return frame
def check_crc(data: bytes) -> bool:
if len(data) < 3:
return False
payload = data[:-2]
received = struct.unpack("<H", data[-2:])[0]
return crc16(payload) == received
def regs_to_float(high_reg: int, low_reg: int) -> float:
"""Big-endian 32-bit IEEE 754 float from two 16-bit registers."""
raw = struct.pack(">HH", high_reg, low_reg)
return struct.unpack(">f", raw)[0]
def read_sensor(ser: serial.Serial) -> dict | None:
# Read 6 registers starting at 0x0000:
# 0x0000-0x0001 → DO saturation (%)
# 0x0002-0x0003 → DO concentration (mg/L)
# 0x0004-0x0005 → Temperature (°C)
frame = build_read_frame(ADDRESS, 0x0000, 6)
ser.reset_input_buffer()
ser.write(frame)
# Expected reply: addr(1) + func(1) + byte_count(1) + 12 data bytes + CRC(2) = 17 bytes
reply = ser.read(17)
if len(reply) < 17:
print(f"[WARN] Short reply: got {len(reply)} bytes, expected 17")
return None
if not check_crc(reply):
print("[WARN] CRC mismatch")
return None
if reply[0] != ADDRESS or reply[1] != 0x03:
print(f"[WARN] Unexpected frame: addr=0x{reply[0]:02X} func=0x{reply[1]:02X}")
return None
# Parse 6 × 16-bit registers from byte index 3
regs = struct.unpack(">HHHHHH", reply[3:15])
do_sat = regs_to_float(regs[0], regs[1])
do_conc = regs_to_float(regs[2], regs[3])
temp_c = regs_to_float(regs[4], regs[5])
# Sanity check
if not (0.0 <= do_sat <= 200.0 and 0.0 <= do_conc <= 20.0 and 0.0 <= temp_c <= 40.0):
print(f"[WARN] Values out of range — sat={do_sat:.2f} conc={do_conc:.2f} temp={temp_c:.2f}")
return None
return {
"do_saturation_pct": round(do_sat, 2),
"do_concentration_mgL": round(do_conc, 2),
"temperature_c": round(temp_c, 2),
}
def main():
print(f"Opening {PORT} @ {BAUD} baud (address 0x{ADDRESS:02X})")
print("Press Ctrl+C to stop.\n")
with serial.Serial(
port = PORT,
baudrate = BAUD,
bytesize = serial.EIGHTBITS,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE,
timeout = TIMEOUT,
) as ser:
time.sleep(0.1) # let the port settle
while True:
data = read_sensor(ser)
if data:
print(
f"DO: {data['do_concentration_mgL']:5.2f} mg/L | "
f"Saturation: {data['do_saturation_pct']:6.2f} % | "
f"Temp: {data['temperature_c']:5.2f} °C"
)
else:
print("[ERROR] Failed to read sensor")
time.sleep(INTERVAL)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment