-
Notifications
You must be signed in to change notification settings - Fork 18
/
assign_images.py
112 lines (96 loc) · 3.41 KB
/
assign_images.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from os import getenv
from pathlib import Path
import sys
from typing import Annotated
import geopandas as gpd
from loguru import logger as log
from pandas import Series
from requests.exceptions import HTTPError
from tenacity import RetryError
from tqdm import tqdm
from typer import Argument, Option, Typer
from src.images.image_source import ImageSourceSelector
from src.images.local_images import LocalImages
from src.images.mapillary import Mapillary
app = Typer()
@app.command()
def main(
points_file: Annotated[
Path,
Argument(help="Path to input points file"),
],
image_source: Annotated[
ImageSourceSelector, Argument(help="Where to get images from")
],
images_path: Annotated[
Path,
Argument(help="Where the images should be located"),
],
max_distance: Annotated[
float,
Option(help="Maximum distance between point and image location, in meters"),
] = 10,
verbose: Annotated[bool, Option(help="Sets log level to DEBUG")] = False,
) -> Path:
"""
Assigns Images to Points
Args:
points_file: Path to input points file
File format should be readable by geopandas.read_file
image_source: Where to get images from
images_path: Where the images should be located
max_distance: Maximum distance between point and image location, in meters
Can also be interpreted as "radius" of image bounding box
verbose: Sets log level to DEBUG
Returns: The Path of the output GPKG file
"""
log.remove()
if verbose:
log.add(sys.stdout, level="DEBUG")
else:
log.add(sys.stdout, level="INFO")
if image_source == ImageSourceSelector.local:
source = LocalImages(images_path, max_distance)
elif image_source == ImageSourceSelector.mapillary:
source = Mapillary(getenv("MAPILLARY_CLIENT_TOKEN"), images_path, max_distance)
else:
raise ValueError(f"Unknown Image Source: {image_source}")
gdf = gpd.read_file(points_file)
gdf["image_id"] = Series()
gdf["image_lat"] = Series()
gdf["image_lon"] = Series()
gdf["residual"] = Series()
gdf["image_path"] = Series()
gdf["error"] = Series()
for i, point in tqdm(
gdf.iterrows(),
total=len(gdf.index),
desc="Assigning Images to Points",
unit="points",
):
latitude = point["geometry"].y
longitude = point["geometry"].x
try:
results = source.get_image_from_coordinates(latitude, longitude)
gdf.at[i, "image_lat"] = results["image_lat"]
gdf.at[i, "image_lon"] = results["image_lon"]
gdf.at[i, "residual"] = results["residual"]
gdf.at[i, "image_id"] = results["image_id"]
gdf.at[i, "image_path"] = str(results["image_path"])
gdf.at[i, "error"] = results["error"]
except HTTPError or RetryError as e:
log.error(e)
gdf.at[i, "error"] = e.__class__.__name__
log.info(gdf.head())
log.info(
"Are There Duplicates? {}",
gdf[gdf["image_id"] is not None and gdf["image_id"].notna()]["image_id"]
.duplicated()
.any(),
)
output_file = Path(points_file.parent, f"{points_file.stem}_images.gpkg").resolve()
gdf.to_file(output_file, driver="GPKG")
log.success("Saved Points and Images to {}", output_file)
return output_file
if __name__ == "__main__":
app()