Skip to content

Commit

Permalink
Fix bug with non terminated line in um98x buffer input.
Browse files Browse the repository at this point in the history
Faster detection and configuration
  • Loading branch information
Stefal committed Dec 19, 2024
1 parent 06b057e commit 823d0dc
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 58 deletions.
6 changes: 3 additions & 3 deletions tools/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ detect_gnss() {
detected_gnss[2]=$port_speed
#echo 'U-blox ZED-F9P DETECTED ON '$port $port_speed
break
elif { model=$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/$port --baudrate $port_speed --command get_model --retry 2 2>/dev/null) ; [[ "${model}" == 'UM98'[0-2] ]] ;}; then
elif { model=$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/$port --baudrate $port_speed --command get_model 2>/dev/null) ; [[ "${model}" == 'UM98'[0-2] ]] ;}; then
detected_gnss[0]=$port
detected_gnss[1]='unicore'
detected_gnss[2]=$port_speed
Expand Down Expand Up @@ -549,10 +549,10 @@ configure_gnss(){
return $?
fi

elif { model=$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/${com_port} --baudrate ${com_port_settings%%:*} --command get_model --retry 2 2>/dev/null) ; [[ "${model}" == 'UM98'[0-2] ]] ;}
elif { model=$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/${com_port} --baudrate ${com_port_settings%%:*} --command get_model 2>/dev/null) ; [[ "${model}" == 'UM98'[0-2] ]] ;}
then
#get UM98x firmware release
firmware="$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/${com_port} --baudrate ${com_port_settings%%:*} --command get_firmware --retry 2 2>/dev/null)" || firmware='?'
firmware="$(python3 "${rtkbase_path}"/tools/unicore_tool.py --port /dev/${com_port} --baudrate ${com_port_settings%%:*} --command get_firmware 2>/dev/null)" || firmware='?'
echo 'Unicore-' "${model}" 'Firmware: ' "${firmware}"
sudo -u "${RTKBASE_USER}" sed -i s/^receiver_firmware=.*/receiver_firmware=\'${firmware}\'/ "${rtkbase_path}"/settings.conf
#configure the UM980/UM982 for RTKBase
Expand Down
22 changes: 18 additions & 4 deletions tools/unicore/unicore_gnss/serial_comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ def __init__(
port=address,
baudrate=baudrate,
timeout=timeout,
write_timeout=write_timeout
write_timeout=write_timeout,
bytesize = serial.EIGHTBITS,
parity = serial.PARITY_NONE,
stopbits = serial.STOPBITS_ONE,
xonxoff = False,
rtscts = False,
dsrdtr = False,
)

def send(self, cmd) -> str or None:
Expand All @@ -35,15 +41,23 @@ def send_raw(self, cmd):
def read_lines(self) -> list:
read = self.device_serial.readlines()
for i, line in enumerate(read):
read[i] = line.decode(self.byte_encoding).strip()
read[i] = line.decode(self.byte_encoding, errors='ignore').strip()
return read

def read_until(self, expected='\r\n') -> list:
read = self.device_serial.read_until(expected = expected.encode())
read = read.decode(self.byte_encoding).strip().splitlines()
read = read.decode(self.byte_encoding, errors='ignore').strip().splitlines()
read = [ val for val in read if val != '']
return read


def read_until_line(self, expected='\r\n', eol='\r\n') -> str:
#while True:
read_start = self.device_serial.read_until(expected = expected.encode())
read_start = read_start.decode(self.byte_encoding, errors='ignore').strip().splitlines()[-1]
if expected in read_start:
read_end = self.device_serial.readline().decode(self.byte_encoding, errors='ignore')
return read_start + read_end

def read_raw(self, size: int):
return self.device_serial.read(size)

Expand Down
3 changes: 2 additions & 1 deletion tools/unicore/unicore_gnss/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ def test_expected_res():

def test_cmd_with_checksum():
c = object.__new__(UnicoGnss)
assert c._cmd_with_checksum('VERSIONA') == '$VERSIONA*1B'
assert c._cmd_with_checksum('VERSIONA') == '$VERSIONA*1B'
assert c._cmd_with_checksum('SAVECONFIG') == '$SAVECONFIG*0B'

