# leak_test_hmi_threaded.py import sys import os import json import struct import time import serial import serial.tools.list_ports import pandas as pd from PyQt6 import QtCore, QtGui, QtWidgets import pyqtgraph as pg # --------------------------- # Serial helper: auto-detect # --------------------------- def find_arduino_port(): ports = list(serial.tools.list_ports.comports()) for port in ports: desc = (port.description or "").lower() if any(kw in desc for kw in ("arduino", "ch340", "usb serial", "ftdi")): return port.device return None # --------------------------- # Config JSON helpers # --------------------------- CONFIG_FILE = "config.json" DEFAULT_CONFIG = { "setpoint_start": 25.0, "setpoint_end": 12.0, "setwindow": 900.0, "marker_interval": 20.0 } def load_config(): if not os.path.exists(CONFIG_FILE): return DEFAULT_CONFIG.copy() try: with open(CONFIG_FILE, "r") as f: cfg = json.load(f) # ensure defaults for missing keys for k, v in DEFAULT_CONFIG.items(): cfg.setdefault(k, v) return cfg except Exception: return DEFAULT_CONFIG.copy() def save_config(cfg: dict): with open(CONFIG_FILE, "w") as f: json.dump(cfg, f, indent=4) # --------------------------- # Serial reading thread # --------------------------- class SerialReaderThread(QtCore.QThread): # emits elapsed_time (s), eng_pressure (float) new_sample = QtCore.pyqtSignal(float, float) error = QtCore.pyqtSignal(str) def __init__(self, ser: serial.Serial | None, start_time: float, parent=None): super().__init__(parent) self.ser = ser self._running = False self.start_time = start_time def run(self): self._running = True # small sleep to avoid busy-wait burning CPU while self._running: try: if not self.ser: # no serial, sleep and continue self.msleep(200) continue # If binary 2-byte packets are expected, read when available if self.ser.in_waiting >= 2: data = self.ser.read(2) if len(data) < 2: continue try: raw = struct.unpack(' self.max_points: # keep only last max_points self.time_data = 
# NOTE(review): every function below that takes ``self`` is presumably a method
# of the HMI window class instantiated in main(); the class header is not part
# of the visible source, so confirm the placement before merging.


def _refresh_plot(self):
    """Redraw the curve from the sample buffers and scroll the X axis.

    Sets ``self.time_window`` to 100 s and shows that many seconds ending at
    the newest sample. Does nothing while ``self.time_data`` is empty.
    """
    self.time_window = 100.0  # seconds of visible data
    if not self.time_data:
        return
    try:
        self.plot.setData(self.time_data, self.pressure_data)
        latest_time = self.time_data[-1]
        self.plot_widget.setXRange(latest_time - self.time_window, latest_time)
    except Exception:
        # Deliberate best effort: ignore transient errors, the next call
        # redraws from the buffers anyway.
        pass


def _sync_cursors(self, moved_cursor, other_cursor):
    """Place ``other_cursor`` exactly ``self.setwindow`` away from ``moved_cursor``.

    When ``moved_cursor`` is ``self.cursor_start`` the other cursor is put
    ``setwindow`` after it, otherwise ``setwindow`` before it. Signals of
    ``other_cursor`` are blocked during the move and are always re-enabled
    afterwards, even if the move itself fails. Errors are swallowed.
    """
    try:
        if moved_cursor is self.cursor_start:
            target = moved_cursor.value() + self.setwindow
        else:
            target = moved_cursor.value() - self.setwindow
        other_cursor.blockSignals(True)
        try:
            other_cursor.setValue(target)
        finally:
            # never leave the cursor with its signals permanently blocked
            other_cursor.blockSignals(False)
    except Exception:
        pass


# ----- update fields -----
def update_setpoint_start(self):
    """Apply the start set-point typed into its input field; ignore invalid text."""
    try:
        self.setpoint_start = float(self.setpoint_start_input.text())
        self.setpoint_start_line.setValue(self.setpoint_start)
    except ValueError:
        # text is not a number: keep the previous value
        pass


def update_setpoint_end(self):
    """Apply the end set-point typed into its input field; ignore invalid text."""
    try:
        self.setpoint_end = float(self.setpoint_end_input.text())
        self.setpoint_end_line.setValue(self.setpoint_end)
    except ValueError:
        pass


def update_setwindow(self):
    """Apply the analysis window typed into its input field and re-sync the cursors.

    Text that is not a positive, finite number is ignored, because
    ``analyze_and_save`` can never succeed unless the end cursor lies after
    the start cursor.
    """
    try:
        window = float(self.setwindow_input.text())
    except ValueError:
        return
    # chained comparison also rejects NaN
    if not 0 < window < float("inf"):
        return
    self.setwindow = window
    self._sync_cursors(self.cursor_start, self.cursor_end)


