ci: run pre-commit hooks on all files
1 parent 617f360 · commit 6a01ffb

Showing 6 changed files with 141 additions and 133 deletions.
First file shown: the documentation for the Geofabrik utility. The old and new lines are textually identical, so the change is evidently whitespace-only, pre-commit's trailing-whitespace hook at work:

```diff
@@ -1,10 +1,10 @@
 # Geofabrik

 This is a simple utility to download country data files from
-[GeoFabrik](https://download.geofabrik.de/). 
+[GeoFabrik](https://download.geofabrik.de/).

-options: 
-    --help(-h)       show this help message and exit 
-    --verbose(-v)    verbose output 
-    --file(-f) FILE  The country or US state to download 
-    --list(-l)       List all files on GeoFabrik 
+options:
+    --help(-h)       show this help message and exit
+    --verbose(-v)    verbose output
+    --file(-f) FILE  The country or US state to download
+    --list(-l)       List all files on GeoFabrik
```
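For illustration, the documented flags map onto a small argparse setup. This is a sketch reconstructed from the help text alone, not the utility's actual source; everything beyond the flag names and help strings is assumed:

```python
import argparse

# Hypothetical CLI skeleton matching the help text above;
# the real geofabrik utility may declare these differently.
parser = argparse.ArgumentParser(
    description="Download country data files from GeoFabrik"
)
parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
parser.add_argument("-f", "--file", metavar="FILE", help="The country or US state to download")
parser.add_argument("-l", "--list", action="store_true", help="List all files on GeoFabrik")
args = parser.parse_args()  # --help/-h is added by argparse automatically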
The remaining hunks come from a Python importer module in the same repository. Every change is mechanical formatting from the pre-commit hooks: the import block is regrouped, single quotes become double quotes, commented-out code gains a space after `#`, closing parentheses move to the opening statement's column, and a second blank line appears before top-level definitions.

```diff
@@ -20,35 +20,31 @@
 # <[email protected]>

 import argparse
+import concurrent.futures
 import logging
 import subprocess
 import sys
 import os
-import concurrent.futures
-import geojson
-from geojson import Feature, FeatureCollection
-from sys import argv
 from pathlib import Path
-from cpuinfo import get_cpu_info
-from shapely.geometry import shape
+from sys import argv

+import geojson
 import pyarrow.parquet as pq
 from codetiming import Timer
-from osm_rawdata.postgres import uriParser
+from cpuinfo import get_cpu_info
 from progress.spinner import PixelSpinner
+from shapely import wkb
+from shapely.geometry import shape
 from sqlalchemy import MetaData, cast, column, create_engine, select, table, text
 from sqlalchemy.dialects.postgresql import JSONB, insert
+from sqlalchemy.engine.base import Connection
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy_utils import create_database, database_exists
-from sqlalchemy.engine.base import Connection
-from shapely.geometry import Point, LineString, Polygon
-from shapely import wkt, wkb

 # Find the other files for this project
 import osm_rawdata as rw
 import osm_rawdata.db_models
 from osm_rawdata.db_models import Base
+from osm_rawdata.postgres import uriParser

 rootdir = rw.__path__[0]
```
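The reshuffle above looks like the work of an import sorter such as isort: imports split into standard-library, third-party, and local-package groups, each alphabetized, with unused names dropped. A minimal illustration of the convention (not the project's code, and the hook set is an assumption):

```python
# Group 1: standard library
import argparse
import concurrent.futures

# Group 2: third-party packages, alphabetized by module name
import geojson
from shapely import wkb

# Group 3: the local package
import osm_rawdata as rw
```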
```diff
@@ -57,71 +53,73 @@

 # The number of threads is based on the CPU cores
 info = get_cpu_info()
-cores = info['count']
+cores = info["count"]

+
 def importThread(
-        data: list,
-        db: Connection,
-        ):
+    data: list,
+    db: Connection,
+):
     """Thread to handle importing
     Args:
         data (list): The list of tiles to download
         db (Connection): A database connection
     """
     # log.debug(f"In importThread()")
-    #timer = Timer(text="importThread() took {seconds:.0f}s")
-    #timer.start()
+    # timer = Timer(text="importThread() took {seconds:.0f}s")
+    # timer.start()
     ways = table(
         "ways_poly",
         column("id"),
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )

     nodes = table(
         "nodes",
         column("id"),
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )

     index = 0

     for feature in data:
         # log.debug(feature)
         index -= 1
         entry = dict()
-        tags = feature['properties']
-        tags['building'] = 'yes'
-        entry['id'] = index
+        tags = feature["properties"]
+        tags["building"] = "yes"
+        entry["id"] = index
         ewkt = shape(feature["geometry"])
         geom = wkb.dumps(ewkt)
         type = ewkt.geom_type
         scalar = select(cast(tags, JSONB))

-        if type == 'Polygon':
+        if type == "Polygon":
             sql = insert(ways).values(
                 # id = entry['id'],
                 geom=geom,
                 tags=scalar,
-                )
-        elif type == 'Point':
+            )
+        elif type == "Point":
             sql = insert(nodes).values(
                 # id = entry['id'],
                 geom=geom,
                 tags=scalar,
-                )
+            )

         db.execute(sql)
         # db.commit()

+
 def parquetThread(
     data: list,
     db: Connection,
-    ):
+):
     """Thread to handle importing
     Args:
```
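The loop body in importThread() combines three ideas: shapely's shape() turns a GeoJSON geometry dict into a geometry object, wkb.dumps() serializes it to WKB bytes, and SQLAlchemy's lightweight table()/column() constructs build an INSERT without a full ORM model. A standalone sketch of that pattern, with a made-up feature:

```python
import geojson
from shapely import wkb
from shapely.geometry import shape
from sqlalchemy import column, insert, table

# Minimal table handle: just enough metadata to render an INSERT statement.
ways = table("ways_poly", column("id"), column("geom"), column("tags"))

# A made-up feature shaped like the ones importThread() iterates over.
feature = geojson.Feature(
    geometry={"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 0]]]},
    properties={"building": "yes"},
)

geom = wkb.dumps(shape(feature["geometry"]))  # GeoJSON dict -> shapely -> WKB bytes
stmt = insert(ways).values(geom=geom, tags=feature["properties"])
print(stmt)  # INSERT INTO ways_poly (geom, tags) VALUES (:geom, :tags)
```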
```diff
@@ -136,15 +134,15 @@ def parquetThread(
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )

     nodes = table(
         "nodes",
         column("id"),
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )

     index = -1
     log.debug(f"There are {len(data)} entries in the data")
@@ -202,6 +200,7 @@ def parquetThread(
         # print(f"FIXME2: {entry}")
         timer.stop()

+
 class MapImporter(object):
     def __init__(
         self,
@@ -229,7 +228,7 @@ def __init__(
            sql = text(
                 "CREATE EXTENSION IF NOT EXISTS postgis; CREATE EXTENSION IF NOT EXISTS hstore;CREATE EXTENSION IF NOT EXISTS dblink;"
             )
             self.db.execute(sql)
-            #self.db.commit()
+            # self.db.commit()

             Base.metadata.create_all(bind=engine)
```
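The CREATE EXTENSION block executes raw SQL through SQLAlchemy's text() construct. A standalone sketch with a placeholder connection URI; note that SQLAlchemy 2.x connections no longer autocommit, which matters for code like the commented-out self.db.commit() above:

```python
from sqlalchemy import create_engine, text

# Placeholder URI; MapImporter builds its own from the --uri argument.
engine = create_engine("postgresql://localhost/osmdata")

with engine.connect() as db:
    db.execute(text("CREATE EXTENSION IF NOT EXISTS postgis"))
    db.execute(text("CREATE EXTENSION IF NOT EXISTS hstore"))
    db.execute(text("CREATE EXTENSION IF NOT EXISTS dblink"))
    db.commit()  # required on SQLAlchemy 2.x; connections do not autocommit
```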
```diff
@@ -354,8 +353,8 @@ def importGeoJson(
         """
         # load the GeoJson file
         file = open(infile, "r")
-        #size = os.path.getsize(infile)
-        #for line in file.readlines():
+        # size = os.path.getsize(infile)
+        # for line in file.readlines():
         #    print(line)
         data = geojson.load(file)

```
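importGeoJson() parses the whole file up front with the geojson package, then works on the features list. A minimal sketch of that load step (the path is a placeholder):

```python
import geojson

# Placeholder path; importGeoJson() receives the real one as `infile`.
with open("buildings.geojson", "r") as file:
    data = geojson.load(file)

print(len(data["features"]))  # this is the list the importer chunks across threads
```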
```diff
@@ -379,26 +378,27 @@ def importGeoJson(
         meta.create_all(engine)

         # A chunk is a group of threads
-        entries = len(data['features'])
+        entries = len(data["features"])
         chunk = round(entries / cores)

         if entries <= chunk:
-            result = importThread(data['features'], connections[0])
+            result = importThread(data["features"], connections[0])
             timer.stop()
             return True

         with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
             block = 0
             while block <= entries:
                 log.debug("Dispatching Block %d:%d" % (block, block + chunk))
-                result = executor.submit(importThread, data['features'][block : block + chunk], connections[index])
+                result = executor.submit(importThread, data["features"][block : block + chunk], connections[index])
                 block += chunk
                 index += 1
         executor.shutdown()
         timer.stop()

         return True

+
 def main():
     """This main function lets this class be run standalone by a bash script."""
     parser = argparse.ArgumentParser(
```
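The dispatch loop above splits the feature list into roughly cores-sized slices and hands each one to a worker thread. A self-contained sketch of the same pattern, with a counting stand-in for importThread():

```python
import concurrent.futures

from cpuinfo import get_cpu_info


def worker(batch: list) -> int:
    # Stand-in for importThread(); just measures its slice.
    return len(batch)


data = list(range(100))  # stand-in for data["features"]
cores = get_cpu_info()["count"]
chunk = max(1, round(len(data) / cores))  # guard against chunk == 0 on many-core machines

futures = []
with concurrent.futures.ThreadPoolExecutor(max_workers=cores) as executor:
    block = 0
    # Like the original loop, `<=` can dispatch one final empty slice; harmless here.
    while block <= len(data):
        futures.append(executor.submit(worker, data[block : block + chunk]))
        block += chunk

print(sum(f.result() for f in futures))  # 100: every feature lands in exactly one chunk
```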
```diff
@@ -441,6 +441,7 @@ def main():
         mi.importParquet(args.infile)
         log.info(f"Imported {args.infile} into {args.uri}")

+
 if __name__ == "__main__":
     """This is just a hook so this file can be run standalone during development."""
     main()
```