-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemulatorStart.py
executable file
·124 lines (108 loc) · 3.43 KB
/
emulatorStart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/usr/bin/env python3
import sys
import pprint
from utils.SerialUtils import *
from utils.Protocol import *
from utils.Commands import *
DC = dc()
GT = DC.getTime
ST = DC.setTime
GH = DC.returnHASH
SM = DC.setMode
GM = DC.getMode
GC = DC.getChannels
SC = DC.setChannels
GA = DC.getTemperature
GS = DC.getFan
GL = DC.getClockCalibration
GP = DC.getPWMDivider
COMMANDS = {'GT': GT,
'GV': GV,
'GC': GC,
'GH': GH,
'SM': SM,
'GM': GM,
'GL': GL,
'GA': GA,
'GS': GS,
'SC': SC,
'ST': ST,
'GL': GL,
'GP': GP
}
def messCrcCheck(mess):
if type(mess) == bytes:
return CRC16(byteToStr(mess[:-2])) == mess[-2:]
else:
return CRC16(mess[:-2]) == mess[-2:]
def Reader(connect):
mess = b''
while True:
b = connect.read()
if b in [b'$', b'\x01']:
mess += b
while b not in map(bytes, [[5], [2]]):
b = connect.read()
mess += b
mess += connect.read(2)
break
return mess
def CycleReader(connect):
mess = b''
b = connect.read()
if b in map(bytes, [[4], [3]]):
return b
while b not in map(bytes, [[4], [3], [0x1e]]):
b = connect.read()
mess += b
mess += connect.read(2)
return mess
def Handler(mess, connect):
logger.debug(f"Incomming message: {mess}")
if messCrcCheck(mess):
if b'\x24' in mess:
req = byteToStr(mess[mess.index(b'\x24') + 1:mess.index(b'\x24') + 3])
COMMANDS[req](connect=connect, mess=mess)
logger.debug(f"Incomming Request: {req}, mess= {mess}")
elif mess[0] in [1, 3, 4]:
logger.info(f"Start write cycle {mess}")
Write(ACK, connect)
cycle = []
while True:
point = CycleReader(connect)
if (US in byteToStr(point)) and (RS in byteToStr(point)):
if messCrcCheck(point):
parse_point = [i for i in byteToStr(point).split(RS)[0].split(US)]
parse_point.append(HoursToMin(parse_point.pop(0)))
parse_point = [int(i) for i in parse_point]
cycle.append(parse_point)
Write(ACK, connect)
else:
Write(NAK, connect)
logger.error(f"Point incorrect!: {point}. sending NAK")
elif len(point) and point[0] == 3:
DC.cycle = cycle
DC.setHash(DC.HashCalc(cycle=cycle))
logger.success(f"New cycle: \n{pprint.pformat(cycle)}")
logger.info(f"New cycle HASH = {DC.HASH}")
Write(ACK, connect)
elif len(point) and point[0] == 4:
logger.success(f"End of writing cycle. Sending ACK.")
Write(ACK, connect)
break
port = 'COM6'
if (len(sys.argv) > 1):
port = sys.argv[1]
logger.info(f'Listening on device: {port}')
timeout = 0.01
baudrate = 9600
with serial.Serial(port=port, baudrate=baudrate, timeout=timeout) as connect:
n = 0
try:
while True:
mess = Reader(connect)
logger.debug(f'[{n}] {mess}')
Handler(mess, connect)
n += 1
except KeyboardInterrupt:
logger.info(f"Exit!")