103 changes: 54 additions & 49 deletions tools/unicore/unicore_gnss/unicore_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(
self,
address,
baudrate=115200,
timeout=4,
timeout=2,
cmd_delay=0.1,
debug=False,
):
Expand All @@ -40,14 +40,12 @@ def connect(self) -> None:
'''
Connect to the Unicore receiver
'''
if not self.is_open:
self.comm.device_serial.open()
#self.comm.send(self._cmd_with_checksum('UNLOG'))
#time.sleep(1)
#self.comm.device_serial.reset_input_buffer()
log.debug("Connecting...")
try:
read = self.send_read_raw(self._cmd_with_checksum('VERSIONA'), size=5000)
if self._expected_res_for('VERSIONA') in read.decode(encoding='ASCII', errors='ignore'):
# Send a line return to clear the Unicore input buffer
# It was a problem when the ubxtool was sending some commands without LF just before the unicore detection script.
self.comm.send('\r\n')
if self.get_receiver_model():
#print("GNSS receiver connected")
return
else:
Expand All @@ -58,7 +56,7 @@ def connect(self) -> None:
except Exception as e:
log.warning("GNSS receiver did not respond correctly. Closing serial port.")
log.warning(e)
log.debug(read)
#log.debug(read)
self.close()

def close(self) -> None:
Expand All @@ -76,34 +74,33 @@ def get_receiver_model(self) -> str:
'''
Get the Unicore receiver model (UM980, UM982, ....)
'''
read = self.send_read_raw(self._cmd_with_checksum('VERSIONA'), size=5000)
log.debug("Get receiver model")
read = self.send_read_until(self._cmd_with_checksum('VERSIONA'), expected=self._expected_res_for('VERSIONA'))
#'$command,VERSIONA,response: OK*45\r\n
# #VERSIONA,97,GPS,FINE,2343,48235000,0,0,18,813;"UM980","R4.10Build11833","HRPT00-S10C-P",
# "2310415000001-MD22A2225022497","ff3ba396dcb0478d","2023/11/24"*270a13f7\r\n'
log.debug("Receive: {}".format(read))
if self._expected_res_for('VERSIONA') in read.decode(encoding='ASCII', errors='ignore'):
lines = (chain.from_iterable(x.splitlines() for x in read.decode(encoding='ASCII', errors='ignore').split('#')))
for line in lines:
if line.startswith('VERSIONA'):
model = line.replace("'", "").split(";")[-1].split(",")[0].replace('"','')
return model
log.debug("Receive ack: {}".format(read))
read = self.comm.read_until_line(expected='#VERSIONA')
log.debug("Received answer: {}".format(read))
if '#VERSIONA' in read:
model = read.replace("'", "").split(";")[-1].split(",")[0].replace('"','')
return model
return None

def get_receiver_firmware(self) -> str:
'''
Get the Unicore receiver firmware version
'''
read = self.send_read_raw(self._cmd_with_checksum('VERSIONA'), size=5000)
read = self.send_read_until(self._cmd_with_checksum('VERSIONA'), expected=self._expected_res_for('VERSIONA'))
#'$command,VERSIONA,response: OK*45\r\n
# #VERSIONA,97,GPS,FINE,2343,48235000,0,0,18,813;"UM980","R4.10Build11833","HRPT00-S10C-P",
# "2310415000001-MD22A2225022497","ff3ba396dcb0478d","2023/11/24"*270a13f7\r\n'
log.debug("Receive: {}".format(read))
if self._expected_res_for('VERSIONA') in read.decode(encoding='ASCII', errors='ignore'):
lines = (chain.from_iterable(x.splitlines() for x in read.decode(encoding='ASCII', errors='ignore').split('#')))
for line in lines:
if line.startswith('VERSIONA'):
firmware = line.replace("'", "").split(";")[-1].split(",")[1].replace('"','')
return firmware
log.debug("Received ack: {}".format(read))
read = self.comm.read_until_line(expected='#VERSIONA')
log.debug("Received answer: {}".format(read))
if '#VERSIONA' in read:
firmware = read.replace("'", "").split(";")[-1].split(",")[1].replace('"','')
return firmware
return None