def update_marker_interval(self):
    """Apply the marker interval (seconds) typed into its input field.

    The value is ignored when it is not a positive, finite number, when it
    rounds down to less than one millisecond, or when the timer rejects it.
    ``self.marker_interval`` is only updated after the timer accepted the
    new interval, so both always agree.
    """
    try:
        val = float(self.marker_input.text())
        if not 0 < val < float("inf"):
            return
        interval_ms = int(val * 1000)
        if interval_ms < 1:
            # a 0 ms timer would fire continuously
            return
        self.marker_timer.setInterval(interval_ms)
    except (ValueError, OverflowError):
        # not a number, or too large for int()/the timer
        return
    self.marker_interval = val


def add_marker(self):
    """Add a dashed vertical line at the current elapsed time to the plot."""
    t = time.time() - self.start_time
    pen = pg.mkPen(color="m", width=1, style=pg.QtCore.Qt.PenStyle.DashLine)
    marker = pg.InfiniteLine(pos=t, angle=90, pen=pen)
    self.plot_widget.addItem(marker)


# ----- save / load config -----
def save_configuration(self):
    """Validate the four input fields, persist them and apply them at runtime.

    All values must be finite numbers; ``setwindow`` must be positive and
    ``marker_interval`` must be at least one millisecond. Nothing is written
    or applied when validation fails. Success and failure are both reported
    through a message box.
    """
    try:
        cfg = {
            "setpoint_start": float(self.setpoint_start_input.text()),
            "setpoint_end": float(self.setpoint_end_input.text()),
            "setwindow": float(self.setwindow_input.text()),
            "marker_interval": float(self.marker_input.text()),
        }
        inf = float("inf")
        for key, value in cfg.items():
            if not -inf < value < inf:
                raise ValueError(f"{key} must be a finite number")
        if cfg["setwindow"] <= 0:
            raise ValueError("setwindow must be greater than zero")
        interval_ms = int(cfg["marker_interval"] * 1000)
        if interval_ms < 1:
            raise ValueError("marker_interval must be at least 0.001 s")

        save_config(cfg)

        # update current runtime values too
        self.setpoint_start = cfg["setpoint_start"]
        self.setpoint_end = cfg["setpoint_end"]
        self.setwindow = cfg["setwindow"]
        self.marker_interval = cfg["marker_interval"]

        # apply to UI
        self.setpoint_start_line.setValue(self.setpoint_start)
        self.setpoint_end_line.setValue(self.setpoint_end)
        self._sync_cursors(self.cursor_start, self.cursor_end)
        self.marker_timer.setInterval(interval_ms)
        QtWidgets.QMessageBox.information(self, "Configuration", "Configuration saved.")
    except Exception as e:
        QtWidgets.QMessageBox.warning(self, "Configuration", f"Could not save configuration: {e}")


# ----- analyze & save -----
def analyze_and_save(self):
    """Evaluate the leak test between the two cursors and save the results.

    Marker times are laid out from the start cursor to the end cursor every
    ``self.marker_interval`` seconds; for each marker the recorded sample
    nearest in time is taken. The test passes when the pressure drop per
    second between the first and last sampled point does not exceed
    ``(setpoint_start - setpoint_end) / dT``.

    On success three artefacts are written to the working directory: a CSV
    with the sampled points, a screenshot of the window, and one appended row
    in ``leak_test_summary.csv``. A failure while saving is reported in a
    warning box and leaves the computed result visible.
    """
    max_markers = 100_000  # keeps a tiny interval from freezing the UI
    try:
        start_time = float(self.cursor_start.value())
        end_time = float(self.cursor_end.value())
        # "not <" (instead of ">=") also rejects NaN cursor positions
        if not start_time < end_time:
            self.result_label.setText("Invalid dT")
            return

        interval = self.marker_interval
        dt = end_time - start_time
        if not interval > 0 or dt / interval > max_markers:
            self.result_label.setText("Invalid marker interval")
            return

        samples = list(zip(self.time_data, self.pressure_data))
        if not samples:
            self.result_label.setText("No data at markers")
            return

        # Index-based marker times: repeatedly adding the interval drifts and
        # can push the final marker just past the end cursor, silently
        # dropping it. The small tolerance absorbs rounding in the division.
        steps = int(dt / interval + 1e-9)
        marker_times = [min(start_time + i * interval, end_time) for i in range(steps + 1)]

        # nearest recorded sample for each marker (first one wins on ties)
        # NOTE(review): markers outside the recorded time range snap to the
        # first/last sample, which can hide a leak - confirm this is intended.
        sampled = [min(samples, key=lambda s, mt=mt: abs(s[0] - mt)) for mt in marker_times]
        sampled_times = [t for t, _ in sampled]
        sampled_pressures = [p for _, p in sampled]

        start_pressure = sampled_pressures[0]
        end_pressure = sampled_pressures[-1]
        drop_rate = (start_pressure - end_pressure) / dt
        threshold = (self.setpoint_start - self.setpoint_end) / dt
        test_result = "PASS" if drop_rate <= threshold else "FAIL"

        self.result_label.setText(f"Test Result: {test_result} (Drop Rate: {drop_rate:.3f} kPa/s)")
        self.result_label.setStyleSheet("color: green;" if test_result == "PASS" else "color: red;")
    except Exception as e:
        self.result_label.setText("Invalid Criteria Input")
        print("analyze_and_save error:", e)
        return

    try:
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
        stack = self.stack_input.text()
        # the stack id is free text: keep path separators and other unsafe
        # characters out of the file name
        safe_stack = "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in stack)
        file_stamp = timestamp.replace(":", "-").replace(" ", "_")

        # detailed CSV (sampled points) rounded to 1 decimal
        detailed_df = pd.DataFrame({
            "Time (s)": [round(t, 1) for t in sampled_times],
            "Pressure (kPa)": [round(p, 1) for p in sampled_pressures],
        })
        detailed_filename = f"leak_test_data_ST{safe_stack}_{file_stamp}.csv"
        detailed_df.to_csv(detailed_filename, index=False)

        # screenshot of whole window
        screenshot_filename = f"screenshot_{detailed_filename}.png"
        self.grab().save(screenshot_filename)

        # summary CSV (append)
        summary_df = pd.DataFrame(
            [[
                timestamp,
                round(start_pressure, 1),
                round(end_pressure, 1),
                round(drop_rate, 3),
                test_result,
                stack,
                self.comments_input.text(),
                screenshot_filename,
            ]],
            columns=["Time", "Start Pressure", "End Pressure", "Drop Rate (kPa/s)",
                     "Result", "Stack", "Comments", "Screenshot"],
        )
        summary_file = "leak_test_summary.csv"
        # an existing but empty file still needs the header row
        needs_header = not os.path.exists(summary_file) or os.path.getsize(summary_file) == 0
        summary_df.to_csv(summary_file, mode="a", header=needs_header, index=False)

        QtWidgets.QMessageBox.information(
            self, "Analysis",
            f"Saved summary and data:\n{summary_file}\n{detailed_filename}\n{screenshot_filename}")
    except Exception as e:
        print("analyze_and_save error:", e)
        QtWidgets.QMessageBox.warning(self, "Analysis", f"Could not save results: {e}")


# ----- cleanup -----
def closeEvent(self, event: "QtGui.QCloseEvent"):
    """Stop the reader thread, close the serial port, stop the timers, accept.

    Each step is best effort and independent of the others: a failure is
    printed and the remaining cleanup still runs.
    """
    # stop serial thread
    thread = getattr(self, "serial_thread", None)
    if thread is not None:
        try:
            thread.stop()
            # NOTE(review): stop() is not visible here; assuming the thread is
            # a QThread, give it a moment to leave its loop before the port
            # is closed underneath it.
            thread.wait(1000)
        except Exception as e:
            print("closeEvent: could not stop serial thread:", e)

    # close serial
    try:
        if self.ser and self.ser.is_open:
            self.ser.close()
    except Exception as e:
        print("closeEvent: could not close serial port:", e)

    # stop timers, one by one so a missing/failed timer does not skip the rest
    for timer_name in ("_dt_timer", "marker_timer", "plot_refresh_timer"):
        timer = getattr(self, timer_name, None)
        if timer is None:
            continue
        try:
            timer.stop()
        except Exception as e:
            print(f"closeEvent: could not stop {timer_name}:", e)

    event.accept()


# ---------------------------
# run app
# ---------------------------
def main():
    """Open the Arduino serial port if one is found, then run the HMI window.

    The application also starts without a serial port; in that case ``None``
    is passed to the window.
    """
    port = find_arduino_port()
    ser = None
    if port:
        try:
            ser = serial.Serial(port, 9600, timeout=0.1)
            # flush at start
            ser.reset_input_buffer()
            print("Opened serial", port)
        except Exception as e:
            print("Could not open serial:", e)
            # the port may have opened before the failure: do not leak it
            if ser is not None:
                try:
                    ser.close()
                except Exception:
                    pass
            ser = None
    else:
        print("Arduino port not found; running without serial input")

    app = QtWidgets.QApplication(sys.argv)
    win = HMI(ser)
    win.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()