Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nav subcommand #33

Closed
wants to merge 11 commits into from
2 changes: 2 additions & 0 deletions hxtool/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from . import gpslog
from . import id
from . import info
from . import nav
from . import nmea

__all__ = [
Expand All @@ -17,5 +18,6 @@
"gpslog",
"id",
"info",
"nav",
"nmea"
]
192 changes: 192 additions & 0 deletions hxtool/cli/nav.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# -*- coding: utf-8 -*-

import gpxpy
import gpxpy.gpx
from datetime import datetime
from logging import getLogger
from os.path import abspath
from pkg_resources import require

import hxtool
from .base import CliCommand

logger = getLogger(__name__)


class NavCommand(CliCommand):

name = "nav"
help = "dump or flash navigation data (waypoints and routes)"

@staticmethod
def setup_args(parser) -> None:
parser.add_argument("-g", "--gpx",
help="name of GPX file",
type=abspath,
action="store")
parser.add_argument("-d", "--dump",
help="read nav data from device and write to file",
action="store_true")
parser.add_argument("-f", "--flash",
help="read nav data from file and write to device",
action="store_true")
parser.add_argument("-e", "--erase",
help="erase existing nav data from device",
action="store_true")

def run(self):

hx = hxtool.get(self.args)
if hx is None:
return 10

if not hx.comm.cp_mode:
logger.critical("For navigation data functions, device must be in CP mode (MENU + ON)")
return 10

result = 0

if self.args.dump:
result = max(self.dump(hx), result)

if self.args.flash or self.args.erase:
result = max(self.flash_erase(hx), result)

return result


def dump(self, hx):
if self.args.gpx:
logger.info("Reading nav data from handset")
raw_nav_data = hx.config.read_nav_data(True)
logger.info("Writing GPX nav data to `{}`".format(self.args.gpx))
gpx_name = "Nav data read from {} with MMSI {}".format(
hx.__class__.__name__, hx.config.read_mmsi()[0] )
return write_gpx(raw_nav_data, self.args.gpx, gpx_name)
return 0


def flash_erase(self, hx):
nav_data = { "waypoints": [], "routes": [] }

if self.args.flash:
if self.args.gpx:
logger.info("Reading GPX nav data from `{}`".format(self.args.gpx))
nav_data = read_gpx(self.args.gpx)

logger.info(log_nav_data("Read {w} waypoint{ws} and {r} route{rs} from file", nav_data))
if self.args.erase:
logger.info("Will replace nav data on device")
else:
logger.info("Will append to nav data on device")

logger.info("Reading nav data from handset")
raise NotImplementedError

logger.info(log_nav_data("In total {w} waypoint{ws} and {r} route{rs}", nav_data))
if nav_data_oversized(nav_data, hx):
return 10

logger.info("Writing nav data to handset")
hx.config.write_nav_data(nav_data, True)

return 0


def log_nav_data(text: str, nav_data: dict) -> str:
waypoint_count = len(nav_data["waypoints"])
route_count = len(nav_data["routes"])
return text.format(
w = waypoint_count, ws = "s" if waypoint_count != 1 else "",
r = route_count, rs = "s" if route_count != 1 else "" )


def nav_data_oversized(nav_data: dict, hx: object) -> bool:
limits = hx.config.limits()
oversized = False
if len(nav_data["waypoints"]) > limits["waypoints"]:
logger.critical("Too many waypoints to fit on device (maximum: {})".format(limits["waypoints"]))
oversized = True
if len(nav_data["routes"]) > limits["routes"]:
logger.critical("Too many routes to fit on device (maximum: {})".format(limits["routes"]))
oversized = True
return oversized


def read_gpx(file_name: str) -> dict:
gpx = gpxpy.parse(open(file_name, 'r'))
nav_data = { "waypoints": [], "routes": [] }
index = 0
# Known issue: Route/waypoint relationships read from device are not
# stored in GPX, resulting in a profliferation of duplicate waypoints
# when routes that had been dumped from the device are flashed back.
# See GH #30 for a brief discussion of possible solutions.

for p in gpx.waypoints:
index += 1
point = {
"latitude": p.latitude,
"longitude": p.longitude,
"name": p.name,
"id": index,
}
nav_data["waypoints"].append(point)

for r in gpx.routes:
route = { "name": r.name, "points": [] }
for p in r.points:
index += 1
point = {
"latitude": p.latitude,
"longitude": p.longitude,
"name": p.name,
"id": index,
}
route["points"].append(point)
nav_data["waypoints"].append(point)
nav_data["routes"].append(route)

return nav_data


def write_gpx(nav_data: dict, file_name: str, gpx_name: str) -> int:
if len(nav_data["waypoints"]) == 0:
logger.warning("No waypoints in device. Not writing empty GPX file")
return 0

gpx = gpxpy.gpx.GPX()
gpx.name = gpx_name
gpx.creator = "hxtool {} - github.com/cr/hx870".format(require("hxtool")[0].version)
gpx.time = datetime.now()

