diff --git a/examples/Kalman.ipynb b/examples/Kalman.ipynb index 663d9dc..4a479d8 100644 --- a/examples/Kalman.ipynb +++ b/examples/Kalman.ipynb @@ -137,7 +137,8 @@ } ], "source": [ - "from laika.raw_gnss import process_measurements, correct_measurements, calc_pos_fix\n", + "from laika.raw_gnss import process_measurements, correct_measurements\n", + "from laika.opt import calc_pos_fix\n", "from tqdm.auto import tqdm\n", "\n", "# We want to group measurements by measurement epoch\n", @@ -305,4 +306,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} diff --git a/examples/Walkthrough.ipynb b/examples/Walkthrough.ipynb index 7398fa1..9bc2b12 100644 --- a/examples/Walkthrough.ipynb +++ b/examples/Walkthrough.ipynb @@ -180,6 +180,7 @@ "outputs": [], "source": [ "import laika.raw_gnss as raw\n", + "import laika.opt as opt\n", "import laika.helpers as helpers\n", "\n", "# this example data is the from the example segment\n", @@ -351,12 +352,12 @@ "corrected_measurements_by_epoch = []\n", "for meas_epoch in measurements_by_epoch[::10]:\n", " processed = raw.process_measurements(meas_epoch, dog)\n", - " est_pos = raw.calc_pos_fix(processed)[0][:3]\n", + " est_pos = opt.calc_pos_fix(processed)[0][:3]\n", " corrected = raw.correct_measurements(meas_epoch, est_pos, dog)\n", " corrected_measurements_by_epoch.append(corrected)\n", - " pos_solutions.append(raw.calc_pos_fix(corrected))\n", + " pos_solutions.append(opt.calc_pos_fix(corrected))\n", " # you need an estimate position to calculate a velocity fix\n", - " vel_solutions.append(raw.calc_vel_fix(corrected, pos_solutions[-1][0]))" + " vel_solutions.append(opt.calc_vel_fix(corrected, pos_solutions[-1][0]))" ] }, { diff --git a/laika/dgps.py b/laika/dgps.py index 54c13d8..53111d9 100644 --- a/laika/dgps.py +++ b/laika/dgps.py @@ -5,6 +5,7 @@ from .gps_time import GPSTime from .constants import SECS_IN_YEAR from . import raw_gnss as raw +from . import opt from .rinex_file import RINEXFile from .downloader import download_cors_coords from .helpers import get_constellation @@ -107,8 +108,9 @@ def parse_dgps(station_id, station_obs_file_path, dog, max_distance=100000, requ station_delays[signal] = {} for i, proc_measurement in enumerate(proc_measurements): times.append(proc_measurement[0].recv_time) - Fx_pos = raw.pr_residual(proc_measurement, signal=signal) - residual = -np.array(Fx_pos(list(station_pos) + [0, 0])) + Fx_pos = opt.pr_residual(proc_measurement, signal=signal) + residual, _ = Fx_pos(list(station_pos) + [0,0]) + residual = -np.array(residual) for j, m in enumerate(proc_measurement): prn = m.prn if prn not in station_delays[signal]: diff --git a/laika/opt.py b/laika/opt.py new file mode 100644 index 0000000..1a76f35 --- /dev/null +++ b/laika/opt.py @@ -0,0 +1,191 @@ +import sympy +import numpy as np +from typing import List + +from .constants import EARTH_ROTATION_RATE, SPEED_OF_LIGHT +from .helpers import ConstellationId +from .raw_gnss import GNSSMeasurement + + +def gauss_newton(fun, b, xtol=1e-8, max_n=25): + for _ in range(max_n): + # Compute function and jacobian on current estimate + r, J = fun(b) + + # Update estimate + delta = np.linalg.pinv(J) @ r + b -= delta + + # Check step size for stopping condition + if np.linalg.norm(delta) < xtol: + break + return b + + +def calc_pos_fix(measurements, posfix_functions=None, x0=None, no_weight=False, signal='C1C', min_measurements=6): + ''' + Calculates gps fix using gauss newton method + To solve the problem a minimal of 4 measurements are required. + If Glonass is included 5 are required to solve for the additional free variable. + returns: + 0 -> list with positions + 1 -> pseudorange errs + ''' + if x0 is None: + x0 = [0, 0, 0, 0, 0] + + if len(measurements) < min_measurements: + return [],[] + + Fx_pos = pr_residual(measurements, posfix_functions, signal=signal, no_weight=no_weight, no_nans=True) + x = gauss_newton(Fx_pos, x0) + residual, _ = Fx_pos(x, no_weight=True) + return x.tolist(), residual.tolist() + + +def calc_vel_fix(measurements, est_pos, velfix_function=None, v0=None, no_weight=False, signal='D1C', min_measurements=6): + ''' + Calculates gps velocity fix using gauss newton method + returns: + 0 -> list with velocities + 1 -> pseudorange_rate errs + ''' + if v0 is None: + v0 = [0, 0, 0, 0] + + if len(measurements) < min_measurements: + return [], [] + + Fx_vel = prr_residual(measurements, est_pos, velfix_function, signal=signal, no_weight=no_weight, no_nans=True) + v = gauss_newton(Fx_vel, v0) + residual, _ = Fx_vel(v, no_weight=True) + return v.tolist(), residual.tolist() + + +def get_posfix_sympy_fun(constellation): + # Unknowns + x, y, z = sympy.Symbol('x'), sympy.Symbol('y'), sympy.Symbol('z') + bc = sympy.Symbol('bc') + bg = sympy.Symbol('bg') + zero_theta = sympy.Symbol('zero_theta') + var = [x, y, z, bc, bg] + + # Knowns + pr = sympy.Symbol('pr') + sat_x, sat_y, sat_z = sympy.Symbol('sat_x'), sympy.Symbol('sat_y'), sympy.Symbol('sat_z') + weight = sympy.Symbol('weight') + + theta = (EARTH_ROTATION_RATE * (pr - bc) / SPEED_OF_LIGHT)*zero_theta + val = sympy.sqrt( + (sat_x * sympy.cos(theta) + sat_y * sympy.sin(theta) - x) ** 2 + + (sat_y * sympy.cos(theta) - sat_x * sympy.sin(theta) - y) ** 2 + + (sat_z - z) ** 2 + ) + + if constellation == ConstellationId.GLONASS: + res = weight * (val - (pr - bc - bg)) + elif constellation == ConstellationId.GPS: + res = weight * (val - (pr - bc)) + else: + raise NotImplementedError(f"Constellation {constellation} not supported") + + res = [res] + [sympy.diff(res, v) for v in var] + + return sympy.lambdify([x, y, z, bc, bg, pr, zero_theta, sat_x, sat_y, sat_z, weight], res, modules=["numpy"]) + + +def get_velfix_sympy_func(): + # implementing this without sympy.Matrix gives a 2x speedup at generation + + # knowns, receiver position, satellite position, satellite velocity + ep_x, ep_y, ep_z = sympy.Symbol('ep_x'), sympy.Symbol('ep_y'), sympy.Symbol('ep_z') + est_pos = np.array([ep_x, ep_y, ep_z]) + sp_x, sp_y, sp_z = sympy.Symbol('sp_x'), sympy.Symbol('sp_y'), sympy.Symbol('sp_z') + sat_pos = np.array([sp_x, sp_y, sp_z]) + sv_x, sv_y, sv_z = sympy.Symbol('sv_x'), sympy.Symbol('sv_y'), sympy.Symbol('sv_z') + sat_vel = np.array([sv_x, sv_y, sv_z]) + observables = sympy.Symbol('observables') + weight = sympy.Symbol('weight') + + # unknown, receiver velocity + v_x, v_y, v_z = sympy.Symbol('v_x'), sympy.Symbol('v_y'), sympy.Symbol('v_z') + vel = np.array([v_x, v_y, v_z]) + vel_o = sympy.Symbol('vel_o') + + loss = sat_pos - est_pos + loss /= sympy.sqrt(loss.dot(loss)) + + nv = loss.dot(sat_vel - vel) + ov = (observables - vel_o) + res = (nv - ov)*weight + + res = [res] + [sympy.diff(res, v) for v in [v_x, v_y, v_z, vel_o]] + + return sympy.lambdify([ + ep_x, ep_y, ep_z, sp_x, sp_y, sp_z, + sv_x, sv_y, sv_z, observables, weight, + v_x, v_y, v_z, vel_o + ], + res, modules=["numpy"]) + + +def pr_residual(measurements: List[GNSSMeasurement], posfix_functions=None, signal='C1C', no_weight=False, no_nans=False): + + if posfix_functions is None: + posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)} + + def Fx_pos(inp, no_weight=no_weight): + vals, gradients = [], [] + + for meas in measurements: + if signal in meas.observables_final and np.isfinite(meas.observables_final[signal]): + pr = meas.observables_final[signal] + sat_pos = meas.sat_pos_final + zero_theta = 0 + elif signal in meas.observables and np.isfinite(meas.observables[signal]) and meas.processed: + pr = meas.observables[signal] + pr += meas.sat_clock_err * SPEED_OF_LIGHT + sat_pos = meas.sat_pos + zero_theta = 1 + else: + if not no_nans: + vals.append(np.nan) + gradients.append(np.nan) + continue + + w = 1.0 if no_weight or meas.observables_std[signal] == 0 else (1 / meas.observables_std[signal]) + val, *gradient = posfix_functions[meas.constellation_id](*inp, pr, zero_theta, *sat_pos, w) + vals.append(val) + gradients.append(gradient) + return np.asarray(vals), np.asarray(gradients) + return Fx_pos + + +def prr_residual(measurements: List[GNSSMeasurement], est_pos, velfix_function=None, signal='D1C', no_weight=False, no_nans=False): + + if velfix_function is None: + velfix_function = get_velfix_sympy_func() + + def Fx_vel(vel, no_weight=no_weight): + vals, gradients = [], [] + + for meas in measurements: + if signal not in meas.observables or not np.isfinite(meas.observables[signal]): + if not no_nans: + vals.append(np.nan) + gradients.append(np.nan) + continue + + sat_pos = meas.sat_pos_final if meas.corrected else meas.sat_pos + weight = 1.0 if no_weight or meas.observables_std[signal] == 0 else (1 / meas.observables_std[signal]) + + val, *gradient = velfix_function(est_pos[0], est_pos[1], est_pos[2], + sat_pos[0], sat_pos[1], sat_pos[2], + meas.sat_vel[0], meas.sat_vel[1], meas.sat_vel[2], + meas.observables[signal], weight, + vel[0], vel[1], vel[2], vel[3]) + vals.append(val) + gradients.append(gradient) + + return np.asarray(vals), np.asarray(gradients) + return Fx_vel diff --git a/laika/raw_gnss.py b/laika/raw_gnss.py index 39eabda..9f539c4 100644 --- a/laika/raw_gnss.py +++ b/laika/raw_gnss.py @@ -292,108 +292,6 @@ def read_rinex_obs(obsdata) -> List[List[GNSSMeasurement]]: return measurements -def calc_pos_fix(measurements, x0=[0, 0, 0, 0, 0], no_weight=False, signal='C1C', min_measurements=6): - ''' - Calculates gps fix with WLS optimizer - - returns: - 0 -> list with positions - 1 -> pseudorange errs - ''' - import scipy.optimize as opt # Only use scipy here - - n = len(measurements) - if n < min_measurements: - return [] - - Fx_pos = pr_residual(measurements, signal=signal, no_weight=no_weight, no_nans=True) - opt_pos = opt.least_squares(Fx_pos, x0).x - return opt_pos, Fx_pos(opt_pos, no_weight=True) - - -def calc_vel_fix(measurements, est_pos, v0=[0, 0, 0, 0], no_weight=False, signal='D1C'): - ''' - Calculates gps velocity fix with WLS optimizer - - returns: - 0 -> list with velocities - 1 -> pseudorange_rate errs - ''' - import scipy.optimize as opt # Only use scipy here - - n = len(measurements) - if n < 6: - return [] - - Fx_vel = prr_residual(measurements, est_pos, signal=signal, no_weight=no_weight, no_nans=True) - opt_vel = opt.least_squares(Fx_vel, v0).x - return opt_vel, Fx_vel(opt_vel, no_weight=True) - - -def pr_residual(measurements: List[GNSSMeasurement], signal='C1C', no_weight=False, no_nans=False): - # solve for pos - def Fx_pos(xxx_todo_changeme, no_weight=no_weight): - (x, y, z, bc, bg) = xxx_todo_changeme - rows = [] - - for meas in measurements: - if signal in meas.observables_final and np.isfinite(meas.observables_final[signal]): - pr = meas.observables_final[signal] - sat_pos = meas.sat_pos_final - theta = 0 - elif signal in meas.observables and np.isfinite(meas.observables[signal]) and meas.processed: - pr = meas.observables[signal] - pr += meas.sat_clock_err * constants.SPEED_OF_LIGHT - sat_pos = meas.sat_pos - theta = constants.EARTH_ROTATION_RATE * (pr - bc) / constants.SPEED_OF_LIGHT - else: - if not no_nans: - rows.append(np.nan) - continue - if no_weight: - weight = 1 - else: - weight = (1 / meas.observables_std[signal]) - - val = np.sqrt( - (sat_pos[0] * np.cos(theta) + sat_pos[1] * np.sin(theta) - x) ** 2 + - (sat_pos[1] * np.cos(theta) - sat_pos[0] * np.sin(theta) - y) ** 2 + - (sat_pos[2] - z) ** 2 - ) - if meas.constellation_id == ConstellationId.GLONASS: - rows.append(weight * (val - (pr - bc - bg))) - elif meas.constellation_id == ConstellationId.GPS: - rows.append(weight * (val - (pr - bc))) - return rows - return Fx_pos - - -def prr_residual(measurements, est_pos, signal='D1C', no_weight=False, no_nans=False): - # solve for vel - def Fx_vel(vel, no_weight=no_weight): - rows = [] - for meas in measurements: - if signal not in meas.observables or not np.isfinite(meas.observables[signal]): - if not no_nans: - rows.append(np.nan) - continue - if meas.corrected: - sat_pos = meas.sat_pos_final - else: - sat_pos = meas.sat_pos - if no_weight: - weight = 1 - else: - weight = (1 / meas.observables[signal]) - los_vector = (sat_pos - est_pos[0:3] - ) / np.linalg.norm(sat_pos - est_pos[0:3]) - rows.append( - weight * ((meas.sat_vel - vel[0:3]).dot(los_vector) - - (meas.observables[signal] - vel[3]))) - return rows - return Fx_vel - - def get_Q(recv_pos, sat_positions): local = LocalCoord.from_ecef(recv_pos) sat_positions_rel = local.ecef2ned(sat_positions) diff --git a/tests/test_positioning.py b/tests/test_positioning.py index 05747f0..5f51c57 100644 --- a/tests/test_positioning.py +++ b/tests/test_positioning.py @@ -10,6 +10,7 @@ from laika.rinex_file import RINEXFile from laika.dgps import get_station_position import laika.raw_gnss as raw +import laika.opt as opt class TestPositioning(unittest.TestCase): @@ -50,7 +51,7 @@ def run_station_position(self, length): # fixes for every epoch (every 30s) over 24h. ests = [] for corr in tqdm(rinex_corr_grouped): - ret = raw.calc_pos_fix(corr) + ret = opt.calc_pos_fix(corr) if len(ret) > 0: fix, _ = ret ests.append(fix)