forked from SpectacularAI/u-blox-capture
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gps_converter.py
166 lines (136 loc) · 5.29 KB
/
gps_converter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import argparse
import json
from pandas import Timestamp, Timedelta # Use pandas time objects for nanosecond precision
import os
parser = argparse.ArgumentParser(description="Convert TIMEUTC, PVT and HPPOSLLH into accurate GPS coordinates")
parser.add_argument("file", help="ubx JSONL file")
parser.add_argument("-low", help="Force to use low precision location", action="store_true")
UBLOX_LATLON_SCALE = 1e-7
UBLOX_LATLON_HP_SCALE = 1e-2
UBLOX_ACC_SCALE = 1e-3
MM_TO_METERS = 1e-3
def extractTimestamp(raw):
if raw.get("valid", {}).get("validTime") or raw.get("valid", {}).get("validUTC"):
# nano can be negative, so add it via Timedelta
ts = Timestamp(
year = raw["year"],
month = raw["month"],
day = raw["day"],
hour = raw["hour"],
minute = raw["min"],
second = raw["sec"]
) + Timedelta(value = raw["nano"], unit = "nanoseconds")
else:
ts = None
return ts
def extractHighPrecisionLocation(raw):
# Precise latitude in deg * 1e-7 = lat + (latHp * 1e-2)
lat = (raw["lat"] + raw["latHp"] * UBLOX_LATLON_HP_SCALE) * UBLOX_LATLON_SCALE
lon = (raw["lon"] + raw["lonHp"] * UBLOX_LATLON_HP_SCALE) * UBLOX_LATLON_SCALE
# Precise height in mm = hMSL + (hMSLHp * 0.1)
alt = (raw["hMSL"] + raw["hMSLHp"] * 0.1) * MM_TO_METERS
acc = raw["hAcc"] * MM_TO_METERS
accV = raw["vAcc"] * MM_TO_METERS
return (lat, lon, alt, acc, accV)
def extractLocation(raw):
lat = raw["lat"] * UBLOX_LATLON_SCALE
lon = raw["lon"] * UBLOX_LATLON_SCALE
alt = raw["hMSL"] * MM_TO_METERS
acc = raw["hAcc"] * MM_TO_METERS
accV = raw["vAcc"] * MM_TO_METERS
return (lat, lon, alt, acc, accV)
def earliestMonoTime(current, event):
if event and event.get("monoTime"):
newTime = event.get("monoTime")
if not current or current > newTime: return newTime
return current
def buildMeasurement(group, useHighPrecision=True, itow=None):
ts = None
if group.get("PVT"):
ts = extractTimestamp(group.get("PVT"))
elif group.get("TIMEUTC"):
ts = extractTimestamp(group.get("TIMEUTC"))
if not ts:
if itow: print("Valid timestamp missing, skipping iTOW={}".format(itow))
return None
acc = None
if group.get("HPPOSLLH"):
if useHighPrecision:
lat, lon, alt, acc, accV = extractHighPrecisionLocation(group.get("HPPOSLLH"))
else:
lat, lon, alt, acc, accV = extractLocation(group.get("HPPOSLLH"))
elif not useHighPrecision and group.get("PVT"):
lat, lon, alt, acc, accV = extractLocation(group.get("PVT"))
if not acc:
if itow: print("Valid location missing, skipping iTOW={}".format(itow))
return None
monoTime = None
monoTime = earliestMonoTime(monoTime, group.get("PVT"))
monoTime = earliestMonoTime(monoTime, group.get("TIMEUTC"))
monoTime = earliestMonoTime(monoTime, group.get("HPPOSLLH"))
if (not monoTime):
print("Couldnt find monoTime")
return None
measurement = {
"time": monoTime,
"gpsUtcTime": ts.timestamp(),
"lat": lat,
"lon": lon,
"altitude": alt,
"accuracy": acc,
"verticalAccuracy": accV
}
pvt = group.get("PVT")
if pvt:
measurement["velocity"] = {
"north": pvt["velN"] * MM_TO_METERS,
"east": pvt["velE"] * MM_TO_METERS,
"down": pvt["velD"] * MM_TO_METERS,
}
measurement["groundSpeed"] = pvt["gSpeed"] * MM_TO_METERS
measurement["speedAccuracy"] = pvt["sAcc"] * MM_TO_METERS
measurement["fixStatus"] = pvt["flags"]
return measurement
def run(args):
inputFile = os.path.splitext(args.file)
outputFile = inputFile[0] + "-gps" + inputFile[1]
print("Starting processing")
# Group data based on iTOW, they belong to same navigation solution
useHighPrecision = False
itowGroups = {}
with open(args.file) as f:
lines = f.readlines()
for line in lines:
msg = json.loads(line)
msgType = msg["type"]
if msgType == "HPPOSLLH": useHighPrecision = True
if msgType == "PVT" or msgType == "HPPOSLLH" or msgType == "TIMEUTC":
payload = msg["payload"]
itow = payload["iTOW"]
group = itowGroups.get(itow)
if not group:
group = {"iTOW": itow}
itowGroups[itow] = group
group[msgType] = payload
group[msgType]["monoTime"] = msg["monoTime"]
if args.low:
useHighPrecision = False
if useHighPrecision:
print("Found HPPOSLLH events, only using them for high precision. PVT events excluded. Use -low flag to disable.")
else:
print("Using low precision mode")
# Convert groups into GPS coordinates
coordinates = []
for itow in itowGroups:
group = itowGroups[itow]
measurement = buildMeasurement(group, useHighPrecision, itow)
if measurement:
coordinates.append(measurement)
coordinates.sort(key=lambda x: x["time"])
with open(outputFile, "w") as writer:
for coord in coordinates:
writer.write(json.dumps(coord) + "\n")
if __name__ == "__main__":
args = parser.parse_args()
run(args)
print("Done!")