Created
April 27, 2026 01:27
-
-
Save fxprime/ff5bb8c3756b9b4f2407158f2e592b60 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| RS-LDO-N01-1 Fluorescence Dissolved Oxygen Sensor | |
| Raspberry Pi — USB to RS485 reader | |
| Default: address 0x01, baud 4800 | |
| """ | |
| import serial | |
| import struct | |
| import time | |
# ── Config ────────────────────────────────────────────────────────────────────
# Serial device to open; change to /dev/ttyUSB1 etc. if needed.
PORT = "/dev/ttyUSB0"
# Serial line speed handed to the port as its baud rate.
BAUD = 4800
# Modbus slave address placed in every request and expected in every reply.
ADDRESS = 0x01
# Read timeout for the serial port, in seconds.
TIMEOUT = 2.0
# Pause between consecutive polls, in seconds.
INTERVAL = 2.0
# ─────────────────────────────────────────────────────────────────────────────
def crc16(data: bytes) -> int:
    """Return the Modbus RTU CRC-16 of *data*.

    The register is seeded with 0xFFFF. Each input byte is XORed into the
    register, which is then shifted right eight times; whenever the bit
    shifted out is set, the reflected polynomial 0xA001 is XORed in.
    """
    register = 0xFFFF
    for octet in data:
        register ^= octet
        for _bit in range(8):
            carry = register & 0x0001
            register >>= 1
            if carry:
                register ^= 0xA001
    return register
def build_read_frame(address: int, start_reg: int, count: int) -> bytes:
    """Build a function-0x03 (read registers) request frame.

    Layout: address (1 byte), function code 0x03 (1 byte), starting register
    (2 bytes, big-endian), register count (2 bytes, big-endian), followed by
    the CRC-16 of those six bytes with the low byte first.
    """
    header = struct.pack(">BBHH", address, 0x03, start_reg, count)
    # The CRC travels little-endian, unlike the big-endian header fields.
    trailer = struct.pack("<H", crc16(header))
    return header + trailer
def check_crc(data: bytes) -> bool:
    """Return True when the last two bytes of *data* match its CRC-16.

    The trailing two bytes are read as a little-endian 16-bit value and
    compared with the CRC computed over everything before them. Inputs
    shorter than three bytes are rejected outright.
    """
    if len(data) < 3:
        return False
    body, trailer = data[:-2], data[-2:]
    (expected,) = struct.unpack("<H", trailer)
    return crc16(body) == expected
def regs_to_float(high_reg: int, low_reg: int) -> float:
    """Decode two 16-bit registers as one big-endian IEEE 754 float32.

    *high_reg* supplies the most significant 16 bits and *low_reg* the
    least significant 16 bits.
    """
    packed = struct.pack(">H", high_reg) + struct.pack(">H", low_reg)
    (value,) = struct.unpack(">f", packed)
    return value
def read_sensor(ser: serial.Serial) -> dict | None:
    """Poll the sensor once and return the decoded measurements.

    Sends a function-0x03 request for 6 registers starting at 0x0000:

        0x0000-0x0001 -> DO saturation (%)
        0x0002-0x0003 -> DO concentration (mg/L)
        0x0004-0x0005 -> Temperature (°C)

    Args:
        ser: An open serial port connected to the sensor.

    Returns:
        A dict with keys ``do_saturation_pct``, ``do_concentration_mgL`` and
        ``temperature_c`` (each rounded to 2 decimals), or ``None`` when the
        reply is missing, malformed, or carries out-of-range values. A
        warning is printed for every rejected reply.
    """
    frame = build_read_frame(ADDRESS, 0x0000, 6)
    # Drop any stale bytes so the reply we parse belongs to this request.
    ser.reset_input_buffer()
    ser.write(frame)

    # Expected reply: addr(1) + func(1) + byte_count(1) + 12 data bytes + CRC(2) = 17 bytes
    reply = ser.read(17)
    if len(reply) < 17:
        # A Modbus exception reply is only 5 bytes: addr, func | 0x80, code, CRC(2).
        # Report its code instead of hiding it behind a generic short-read warning.
        is_exception = (
            len(reply) == 5
            and reply[0] == ADDRESS
            and reply[1] == (0x03 | 0x80)
            and check_crc(reply)
        )
        if is_exception:
            print(f"[WARN] Modbus exception response: code=0x{reply[2]:02X}")
        else:
            print(f"[WARN] Short reply: got {len(reply)} bytes, expected 17")
        return None

    if not check_crc(reply):
        print("[WARN] CRC mismatch")
        return None

    if reply[0] != ADDRESS or reply[1] != 0x03:
        print(f"[WARN] Unexpected frame: addr=0x{reply[0]:02X} func=0x{reply[1]:02X}")
        return None

    # The byte-count field must announce exactly the 12 data bytes decoded below.
    if reply[2] != 12:
        print(f"[WARN] Unexpected byte count: {reply[2]} (expected 12)")
        return None

    # Parse 6 × 16-bit registers from byte index 3
    regs = struct.unpack(">HHHHHH", reply[3:15])
    do_sat = regs_to_float(regs[0], regs[1])
    do_conc = regs_to_float(regs[2], regs[3])
    temp_c = regs_to_float(regs[4], regs[5])

    # Sanity check (NaN fails every comparison and is therefore rejected too)
    in_range = (
        0.0 <= do_sat <= 200.0
        and 0.0 <= do_conc <= 20.0
        and 0.0 <= temp_c <= 40.0
    )
    if not in_range:
        print(f"[WARN] Values out of range — sat={do_sat:.2f} conc={do_conc:.2f} temp={temp_c:.2f}")
        return None

    return {
        "do_saturation_pct": round(do_sat, 2),
        "do_concentration_mgL": round(do_conc, 2),
        "temperature_c": round(temp_c, 2),
    }
def main():
    """Open the serial port and print sensor readings until interrupted.

    Polls the sensor in an endless loop, printing one line per reading (or
    an error line when the read fails) and sleeping INTERVAL seconds between
    polls. Ctrl+C ends the loop cleanly; the port is closed by the ``with``
    block on the way out.
    """
    print(f"Opening {PORT} @ {BAUD} baud (address 0x{ADDRESS:02X})")
    print("Press Ctrl+C to stop.\n")
    try:
        with serial.Serial(
            port=PORT,
            baudrate=BAUD,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=TIMEOUT,
        ) as ser:
            time.sleep(0.1)  # let the port settle
            while True:
                data = read_sensor(ser)
                if data:
                    print(
                        f"DO: {data['do_concentration_mgL']:5.2f} mg/L | "
                        f"Saturation: {data['do_saturation_pct']:6.2f} % | "
                        f"Temp: {data['temperature_c']:5.2f} °C"
                    )
                else:
                    print("[ERROR] Failed to read sensor")
                time.sleep(INTERVAL)
    except KeyboardInterrupt:
        # The banner promises Ctrl+C as the way to stop, so exit quietly
        # instead of dumping a traceback.
        print("\nStopped.")


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment