-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulator.py
More file actions
148 lines (121 loc) · 4.41 KB
/
simulator.py
File metadata and controls
148 lines (121 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
"""
HumaCount 5D Analyzer Simulator
Sends HL7-formatted FBC/Blood Picture results to the HumaCount5D middleware
via TCP using MLLP (Minimal Lower Layer Protocol) framing.
"""
import argparse
import json
import socket
import sys
from datetime import datetime
# MLLP framing characters
VT = 0x0B # Vertical Tab - Start Block
FS = 0x1C # File Separator - End Block
CR = 0x0D # Carriage Return
# Map test codes to their HL7 descriptions
TEST_DESCRIPTIONS = {
"WBC": "White Blood Cells",
"RBC": "Red Blood Cells",
"HGB": "Hemoglobin",
"HCT": "Hematocrit",
"MCV": "Mean Corpuscular Volume",
"MCH": "Mean Corpuscular Hemoglobin",
"MCHC": "Mean Corpuscular Hemoglobin Concentration",
"PLT": "Platelets",
"RDW-CV": "Red Cell Distribution Width CV",
"RDW-SD": "Red Cell Distribution Width SD",
"NEU%": "Neutrophils Percent",
"LYM%": "Lymphocytes Percent",
"MON%": "Monocytes Percent",
"EOS%": "Eosinophils Percent",
"BAS%": "Basophils Percent",
"NEU#": "Neutrophils Absolute",
"LYM#": "Lymphocytes Absolute",
"MON#": "Monocytes Absolute",
"EOS#": "Eosinophils Absolute",
"BAS#": "Basophils Absolute",
}
def load_config(path):
with open(path, "r") as f:
return json.load(f)
def build_hl7_message(sample_id, results):
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
segments = []
# MSH - Message Header
segments.append(
f"MSH|^~\\&|HumaCount5D|LAB|||{timestamp}||ORU^R01|MSG001|P|2.3.1"
)
# OBR - Observation Request
segments.append(
f"OBR|1||{sample_id}|CBC|||{timestamp}"
)
# OBX - Observation Result segments
seq = 1
for test_code, test_data in results.items():
value = test_data["value"]
unit = test_data["unit"]
description = TEST_DESCRIPTIONS.get(test_code, test_code)
segments.append(
f"OBX|{seq}|NM|{description}^{test_code}||{value}|{unit}||||F"
)
seq += 1
return "\r".join(segments) + "\r"
def wrap_mllp(message):
return bytes([VT]) + message.encode("ascii") + bytes([FS, CR])
def send_to_middleware(host, port, mllp_message):
print(f"Connecting to {host}:{port}...")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(10)
sock.connect((host, port))
print(f"Connected. Sending {len(mllp_message)} bytes...")
sock.sendall(mllp_message)
print("Message sent.")
# Try to read ACK response (middleware may or may not send one)
try:
response = sock.recv(4096)
if response:
# Strip MLLP framing for display
display = response.decode("ascii", errors="replace").strip(
chr(VT) + chr(FS) + chr(CR)
)
print(f"Response received:\n{display}")
else:
print("Connection closed by middleware (no ACK).")
except socket.timeout:
print("No response received (timeout). Message was sent successfully.")
def main():
parser = argparse.ArgumentParser(
description="HumaCount 5D Analyzer Simulator - sends HL7 FBC results to middleware"
)
parser.add_argument(
"--config", default="config.json",
help="Path to config JSON file (default: config.json)"
)
parser.add_argument("--host", help="Override middleware host")
parser.add_argument("--port", type=int, help="Override middleware port")
parser.add_argument("--sample-id", help="Override sample ID")
args = parser.parse_args()
# Load config
try:
config = load_config(args.config)
except FileNotFoundError:
print(f"Error: Config file '{args.config}' not found.", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in '{args.config}': {e}", file=sys.stderr)
sys.exit(1)
# Apply CLI overrides
host = args.host or config["connection"]["host"]
port = args.port or config["connection"]["port"]
sample_id = args.sample_id or config["sample_id"]
results = config["results"]
# Build and send
print(f"Sample ID: {sample_id}")
print(f"Tests: {len(results)}")
hl7_message = build_hl7_message(sample_id, results)
print(f"\nHL7 Message:\n{hl7_message}")
mllp_message = wrap_mllp(hl7_message)
send_to_middleware(host, port, mllp_message)
if __name__ == "__main__":
main()