diff --git a/profiling_example/README.md b/profiling_example/README.md
new file mode 100644
index 0000000..60c74a4
--- /dev/null
+++ b/profiling_example/README.md
@@ -0,0 +1,55 @@
# Profiling example for Network unwrapping

This is an example of how to profile the arc unwrapping algorithms. We use the so-called "LAMBDA method" developed by TUDelft, and a wrapper function in DePSI to call the unwrapping algorithm.

- LAMBDA method: [LAMBDA.py](https://github.com/TUDelftGeodesy/DePSI_group/blob/dev/depsi/LAMBDA.py) (private repo)
- Wrapper function `lambda_estimation`: [lambda_estimation](https://github.com/TUDelftGeodesy/DePSI_group/blob/dev/depsi/LAMBDA.py) (private repo)
- Data needed for the example: [stm_amsterdam_173p.zarr](https://zenodo.org/records/15324181/files/stm_amsterdam_173p.zarr.zip?download=1)

## File structure

- lambda_unwrap.ipynb: Jupyter notebook for debugging and visualization
- lambda_unwrap.py: Python script that runs the unwrapping algorithm in a sequential loop
- lambda_unwrap_dask.ipynb: Jupyter notebook for debugging the dask method
- lambda_unwrap_dask.py: Python script that runs the same unwrapping algorithm with dask bags

## Profiling the unwrapping algorithm

We use the `py-spy` package to profile the unwrapping algorithm. For example, the following commands profile the three variants of the algorithm.

### Loop method

```sh
py-spy record --output profile_loop_60pnts --idle --rate 5 --subprocesses --format speedscope python lambda_unwrap.py
```

### Dask with `processes` scheduler

In `lambda_unwrap_dask.py`, configure:

```py
# Configure dask scheduler
dask.config.set(scheduler="processes")
```

```sh
py-spy record --output profile_dask_60pnts_processes --idle --rate 5 --subprocesses --format speedscope python lambda_unwrap_dask.py
```

### Dask with `threads` scheduler

```py
# Configure dask scheduler
dask.config.set(scheduler="threads")
```

```sh
py-spy record --output profile_dask_60pnts_threads --idle --rate 5 --subprocesses --format speedscope python lambda_unwrap_dask.py
```
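As an alternative to editing the script between runs, the scheduler choice can also be scoped to a single computation with a context manager. A minimal sketch (not part of the scripts in this example):

```py
import dask

# Applies the scheduler only inside the `with` block,
# instead of setting it globally for the whole script
with dask.config.set(scheduler="threads"):
    results_compute = results.compute()
```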
Then you can visualize the profile using the [`speedscope` web tool](https://www.speedscope.app/).

## Profiling results

The results for 5 points, 24 points and 60 points can be found at [this link](https://zenodo.org/records/15393928/files/experiments.zip).
\ No newline at end of file
diff --git a/profiling_example/lambda.ipynb b/profiling_example/lambda.ipynb
new file mode 100755
index 0000000..b207c87
--- /dev/null
+++ b/profiling_example/lambda.ipynb
@@ -0,0 +1,317 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# An example to generate STM from dynamic estimation\n",
    "\n",
    "Adapted from https://github.com/TUDelftGeodesy/DePSI_group/blob/dev/examples/notebooks/demo_dynamic_estimation.ipynb"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import xarray as xr\n",
    "import matplotlib.pyplot as plt\n",
    "\n",
    "# from depsi.LAMBDA import *\n",
    "from depsi.dynamic_estimation import lambda_estimation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Set input parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# set wavelength of Sentinel-1 (C-band)\n",
    "wavelength = 0.055465763  # m\n",
    "init_len = 50\n",
    "sigma_acc = 0.003  # m/yr^2\n",
    "L = 30 / 365\n",
    "\n",
    "# Initial guess for the sigma of the unknown parameters\n",
    "sigma_offset = 0.001  # m\n",
    "sigma_vel = 0.0001  # m/yr\n",
    "sigma_h = 5  # m\n",
    "sigma_ther = 0.00005  # m/°C\n",
    "\n",
    "# the option for the LAMBDA method\n",
    "# method == 1: ILS with shrinking search\n",
    "# method == 2: Integer rounding\n",
    "# method == 3: Integer bootstrapping\n",
    "# method == 4: PAR\n",
    "# method == 5: ILS with Ratio Test\n",
    "method = 3"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Load the STM derived from PS selection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load all to memory\n",
    "stm = xr.open_zarr('../../data/stm_amsterdam_173p.zarr').compute()\n",
    "stm"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Choose the reference point"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Select the point with the smallest NMAD for initialization as the reference point\n",
    "nmad_init = np.array(stm['nmad_init'])\n",
    "ref_pnt_idx = int(np.argmin(nmad_init))\n",
    "ref_pnt_idx"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Define functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def polynomial_fitting(x, y, degree=1):\n",
    "    \"\"\"\n",
    "    Perform polynomial fitting on the input data (x, y).\n",
    "\n",
    "    Parameters:\n",
    "    x : array_like\n",
    "        Independent variable data points.\n",
    "    y : array_like\n",
    "        Dependent variable data points.\n",
    "    degree : int\n",
    "        Degree of the polynomial fit (default is 1 for linear).\n",
    "\n",
    "    Returns:\n",
    "    y_fitted : ndarray\n",
    "        Fitted y values for the given x based on the polynomial fit.\n",
    "    poly_fn : np.poly1d\n",
    "        Polynomial function that represents the fit.\n",
    "    coeffs : ndarray\n",
    "        Coefficients of the fitted polynomial.\n",
    "    \"\"\"\n",
    "    # Validate inputs\n",
    "    if len(x) != len(y):\n",
    "        raise ValueError(\"Input arrays 'x' and 'y' must have the same length.\")\n",
    "    if degree < 1:\n",
    "        raise ValueError(\"Degree must be at least 1.\")\n",
    "\n",
    "    # Perform polynomial fitting\n",
    "    coeffs = np.polyfit(x, y, degree)\n",
    "    poly_fn = np.poly1d(coeffs)\n",
    "    y_fitted = poly_fn(x)\n",
    "\n",
    "    return y_fitted, poly_fn, coeffs\n",
    "\n",
    "\n",
    "def NMAD_to_sigma_phase(nmad, method):\n",
    "    \"\"\"\n",
    "    Converts Normalized Median Absolute Deviation (NMAD) to the sigma of phase observations.\n",
    "\n",
    "    This function uses an empirical cubic approximation (and is not based on physics):\n",
    "    sigma = a + b*nmad + c*nmad**2 + d*nmad**3\n",
    "\n",
    "    Parameters:\n",
    "    nmad : array_like\n",
    "        Normalized Median Absolute Deviation value.\n",
    "    method : {'mean', 'mean_2_sigma'}\n",
    "        Method used to compute sigma.\n",
    "\n",
    "    Returns:\n",
    "    sigma : ndarray\n",
    "        Estimated sigma value.\n",
    "    \"\"\"\n",
    "    if method == 'mean':\n",
    "        a, b, c, d = -0.0144869469, 2.00028682, -5.23271341, 21.1111801\n",
    "    elif method == 'mean_2_sigma':\n",
    "        a, b, c, d = 0.01907808, 1.2852969, 1.90052824, 11.60677721\n",
    "    else:\n",
    "        raise ValueError(\"Invalid method. Choose 'mean' or 'mean_2_sigma'.\")\n",
    "\n",
    "    sigma = a + b*nmad + c*nmad**2 + d*nmad**3\n",
    "\n",
    "    return sigma"
   ]
  },
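  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Quick sanity check of the empirical NMAD-to-sigma mapping defined above.\n",
    "# The NMAD values here are illustrative, not taken from the dataset.\n",
    "nmad_demo = np.array([0.05, 0.1, 0.2])\n",
    "NMAD_to_sigma_phase(nmad_demo, 'mean_2_sigma')"
   ]
  },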
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Perform unwrapping with lambda estimation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# select the reference point\n",
    "stm_refpnt = stm.isel(space=ref_pnt_idx)\n",
    "\n",
    "# subset the data for debugging\n",
    "NUM_POINTS = 5\n",
    "stm = stm.isel(space=range(NUM_POINTS))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initiate empty arrays to store results\n",
    "x_hat = np.zeros((stm.sizes[\"space\"], 4))\n",
    "Q_xhat = np.zeros((4, 4, stm.sizes[\"space\"]))\n",
    "y_hats = np.zeros((stm.sizes[\"space\"], stm.sizes[\"time\"]))\n",
    "phs_unw_init = np.zeros((stm.sizes[\"space\"], stm.sizes[\"time\"]))\n",
    "\n",
    "\n",
    "# Loop over all points and perform phase unwrapping\n",
    "for pnt_id in range(stm.sizes[\"space\"]):\n",
    "    # Select current point\n",
    "    stm_1pnt = stm.isel(space=pnt_id)\n",
    "\n",
    "    print(f\"{pnt_id}/{stm.sizes['space']}\")\n",
    "\n",
    "    # Get the sigma for the arc with the NMAD from the incremental time series\n",
    "    sigma_nmad_inc_i = NMAD_to_sigma_phase(\n",
    "        stm_1pnt[\"nmad_inc_stm\"].data, \"mean_2_sigma\"\n",
    "    )\n",
    "    sigma_nmad_inc_j = NMAD_to_sigma_phase(\n",
    "        stm_refpnt[\"nmad_inc_stm\"].data, \"mean_2_sigma\"\n",
    "    )\n",
    "    sigma_nmad_inc_arc = np.sqrt(\n",
    "        np.square(sigma_nmad_inc_i) + np.square(sigma_nmad_inc_j)\n",
    "    )\n",
    "\n",
    "    # Compute arc phase\n",
    "    phs_wrapped_arc = np.angle(\n",
    "        stm_refpnt[\"sd_complex\"].data * stm_1pnt[\"sd_complex\"].data.conj()\n",
    "    )\n",
    "\n",
    "    # Compute 'mean' h2ph value for the arc (which we currently model as the average of the two time series)\n",
    "    h2ph_arc = (stm_refpnt[\"h2ph_values\"].data + stm_1pnt[\"h2ph_values\"].data) / 2\n",
    "\n",
    "    results = lambda_estimation(\n",
    "        wavelength=wavelength,\n",
    "        phs_wrapped=phs_wrapped_arc,\n",
    "        sigma_phs_apri=sigma_nmad_inc_arc,\n",
    "        years=stm_1pnt[\"years\"].data,\n",
    "        h2ph_arc=h2ph_arc,\n",
    "        temp=stm_1pnt[\"temperature\"].data,\n",
    "        sigma_offset=sigma_offset,\n",
    "        sigma_vel=sigma_vel,\n",
    "        sigma_h=sigma_h,\n",
    "        sigma_ther=sigma_ther,\n",
    "        method=method,\n",
    "    )\n",
    "\n",
    "    # Store results\n",
    "    x_hat[pnt_id, :] = results[0]\n",
    "    Q_xhat[:, :, pnt_id] = results[1]\n",
    "    y_hats[pnt_id, :] = results[2]\n",
    "    phs_unw_init[pnt_id, :] = results[3]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# For now, write x_hat and Q_xhat to npz files, since they do not fit in the dimensions of an STM\n",
    "np.savez(\n",
    "    \"./x_hat_Q_xhat.npz\",\n",
    "    x_hat=x_hat,\n",
    "    Q_xhat=Q_xhat,\n",
    ")"
   ]
  },
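  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional round-trip check of the npz file written above\n",
    "saved = np.load(\"./x_hat_Q_xhat.npz\")\n",
    "saved[\"x_hat\"].shape, saved[\"Q_xhat\"].shape"
   ]
  },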
"name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/profiling_example/lambda.py b/profiling_example/lambda.py new file mode 100644 index 0000000..7550005 --- /dev/null +++ b/profiling_example/lambda.py @@ -0,0 +1,132 @@ +import numpy as np +import xarray as xr +from depsi.dynamic_estimation import lambda_estimation + + +# Constants +WAVELENGTH = 0.055465763 # m WAVELENGTH of Sentinel-1 (C-band) + +# Initial gauss for the sigma of the unknown parameters +SIGMA_OFFSET = 0.001 # m +SIGMA_VEL = 0.0001 # m/yr +sigma_h = 5 # m +SIGMA_THER = 0.00005 # m/°C + +# the option for the LAMBDA METHOD +# METHOD == 1: ILS with shrinking search +# METHOD == 2: Integer rounding +# METHOD == 3: Integer bootstrapping +# METHOD == 4: PAR +# METHOD == 5: ILS with Ratio Test +METHOD = 3 + +# Number of points to load for debugging +# Set to None to load all points +NUM_POINTS = 60 + +FILE_PATH = "../../data/stm_amsterdam_173p.zarr" + + +def NMAD_to_sigma_phase(nmad, METHOD): + """ + Converts Normalized Median Absolute Deviation (NMAD) to the sigma of phase observations. + + This function uses an empirical cubic approximation (and is not based on physics): + sigma = a + b*nmad + c*nmad**2 + d*nmad**3 + + Parameters: + nmad : array_like + Normalized Median Absolute Deviation value. + METHOD : {'mean', 'mean_2_sigma'} + METHOD used to compute sigma. + + Returns: + sigma: ndarray + Estimated sigma value. + """ + if METHOD == "mean": + a, b, c, d = -0.0144869469, 2.00028682, -5.23271341, 21.1111801 + elif METHOD == "mean_2_sigma": + a, b, c, d = 0.01907808, 1.2852969, 1.90052824, 11.60677721 + else: + raise ValueError("Invalid METHOD. 
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Choose the reference point"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Select the point with the smallest NMAD for initialization as the reference point\n",
    "nmad_init = np.array(stm[\"nmad_init\"])\n",
    "ref_pnt_idx = int(np.argmin(nmad_init))\n",
    "ref_pnt_idx"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Define functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def polynomial_fitting(x, y, degree=1):\n",
    "    \"\"\"\n",
    "    Perform polynomial fitting on the input data (x, y).\n",
    "\n",
    "    Parameters:\n",
    "    x : array_like\n",
    "        Independent variable data points.\n",
    "    y : array_like\n",
    "        Dependent variable data points.\n",
    "    degree : int\n",
    "        Degree of the polynomial fit (default is 1 for linear).\n",
    "\n",
    "    Returns:\n",
    "    y_fitted : ndarray\n",
    "        Fitted y values for the given x based on the polynomial fit.\n",
    "    poly_fn : np.poly1d\n",
    "        Polynomial function that represents the fit.\n",
    "    coeffs : ndarray\n",
    "        Coefficients of the fitted polynomial.\n",
    "    \"\"\"\n",
    "    # Validate inputs\n",
    "    if len(x) != len(y):\n",
    "        raise ValueError(\"Input arrays 'x' and 'y' must have the same length.\")\n",
    "    if degree < 1:\n",
    "        raise ValueError(\"Degree must be at least 1.\")\n",
    "\n",
    "    # Perform polynomial fitting\n",
    "    coeffs = np.polyfit(x, y, degree)\n",
    "    poly_fn = np.poly1d(coeffs)\n",
    "    y_fitted = poly_fn(x)\n",
    "\n",
    "    return y_fitted, poly_fn, coeffs\n",
    "\n",
    "\n",
    "def NMAD_to_sigma_phase(nmad, method):\n",
    "    \"\"\"\n",
    "    Converts Normalized Median Absolute Deviation (NMAD) to the sigma of phase observations.\n",
    "\n",
    "    This function uses an empirical cubic approximation (and is not based on physics):\n",
    "    sigma = a + b*nmad + c*nmad**2 + d*nmad**3\n",
    "\n",
    "    Parameters:\n",
    "    nmad : array_like\n",
    "        Normalized Median Absolute Deviation value.\n",
    "    method : {'mean', 'mean_2_sigma'}\n",
    "        Method used to compute sigma.\n",
    "\n",
    "    Returns:\n",
    "    sigma : ndarray\n",
    "        Estimated sigma value.\n",
    "    \"\"\"\n",
    "    if method == \"mean\":\n",
    "        a, b, c, d = -0.0144869469, 2.00028682, -5.23271341, 21.1111801\n",
    "    elif method == \"mean_2_sigma\":\n",
    "        a, b, c, d = 0.01907808, 1.2852969, 1.90052824, 11.60677721\n",
    "    else:\n",
    "        raise ValueError(\"Invalid method. Choose 'mean' or 'mean_2_sigma'.\")\n",
    "\n",
    "    sigma = a + b * nmad + c * nmad**2 + d * nmad**3\n",
    "\n",
    "    return sigma"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def lambda_one_point(stm_1pnt, stm_refpnt):\n",
    "    \"\"\"\n",
    "    Perform LAMBDA estimation for a single point.\n",
    "\n",
    "    Parameters:\n",
    "    stm_1pnt : xarray.Dataset\n",
    "        Dataset containing the time series data for the current point.\n",
    "    stm_refpnt : xarray.Dataset\n",
    "        Dataset containing the time series data for the reference point.\n",
    "\n",
    "    Returns:\n",
    "    x_hat : ndarray\n",
    "        Estimated unknown parameters (length 4).\n",
    "    Q_xhat : ndarray\n",
    "        4x4 covariance matrix of the estimated parameters.\n",
    "    y_hat : ndarray\n",
    "        Estimated phase time series.\n",
    "    phs_unw_init : ndarray\n",
    "        Initial unwrapped phase time series.\n",
    "    \"\"\"\n",
    "\n",
    "    # Get the sigma for the arc with the NMAD from the incremental time series\n",
    "    sigma_nmad_inc_i = NMAD_to_sigma_phase(\n",
    "        stm_1pnt[\"nmad_inc_stm\"].data, \"mean_2_sigma\"\n",
    "    )\n",
    "    sigma_nmad_inc_j = NMAD_to_sigma_phase(\n",
    "        stm_refpnt[\"nmad_inc_stm\"].data, \"mean_2_sigma\"\n",
    "    )\n",
    "    sigma_nmad_inc_arc = np.sqrt(\n",
    "        np.square(sigma_nmad_inc_i) + np.square(sigma_nmad_inc_j)\n",
    "    )\n",
    "\n",
    "    # Compute arc phase\n",
    "    phs_wrapped_arc = np.angle(\n",
    "        stm_refpnt[\"sd_complex\"].data * stm_1pnt[\"sd_complex\"].data.conj()\n",
    "    )\n",
    "\n",
    "    # Compute 'mean' h2ph value for the arc (which we currently model as the average of the two time series)\n",
    "    h2ph_arc = (stm_refpnt[\"h2ph_values\"].data + stm_1pnt[\"h2ph_values\"].data) / 2\n",
    "\n",
    "    x_hat, Q_xhat, y_hat, phs_unw_init = lambda_estimation(\n",
    "        wavelength=wavelength,\n",
    "        phs_wrapped=phs_wrapped_arc,\n",
    "        sigma_phs_apri=sigma_nmad_inc_arc,\n",
    "        years=stm_1pnt[\"years\"].data,\n",
    "        h2ph_arc=h2ph_arc,\n",
    "        temp=stm_1pnt[\"temperature\"].data,\n",
    "        sigma_offset=sigma_offset,\n",
    "        sigma_vel=sigma_vel,\n",
    "        sigma_h=sigma_h,\n",
    "        sigma_ther=sigma_ther,\n",
    "        method=method,\n",
    "    )\n",
    "\n",
    "    return x_hat, Q_xhat, y_hat, phs_unw_init"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Perform unwrapping with lambda estimation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# select the reference point\n",
    "stm_refpnt = stm.isel(space=ref_pnt_idx)\n",
    "\n",
    "# subset the data for debugging\n",
    "NUM_POINTS = 5\n",
    "stm = stm.isel(space=range(NUM_POINTS))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initiate empty arrays to store results\n",
    "x_hat = np.zeros((stm.sizes[\"space\"], 4))\n",
    "Q_xhat = np.zeros((4, 4, stm.sizes[\"space\"]))\n",
    "y_hats = np.zeros((stm.sizes[\"space\"], stm.sizes[\"time\"]))\n",
    "phs_unw_init = np.zeros((stm.sizes[\"space\"], stm.sizes[\"time\"]))\n",
    "\n",
    "\n",
    "# Map all points to the function as a dask bag\n",
    "stm_bags = db.from_sequence(\n",
    "    [stm.isel(space=pnt_id) for pnt_id in range(stm.sizes[\"space\"])]\n",
    ")\n",
    "results = stm_bags.map(lambda_one_point, stm_refpnt)"
   ]
  },
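  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that `Bag.map` passes any extra positional arguments to every call, so each element is evaluated as `lambda_one_point(point, stm_refpnt)`. The bag is lazy: nothing is computed until `results.compute()` below."
   ]
  },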
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dask.visualize(results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Compute the results\n",
"results_compute = results.compute()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Unpack results\n", + "for pnt_id in range(stm.sizes[\"space\"]):\n", + " (\n", + " x_hat[pnt_id, :],\n", + " Q_xhat[:, :, pnt_id],\n", + " y_hats[pnt_id, :],\n", + " phs_unw_init[pnt_id, :],\n", + " ) = results_compute[pnt_id]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# For now, write x_hat and Q_xhat to npz files, since they so not fit in dimensions of an STM\n", + "np.savez(\n", + " \"./x_hat_Q_xhat.npz\",\n", + " x_hat=x_hat,\n", + " Q_xhat=Q_xhat,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Attach y_hats and phs_unw_init to the STM\n", + "stm[\"y_hats\"] = ((\"space\", \"time\"), y_hats)\n", + "stm[\"phs_unw_init\"] = ((\"space\", \"time\"), phs_unw_init)\n", + "stm" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "stm.to_zarr(\n", + " \"./stm_amsterdam_173p_init_unw.zarr\",\n", + " mode=\"w\",\n", + ")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "depsi-dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.8" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/profiling_example/lambda_unwrap_dask.py b/profiling_example/lambda_unwrap_dask.py new file mode 100644 index 0000000..5875e4d --- /dev/null +++ b/profiling_example/lambda_unwrap_dask.py @@ -0,0 +1,151 @@ +import numpy as np +import xarray as xr +import dask +import dask.bag as db +from depsi.dynamic_estimation import lambda_estimation + +# Configure dask scheduler +dask.config.set(scheduler="processes") + +# Constants +WAVELENGTH = 0.055465763 # m WAVELENGTH of Sentinel-1 (C-band) + +# Initial gauss for the sigma of the unknown parameters +SIGMA_OFFSET = 0.001 # m +SIGMA_VEL = 0.0001 # m/yr +sigma_h = 5 # m +SIGMA_THER = 0.00005 # m/°C + +# the option for the LAMBDA METHOD +# METHOD == 1: ILS with shrinking search +# METHOD == 2: Integer rounding +# METHOD == 3: Integer bootstrapping +# METHOD == 4: PAR +# METHOD == 5: ILS with Ratio Test +METHOD = 3 + +# Number of points to load for debugging +# Set to None to load all points +NUM_POINTS = 60 + +# Input file path +FILE_PATH = "../../data/stm_amsterdam_173p.zarr" + +# Number of partitions for dask bag +# This should be configured because by default dask bag uses ~100 partitions +DB_PATITIONS = 12 + +def NMAD_to_sigma_phase(nmad, METHOD): + """ + Converts Normalized Median Absolute Deviation (NMAD) to the sigma of phase observations. + + This function uses an empirical cubic approximation (and is not based on physics): + sigma = a + b*nmad + c*nmad**2 + d*nmad**3 + + Parameters: + nmad : array_like + Normalized Median Absolute Deviation value. + METHOD : {'mean', 'mean_2_sigma'} + METHOD used to compute sigma. + + Returns: + sigma: ndarray + Estimated sigma value. + """ + if METHOD == "mean": + a, b, c, d = -0.0144869469, 2.00028682, -5.23271341, 21.1111801 + elif METHOD == "mean_2_sigma": + a, b, c, d = 0.01907808, 1.2852969, 1.90052824, 11.60677721 + else: + raise ValueError("Invalid METHOD. 
    stm_bags = db.from_sequence(
        [stm.isel(space=pnt_id) for pnt_id in range(stm.sizes["space"])],
        npartitions=DB_PARTITIONS,
    )
    results = stm_bags.map(lambda_one_point, stm_refpnt)

    # Compute the results
    results_compute = results.compute()

    # Unpack results
    for pnt_id in range(stm.sizes["space"]):
        (
            x_hat[pnt_id, :],
            Q_xhat[:, :, pnt_id],
            y_hats[pnt_id, :],
            phs_unw_init[pnt_id, :],
        ) = results_compute[pnt_id]

    # Write to file
    np.savez(
        "./results.npz",
        x_hat=x_hat,
        Q_xhat=Q_xhat,
        y_hats=y_hats,
        phs_unw_init=phs_unw_init,
    )
\ No newline at end of file