From 7a8197ac0a65231fb9446f2e9bb8cadeedb30fe9 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 22 Feb 2024 12:04:12 +0700 Subject: [PATCH 01/14] A motivating demo for MARA to work toward. --- demo/mara_join.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 demo/mara_join.py diff --git a/demo/mara_join.py b/demo/mara_join.py new file mode 100644 index 0000000..39214c8 --- /dev/null +++ b/demo/mara_join.py @@ -0,0 +1,39 @@ +"""Demo of a spatial join using Qarray and dask_geopandas for MARA.""" +import dask.dataframe as dd +import dask_geopandas as gdd +import xarray as xr +import qarray as qr + + +mv_df = dd.read_csv( + 'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/' + 'sample_data/vector/movbank_data.csv' +) + +mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'], utc=True) +mv_df['geometry'] = gdd.points_from_xy( + mv_df['location-long'], mv_df['location-lat'] +) +# What is the CRS? +mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry') + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( + time=slice(mv_gdf['timestamp'].min(), mv_gdf['timestamp'].max()) +) +era5_wind_df = qr.to_dd(era5_wind_ds) +# What is the CRS? +era5_wind_df['geometry'] = gdd.points_from_xy( + era5_wind_df['longitude'], era5_wind_df['latitude'], era5_wind_df['level'] +) +era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') + + +# Only an inner spatial join is supported right now (in dask_geopandas). +intersection = mv_gdf.sjoin(era5_wind_gdf) +print(intersection.head()) + From f9ee95186a66e913b322a57a7410f9885babf88b Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 25 Feb 2024 20:15:49 +0700 Subject: [PATCH 02/14] Debugged demo join; now focused on tuning performance. --- demo/mara_join.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index 39214c8..b71838e 100644 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -7,28 +7,34 @@ mv_df = dd.read_csv( 'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/' - 'sample_data/vector/movbank_data.csv' + 'sample_data/vector/movbank_data.csv', ) -mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp'], utc=True) +mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp']) +mv_df.set_index('timestamp', drop=False, sort=True) mv_df['geometry'] = gdd.points_from_xy( - mv_df['location-long'], mv_df['location-lat'] + mv_df, 'location-long', 'location-lat', crs=4326 +) +timerange = slice( + mv_df.timestamp.min().compute(), + mv_df.timestamp.max().compute(), ) -# What is the CRS? mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry') +# For MARA, we'd replace this with an Xee call. era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} + chunks={'time': 48, 'level': 1} ) era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( - time=slice(mv_gdf['timestamp'].min(), mv_gdf['timestamp'].max()) + time=timerange, + level=1000, # surface level only. ) era5_wind_df = qr.to_dd(era5_wind_ds) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( - era5_wind_df['longitude'], era5_wind_df['latitude'], era5_wind_df['level'] + era5_wind_df, 'longitude', 'latitude', ) era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') From 631bc52c7555a95a140e6981fccd8b22507dccc8 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 25 Feb 2024 22:55:50 +0700 Subject: [PATCH 03/14] Using new chunks argument. --- demo/mara_join.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index b71838e..3141513 100644 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -25,13 +25,13 @@ era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 48, 'level': 1} + chunks={'time': 240, 'level': 1} ) era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds) +era5_wind_df = qr.to_dd(era5_wind_ds, chunks=dict(time=6)) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', From 09c7721e7a6ed80c31040d0fc2fa20b4f3f209b5 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Wed, 28 Feb 2024 19:11:51 +0700 Subject: [PATCH 04/14] Made the join into a script. Added print debug stmts. --- demo/mara_join.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) mode change 100644 => 100755 demo/mara_join.py diff --git a/demo/mara_join.py b/demo/mara_join.py old mode 100644 new mode 100755 index 3141513..3aea74a --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -1,3 +1,4 @@ +#!/usr/bin/env python3 """Demo of a spatial join using Qarray and dask_geopandas for MARA.""" import dask.dataframe as dd import dask_geopandas as gdd @@ -27,19 +28,20 @@ '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', chunks={'time': 240, 'level': 1} ) +print('era5 dataset opened.') era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds, chunks=dict(time=6)) +era5_wind_df = qr.to_dd(era5_wind_ds) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', ) era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') - +print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). -intersection = mv_gdf.sjoin(era5_wind_gdf) -print(intersection.head()) +intersection = era5_wind_gdf.sjoin(mv_gdf).compute() +print(intersection.compute) From d20f55cce4e01fa2edc427bc2674834e75798882 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 3 Mar 2024 15:33:01 +0530 Subject: [PATCH 05/14] Not totally working demo, but much progress made. --- demo/mara_join.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index 3aea74a..f510440 100755 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -33,7 +33,7 @@ time=timerange, level=1000, # surface level only. ) -era5_wind_df = qr.to_dd(era5_wind_ds) +era5_wind_df = qr.read_xarray(era5_wind_ds) # What is the CRS? era5_wind_df['geometry'] = gdd.points_from_xy( era5_wind_df, 'longitude', 'latitude', @@ -43,5 +43,5 @@ print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). intersection = era5_wind_gdf.sjoin(mv_gdf).compute() -print(intersection.compute) +print(intersection) From 4c5181ae739ee0597d2314e374094319805b42cf Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 3 Mar 2024 15:36:22 +0530 Subject: [PATCH 06/14] Swapped order; added login note. --- demo/mara_join.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/demo/mara_join.py b/demo/mara_join.py index f510440..6169fef 100755 --- a/demo/mara_join.py +++ b/demo/mara_join.py @@ -1,5 +1,11 @@ #!/usr/bin/env python3 -"""Demo of a spatial join using Qarray and dask_geopandas for MARA.""" +"""Demo of a spatial join using Qarray and dask_geopandas for MARA. + +Please run the following to access the ERA5 dataset: +``` +gcloud auth application-default login +``` +""" import dask.dataframe as dd import dask_geopandas as gdd import xarray as xr @@ -42,6 +48,6 @@ print('beginning spatial join') # Only an inner spatial join is supported right now (in dask_geopandas). -intersection = era5_wind_gdf.sjoin(mv_gdf).compute() +intersection = mv_gdf.sjoin(era5_wind_gdf).compute() print(intersection) From f90188e2daf5d8031757e706960e1d293e6c49b8 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 20:55:21 +0530 Subject: [PATCH 07/14] Simplest version of the SST Demo. --- demo/sst | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 demo/sst diff --git a/demo/sst b/demo/sst new file mode 100644 index 0000000..6e38b6e --- /dev/null +++ b/demo/sst @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +"""Demo of calculating global average sea surface temperature (SST) with SQL. + +Please run the following to access the ERA5 dataset: +``` +gcloud auth application-default login +``` +""" +import xarray as xr +import xarray_sql as qr + +# TODO(alxmrs): Add coiled or dask cluster code. + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +print('dataset opened.') +# TODO(alxmrs): Slice to small time range based on script args. +era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( + level=1000, # surface level only. +) + +# chunk sizes determined from VM memory limit of 16 GB. +c = qr.Context() +c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) + +print('beginning query.') +df = c.sql(""" +SELECT + DATE("time") as date, + AVG("sea_surface_temperature") as daily_avg_sst +FROM + "era5" +GROUP BY + DATE("time") +""") + +# TODO(alxmrs): time slice should be in file name. +df.to_csv('global_avg_sst_*.cvs') \ No newline at end of file From 070200ece5ffdfb234de03ce9ba6f4fed5faf149 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 21:17:09 +0530 Subject: [PATCH 08/14] Added coiled cluster config. --- demo/sst | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/demo/sst b/demo/sst index 6e38b6e..178bfa4 100644 --- a/demo/sst +++ b/demo/sst @@ -9,7 +9,16 @@ gcloud auth application-default login import xarray as xr import xarray_sql as qr -# TODO(alxmrs): Add coiled or dask cluster code. +from coiled import Cluster + +cluster = Cluster( + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, +) +client = cluster.get_client() +cluster.adapt(minimum=1, maximum=100) era5_ds = xr.open_zarr( 'gs://gcp-public-data-arco-era5/ar/' @@ -22,7 +31,7 @@ era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( level=1000, # surface level only. ) -# chunk sizes determined from VM memory limit of 16 GB. +# chunk sizes determined from VM memory limit of 16 GiB. c = qr.Context() c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) From 9faafe0f17d42a7384394ff9620ffd4c1b63ca45 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sat, 23 Mar 2024 22:50:39 +0530 Subject: [PATCH 09/14] Added script arguments. # Conflicts: # pyproject.toml --- demo/sst | 50 ------------------------------------------ demo/sst.py | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 50 deletions(-) delete mode 100644 demo/sst create mode 100755 demo/sst.py diff --git a/demo/sst b/demo/sst deleted file mode 100644 index 178bfa4..0000000 --- a/demo/sst +++ /dev/null @@ -1,50 +0,0 @@ -#!/usr/bin/env python3 -"""Demo of calculating global average sea surface temperature (SST) with SQL. - -Please run the following to access the ERA5 dataset: -``` -gcloud auth application-default login -``` -""" -import xarray as xr -import xarray_sql as qr - -from coiled import Cluster - -cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, -) -client = cluster.get_client() -cluster.adapt(minimum=1, maximum=100) - -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) -print('dataset opened.') -# TODO(alxmrs): Slice to small time range based on script args. -era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - level=1000, # surface level only. -) - -# chunk sizes determined from VM memory limit of 16 GiB. -c = qr.Context() -c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) - -print('beginning query.') -df = c.sql(""" -SELECT - DATE("time") as date, - AVG("sea_surface_temperature") as daily_avg_sst -FROM - "era5" -GROUP BY - DATE("time") -""") - -# TODO(alxmrs): time slice should be in file name. -df.to_csv('global_avg_sst_*.cvs') \ No newline at end of file diff --git a/demo/sst.py b/demo/sst.py new file mode 100755 index 0000000..1482ec5 --- /dev/null +++ b/demo/sst.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +"""Demo of calculating global average sea surface temperature (SST) with SQL. + +Please run the following to set up cloud resources: +``` +gcloud auth application-default login +coiled setup +``` +""" +import argparse +import xarray as xr +import xarray_sql as qr + +parser = argparse.ArgumentParser() +parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') +parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') +parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') + +args = parser.parse_args() + +if args.cluster: + from coiled import Cluster + + cluster = Cluster( + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, + ) + client = cluster.get_client() + cluster.adapt(minimum=1, maximum=100) +else: + from dask.distributed import LocalCluster + cluster = LocalCluster(processes=False) + client = cluster.get_client() + +era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1} +) +print('dataset opened.') +era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( + time=slice(args.start, args.end), + level=1000, # surface level only. +) + +c = qr.Context() +# chunk sizes determined from VM memory limit of 16 GiB. +c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) + +print('beginning query.') +df = c.sql(""" +SELECT + DATE("time") as date, + AVG("sea_surface_temperature") as daily_avg_sst +FROM + "era5" +GROUP BY + DATE("time") +""") + +df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.cvs') From 26e88098dc5d10901b7c418857958eea0f11c164 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Sun, 24 Mar 2024 16:45:12 +0530 Subject: [PATCH 10/14] SST demo works with local fake data. --- demo/sst.py | 94 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 19 deletions(-) diff --git a/demo/sst.py b/demo/sst.py index 1482ec5..6b83e75 100755 --- a/demo/sst.py +++ b/demo/sst.py @@ -11,10 +11,58 @@ import xarray as xr import xarray_sql as qr + +def local_data(start: str, end: str) -> xr.Dataset: + import numpy as np + import pandas as pd + + np.random.seed(42) + + lat = np.linspace(-90, 90, num=720) + lon = np.linspace(-180, 180, num=1440) + time = pd.date_range(start, end, freq='H') + level = np.array([1000, 500], dtype=np.int32) + reference_time = pd.Timestamp(start) + + temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) + precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) + + return xr.Dataset( + data_vars=dict( + sea_surface_temperature=( + ['lat', 'lon', 'time', 'level'], + temperature, + ), + precipitation=(['lat', 'lon', 'time', 'level'], precipitation), + ), + coords=dict( + lat=lat, + lon=lon, + time=time, + level=level, + reference_time=reference_time, + ), + attrs=dict(description='Random weather.'), + ) + + parser = argparse.ArgumentParser() -parser.add_argument('--start', type=str, default='2020-01-01', help='start time ISO string') -parser.add_argument('--end', type=str, default='2020-01-02', help='end time ISO string') -parser.add_argument('--cluster', action='store_true', help='deploy on coiled cluster') +parser.add_argument( + '--start', type=str, default='2020-01-01', help='start time ISO string' +) +parser.add_argument( + '--end', type=str, default='2020-01-02', help='end time ISO string' +) +parser.add_argument( + '--cluster', + action='store_true', + help='deploy on coiled cluster, default: local cluster', +) +parser.add_argument( + '--fake', + action='store_true', + help='use local dummy data, default: ARCO-ERA5 data', +) args = parser.parse_args() @@ -22,27 +70,32 @@ from coiled import Cluster cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, + region='us-central1', + worker_memory='16 GiB', + spot_policy='spot_with_fallback', + arm=True, ) client = cluster.get_client() cluster.adapt(minimum=1, maximum=100) else: from dask.distributed import LocalCluster + cluster = LocalCluster(processes=False) client = cluster.get_client() -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) +if args.fake: + era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1}) +else: + era5_ds = xr.open_zarr( + 'gs://gcp-public-data-arco-era5/ar/' + '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', + chunks={'time': 240, 'level': 1}, + ) + print('dataset opened.') era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=slice(args.start, args.end), - level=1000, # surface level only. + time=slice(args.start, args.end), + level=1000, # surface level only. ) c = qr.Context() @@ -50,14 +103,17 @@ c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) print('beginning query.') -df = c.sql(""" +# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box. +df = c.sql( + """ SELECT - DATE("time") as date, + "time", AVG("sea_surface_temperature") as daily_avg_sst FROM "era5" GROUP BY - DATE("time") -""") + "time" +""" +) -df.to_csv(f'global_avg_sst_{args.start}-{args.end}_*.cvs') +df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.cvs') From 1dc44897e70f54274b3c075cddcd0629c18f7f3e Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 3 Oct 2025 19:21:45 +0200 Subject: [PATCH 11/14] WIP - updated demo to use ArrayLake's Temporal ERA5 dataset. --- demo/mara_join.py | 53 ------------------- demo/sst.py | 119 ------------------------------------------ examples/mara_join.py | 36 +++++++++++++ 3 files changed, 36 insertions(+), 172 deletions(-) delete mode 100755 demo/mara_join.py delete mode 100755 demo/sst.py create mode 100755 examples/mara_join.py diff --git a/demo/mara_join.py b/demo/mara_join.py deleted file mode 100755 index 6169fef..0000000 --- a/demo/mara_join.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python3 -"""Demo of a spatial join using Qarray and dask_geopandas for MARA. - -Please run the following to access the ERA5 dataset: -``` -gcloud auth application-default login -``` -""" -import dask.dataframe as dd -import dask_geopandas as gdd -import xarray as xr -import qarray as qr - - -mv_df = dd.read_csv( - 'https://raw.githubusercontent.com/wildlife-dynamics/ecoscope/master/tests/' - 'sample_data/vector/movbank_data.csv', -) - -mv_df['timestamp'] = dd.to_datetime(mv_df['timestamp']) -mv_df.set_index('timestamp', drop=False, sort=True) -mv_df['geometry'] = gdd.points_from_xy( - mv_df, 'location-long', 'location-lat', crs=4326 -) -timerange = slice( - mv_df.timestamp.min().compute(), - mv_df.timestamp.max().compute(), -) -mv_gdf = gdd.from_dask_dataframe(mv_df, 'geometry') - -# For MARA, we'd replace this with an Xee call. -era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1} -) -print('era5 dataset opened.') -era5_wind_ds = era5_ds[['u_component_of_wind', 'v_component_of_wind']].sel( - time=timerange, - level=1000, # surface level only. -) -era5_wind_df = qr.read_xarray(era5_wind_ds) -# What is the CRS? -era5_wind_df['geometry'] = gdd.points_from_xy( - era5_wind_df, 'longitude', 'latitude', -) -era5_wind_gdf = gdd.from_dask_dataframe(era5_wind_df, 'geometry') - -print('beginning spatial join') -# Only an inner spatial join is supported right now (in dask_geopandas). -intersection = mv_gdf.sjoin(era5_wind_gdf).compute() -print(intersection) - diff --git a/demo/sst.py b/demo/sst.py deleted file mode 100755 index 6b83e75..0000000 --- a/demo/sst.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/env python3 -"""Demo of calculating global average sea surface temperature (SST) with SQL. - -Please run the following to set up cloud resources: -``` -gcloud auth application-default login -coiled setup -``` -""" -import argparse -import xarray as xr -import xarray_sql as qr - - -def local_data(start: str, end: str) -> xr.Dataset: - import numpy as np - import pandas as pd - - np.random.seed(42) - - lat = np.linspace(-90, 90, num=720) - lon = np.linspace(-180, 180, num=1440) - time = pd.date_range(start, end, freq='H') - level = np.array([1000, 500], dtype=np.int32) - reference_time = pd.Timestamp(start) - - temperature = 15 + 8 * np.random.randn(720, 1440, len(time), len(level)) - precipitation = 10 * np.random.rand(720, 1440, len(time), len(level)) - - return xr.Dataset( - data_vars=dict( - sea_surface_temperature=( - ['lat', 'lon', 'time', 'level'], - temperature, - ), - precipitation=(['lat', 'lon', 'time', 'level'], precipitation), - ), - coords=dict( - lat=lat, - lon=lon, - time=time, - level=level, - reference_time=reference_time, - ), - attrs=dict(description='Random weather.'), - ) - - -parser = argparse.ArgumentParser() -parser.add_argument( - '--start', type=str, default='2020-01-01', help='start time ISO string' -) -parser.add_argument( - '--end', type=str, default='2020-01-02', help='end time ISO string' -) -parser.add_argument( - '--cluster', - action='store_true', - help='deploy on coiled cluster, default: local cluster', -) -parser.add_argument( - '--fake', - action='store_true', - help='use local dummy data, default: ARCO-ERA5 data', -) - -args = parser.parse_args() - -if args.cluster: - from coiled import Cluster - - cluster = Cluster( - region='us-central1', - worker_memory='16 GiB', - spot_policy='spot_with_fallback', - arm=True, - ) - client = cluster.get_client() - cluster.adapt(minimum=1, maximum=100) -else: - from dask.distributed import LocalCluster - - cluster = LocalCluster(processes=False) - client = cluster.get_client() - -if args.fake: - era5_ds = local_data(args.start, args.end).chunk({'time': 240, 'level': 1}) -else: - era5_ds = xr.open_zarr( - 'gs://gcp-public-data-arco-era5/ar/' - '1959-2022-full_37-1h-0p25deg-chunk-1.zarr-v2', - chunks={'time': 240, 'level': 1}, - ) - -print('dataset opened.') -era5_sst_ds = era5_ds[['sea_surface_temperature']].sel( - time=slice(args.start, args.end), - level=1000, # surface level only. -) - -c = qr.Context() -# chunk sizes determined from VM memory limit of 16 GiB. -c.create_table('era5', era5_sst_ds, chunks=dict(time=24)) - -print('beginning query.') -# TODO(alxmrs): `DATE` function is not supported in Apache Calcite out-of-the-box. -df = c.sql( - """ -SELECT - "time", - AVG("sea_surface_temperature") as daily_avg_sst -FROM - "era5" -GROUP BY - "time" -""" -) - -df.to_csv(f'global_avg_sst_{args.start}_to_{args.end}_*.cvs') diff --git a/examples/mara_join.py b/examples/mara_join.py new file mode 100755 index 0000000..ee36146 --- /dev/null +++ b/examples/mara_join.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +# /// script +# dependencies = [ +# "arraylake", +# "icechunk", +# "xarray", +# "xarray-sql", +# "pandas", +# ] +# /// + +"""Demo of a (spatial) join using Xarray-SQL and DataFusion for MARA.""" +import pandas as pd +import xarray as xr +import xarray_sql as xql +from arraylake import Client + +# Login and get access to EarthMover's Temporal ERA5 dataset. +client = Client() +client.login() +repo = client.get_repo("earthmover-public/era5-surface-aws") +ds = xr.open_zarr(repo.readonly_session("main").store, group="temporal", chunks=None, zarr_format=3, consolidated=False) + +# Create a DataFusion context and register the datasets as tables. +ctx = xql.XarrayContext() +ctx.from_dataset('era5', ds.isel(time=8736*2), chunks=dict(latitude=12, longitude=12)) +ctx.from_pandas( + pd.read_feather( + 'https://github.com/wildlife-dynamics/ecoscope/raw/refs/heads/master/' + 'tests/sample_data/vector/movebank_data.feather' + ), + 'movebank', +) + +# TODO(alxmrs): When I'm not on cellular internet, explore the datasets and write the +# join in SQL. From d131030769dec3d559e340085b8a90640938f499 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 3 Oct 2025 19:23:45 +0200 Subject: [PATCH 12/14] reformat with pyink. --- examples/mara_join.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/examples/mara_join.py b/examples/mara_join.py index ee36146..4d63995 100755 --- a/examples/mara_join.py +++ b/examples/mara_join.py @@ -18,18 +18,26 @@ # Login and get access to EarthMover's Temporal ERA5 dataset. client = Client() client.login() -repo = client.get_repo("earthmover-public/era5-surface-aws") -ds = xr.open_zarr(repo.readonly_session("main").store, group="temporal", chunks=None, zarr_format=3, consolidated=False) +repo = client.get_repo('earthmover-public/era5-surface-aws') +ds = xr.open_zarr( + repo.readonly_session('main').store, + group='temporal', + chunks=None, + zarr_format=3, + consolidated=False, +) # Create a DataFusion context and register the datasets as tables. ctx = xql.XarrayContext() -ctx.from_dataset('era5', ds.isel(time=8736*2), chunks=dict(latitude=12, longitude=12)) +ctx.from_dataset( + 'era5', ds.isel(time=8736 * 2), chunks=dict(latitude=12, longitude=12) +) ctx.from_pandas( - pd.read_feather( - 'https://github.com/wildlife-dynamics/ecoscope/raw/refs/heads/master/' - 'tests/sample_data/vector/movebank_data.feather' - ), - 'movebank', + pd.read_feather( + 'https://github.com/wildlife-dynamics/ecoscope/raw/refs/heads/master/' + 'tests/sample_data/vector/movebank_data.feather' + ), + 'movebank', ) # TODO(alxmrs): When I'm not on cellular internet, explore the datasets and write the From ac6718b181aee4008921db4c4f52778c0f55469c Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 3 Oct 2025 19:31:09 +0200 Subject: [PATCH 13/14] rm isel --- examples/mara_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/mara_join.py b/examples/mara_join.py index 4d63995..4682458 100755 --- a/examples/mara_join.py +++ b/examples/mara_join.py @@ -30,7 +30,7 @@ # Create a DataFusion context and register the datasets as tables. ctx = xql.XarrayContext() ctx.from_dataset( - 'era5', ds.isel(time=8736 * 2), chunks=dict(latitude=12, longitude=12) + 'era5', ds, chunks=dict(latitude=12, longitude=12) ) ctx.from_pandas( pd.read_feather( From 793b154fc284ef79789f758b06db0a2203e6df0a Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 3 Oct 2025 19:31:40 +0200 Subject: [PATCH 14/14] Add temporal chunks --- examples/mara_join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/mara_join.py b/examples/mara_join.py index 4682458..bed79c3 100755 --- a/examples/mara_join.py +++ b/examples/mara_join.py @@ -30,7 +30,7 @@ # Create a DataFusion context and register the datasets as tables. ctx = xql.XarrayContext() ctx.from_dataset( - 'era5', ds, chunks=dict(latitude=12, longitude=12) + 'era5', ds, chunks=dict(time=8736, latitude=12, longitude=12) ) ctx.from_pandas( pd.read_feather(