def set_factory_default(self) -> None:
Expand All @@ -112,9 +109,9 @@ def set_factory_default(self) -> None:
Connection will be closed
'''
log.debug("Sending: 'FRESET'")
read = self.send_read_raw(self._cmd_with_checksum('FRESET'), size=5000)
log.debug("Receiving: {}".format(read))
if self._expected_res_for('FRESET') not in read.decode(encoding='ASCII', errors='ignore'):
read = self.send_read_until(self._cmd_with_checksum('FRESET'), expected='#FRESET')
log.debug("Receiving ack: {}".format(read))
if self._expected_res_for('FRESET') not in read:
raise Exception("Command failed! {}".format(read))
self.close()
print("Resetting receiver. Connection closed")
Expand All @@ -133,8 +130,11 @@ def send_config_file(self, file, perm=False) -> None:
for line in f:
if line.strip() != '' and not line.startswith('#'):
log.debug("Sending cmd: {}".format(line.strip()))
self.send_read_raw(line.strip())
read = self.send_read_until(self._cmd_with_checksum(line.strip()), expected=self._expected_res_for(line.strip()))
log.debug("Received answer: {}".format(read))
#self.send_read_raw(line.strip())
if any(x in line for x in self.RESET_REQUIRED):
log.info("Rebooting. Waiting for 10s...")
time.sleep(10)
if perm:
self.set_config_permanent()
Expand All @@ -143,28 +143,31 @@ def set_config_permanent(self) -> None:
'''
Save current settings to boot config
'''
log.debug("Sending: '$SAVECONFIG*0b'")
read = self.send_read_raw('$SAVECONFIG*0b')
if '$command,SAVECONFIG,response: OK*55' not in read.decode(encoding='ASCII', errors='ignore'):
log.debug("Sending: 'SAVECONFIG'")
read = self.send_read_until(self._cmd_with_checksum('SAVECONFIG'), expected=self._expected_res_for('SAVECONFIG'))
log.debug("Received answer: {}".format(read))
if self._expected_res_for('SAVECONFIG') not in read[-1]:
raise Exception("Command failed! {}".format(read))
print("Settings saved")

def get_agc_values(self) -> list:
'''
Get the automatic gain control values
'''
read = self.send_read_raw(self._cmd_with_checksum('AGCA 1'))
if self._expected_res_for('AGCA 1') in read.decode(encoding='ASCII', errors='ignore'):
lines = (chain.from_iterable(x.splitlines() for x in read.decode(encoding='ASCII', errors='ignore').split('#')))
for line in lines:
if line.startswith('AGCA'):
#AGCA,65,GPS,FINE,2190,375570000,0,0,18,37;44,46,63,-1,-1,41,1,0,-1,-1*634f1e4b
#Antenna 1 values: 44,46,63
#Antenna 2 values: 41,1,0
values_list = line.replace("'", "").split(";")[-1].split(",")
agc_values = values_list[:3] + values_list[5:8]
agc_values = [ int(x) for x in agc_values]
return agc_values
read = self.send_read_until(self._cmd_with_checksum('AGCA '), expected=self._expected_res_for('AGCA '))
log.debug("Receive ack: {}".format(read))
# $command,AGCA,response: OK*5A
# #AGCA,97,GPS,FINE,2345,328881000,0,0,18,22;40,76,65,-1,-1,-1,-1,-1,-1,-1*8c8123ba
read = self.comm.read_until_line(expected='#AGCA')
log.debug("Received answer: {}".format(read))
if '#AGCA' in read:
#AGCA,65,GPS,FINE,2190,375570000,0,0,18,37;44,46,63,-1,-1,41,1,0,-1,-1*634f1e4b
#Antenna 1 values: 44,46,63
#Antenna 2 values: 41,1,0
values_list = read.replace("'", "").split(";")[-1].split(",")
agc_values = values_list[:3] + values_list[5:8]
agc_values = [ int(x) for x in agc_values]
return agc_values
else:
raise Exception("Command failed! {}".format(read))

Expand All @@ -177,7 +180,7 @@ def get_agc_status(self) -> list:
agc_values = self.get_agc_values()
agc_status = []
for i, value in enumerate(agc_values):
if value >= 0 and value < 10:
if value >= 0 and value < 10: # TODO : check real values to set the limits
agc_status.append('good')
elif value >= 10:
agc_status.append('bad')
Expand All @@ -187,16 +190,18 @@ def get_agc_status(self) -> list:

def send_read_lines(self, cmd, *args) -> list:
log.debug("Sending: {}{}{}".format(cmd, ' ' if args else '', ' '.join(args)))
self.comm.device_serial.reset_input_buffer()
self.comm.send(cmd)
read = self.comm.read_lines()
log.debug("Receiving: {}".format(read))
return read

def send_read_until(self, cmd, *args) -> list:
def send_read_until(self, cmd, *args, expected='\n\r') -> list:
log.debug("Sending: {}{}{}".format(cmd, ' ' if args else '', ' '.join(args)))
self.comm.device_serial.reset_input_buffer()
self.comm.send("{}{}{}".format(cmd, ' ' if args else '', ' '.join(args)))
read = self.comm.read_until()
log.debug("Receiving: {}".format(read))
read = self.comm.read_until(expected)
log.debug("send_read_until>Receiving: {}".format(read))
return read

def send_read_raw(self, cmd, *args, size=10000):
Expand Down Expand Up @@ -256,4 +261,4 @@ def _xor8_checksum(self, data):
for char in data:
checksum ^= ord(char)
checksum = checksum & 0xFF
return '{:x}'.format(checksum).upper()
return '{:x}'.format(checksum).zfill(2).upper()
2 changes: 1 addition & 1 deletion tools/unicore_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def arg_parse():
log.debug(f"Arguments: {args}")
command = args.command[0]
retries = 0
retry_delay = 10
retry_delay = 2
while retries <= args.retry:
try:
with UnicoGnss(args.port, baudrate=args.baudrate, timeout=2, debug=False) as gnss:
Expand Down

0 comments on commit 823d0dc

Please sign in to comment.