for point in nav_data["waypoints"]:
comment = "id {}".format(point["id"])
if point["mmsi"] is not None:
comment += "; position received from MMSI {}".format(point["mmsi"])
p = gpxpy.gpx.GPXWaypoint(
latitude=point["latitude_decimal"],
longitude=point["longitude_decimal"],
name=point["name"],
comment=comment,
)
gpx.waypoints.append(p)

for route in nav_data["routes"]:
r = gpxpy.gpx.GPXRoute(name=route["name"])
for point in route["points"]:
comment = "id {}".format(point["id"])
if point["mmsi"] is not None:
comment += "; position received from MMSI {}".format(point["mmsi"])
p = gpxpy.gpx.GPXRoutePoint(
latitude=point["latitude_decimal"],
longitude=point["longitude_decimal"],
name=point["name"],
comment=comment,
)
r.points.append(p)
gpx.routes.append(r)

with open(file_name, "w") as f:
f.write(gpx.to_xml(version="1.1"))

return 0
83 changes: 73 additions & 10 deletions hxtool/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from binascii import hexlify, unhexlify
from logging import getLogger

from .memory import unpack_waypoint
from .memory import unpack_waypoint, pack_waypoint, unpack_route, pack_route
from .protocol import GenericHXProtocol, ProtocolError

logger = getLogger(__name__)
Expand All @@ -14,6 +14,12 @@ class GenericHXConfig(object):
def __init__(self, protocol: GenericHXProtocol):
self.p = protocol

def limits(self):
return {
"waypoints": 200,
"routes": 20,
}

def config_read(self, progress=False):
config_data = b''
bytes_to_go = 0x8000
Expand Down Expand Up @@ -59,16 +65,73 @@ def config_write(self, data, check_region=True, progress=False):
logger.info(f"{bytes_to_go} / {bytes_to_go} bytes (100%)")

def read_waypoints(self):
wp_data = b''
for address in range(0x4300, 0x5c00, 0x40):
wp_data += self.p.read_config_memory(address, 0x40)
wp_list = []
for wp_id in range(1, 201):
offset = (wp_id - 1) * 32
wp = unpack_waypoint(wp_data[offset:offset+32])
return self.read_nav_data() ["waypoints"]

def read_nav_data(self, progress=False):
nav_data = b''
bytes_to_go = 0x5e80 - 0x4300
for offset in range(0x4300, 0x5e80, 0x40):
bytes_done = offset - 0x4300
nav_data += self.p.read_config_memory(offset, 0x40)
if bytes_done % 0xdc0 == 0: # 50%
percent_done = int(100.0 * bytes_done / bytes_to_go)
logger.info(f"{bytes_done} / {bytes_to_go} bytes ({percent_done}%)")
waypoints = []
wp_index = {}
for offset in range(0, 200 * 0x20, 0x20):
wp = unpack_waypoint(nav_data[offset:offset+0x20])
if wp is not None:
wp_list.append(wp)
return wp_list
wp_index[wp["id"]] = len(waypoints)
waypoints.append(wp)
routes = []
for offset in range(200 * 0x20, 220 * 0x20, 0x20):
rt = unpack_route(nav_data[offset:offset+0x20])
if rt is not None:
for i in range(0, len(rt["points"])):
# unpack_route() just returns waypoint IDs; replace those with the actual waypoints
rt["points"][i] = waypoints[wp_index[ rt["points"][i] ]]
routes.append(rt)
status = self.p.read_config_memory(0x0005, 1)
waypoint_history = self.p.read_config_memory(0x05e0, 6)
route_history = self.p.read_config_memory(0x05f0, 6)
if progress:
logger.info(f"{bytes_to_go} / {bytes_to_go} bytes (100%)")
return {
"waypoints": waypoints,
"routes": routes,
"status": list(status)[0],
"waypoint_history": list(filter(lambda x: x != 0xff, waypoint_history)),
"route_history": list(filter(lambda x: x != 0xff, route_history)),
}

def write_nav_data(self, nav_data, progress=False):
config = b''
if len(nav_data["waypoints"]) > 200:
raise ProtocolError("Too many waypoints")
for waypoint in nav_data["waypoints"]:
config += pack_waypoint(waypoint)
while len(config) < 200 * 0x20:
config += b'\xff'*0x20
for route in nav_data["routes"]:
config += pack_route(route)
while len(config) < 200 * 0x20 + 20 * 0x20:
config += b'\xff'*0x20

config_size = len(config) # 0x5e80 - 0x4300
for offset in range(0, config_size, 0x40):
self.p.write_config_memory(0x4300 + offset, config[offset:offset+0x40])
if progress and offset % 0xdc0 == 0: # 50%
percent_done = int(100.0 * offset / config_size)
logger.info(f"{offset} / {config_size} bytes ({percent_done}%)")

if "status" in nav_data or "waypoint_history" in nav_data or "route_history" in nav_data:
raise NotImplementedError
self.p.write_config_memory(0x0005, b'\x00')
self.p.write_config_memory(0x05e0, b'\xff'*6)
self.p.write_config_memory(0x05f0, b'\xff'*6)

if progress:
logger.info(f"{config_size} / {config_size} bytes (100%)")

def read_mmsi(self):
data = hexlify(self.p.read_config_memory(0x00b0, 6)).decode().upper()
Expand Down
Loading