diff --git a/src/main/scala/vectorpipe/VectorPipe.scala b/src/main/scala/vectorpipe/VectorPipe.scala
index c5d63c38..b70fbed5 100644
--- a/src/main/scala/vectorpipe/VectorPipe.scala
+++ b/src/main/scala/vectorpipe/VectorPipe.scala
@@ -1,19 +1,69 @@
 package vectorpipe
 
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
 import vectorpipe.vectortile._
 import vectorpipe.vectortile.export._
-
 import geotrellis.proj4.{CRS, LatLng, WebMercator}
 import geotrellis.spark.SpatialKey
-import geotrellis.spark.tiling.{ZoomedLayoutScheme, LayoutLevel}
-import geotrellis.vector.Geometry
+import geotrellis.spark.tiling.{LayoutLevel, ZoomedLayoutScheme}
+import geotrellis.vector.{Extent, Feature, Geometry}
 import geotrellis.vectortile._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StringType
 import org.locationtech.jts.{geom => jts}
+import org.locationtech.geomesa.spark.jts.SparkSessionWithJTS
+
+/** An encoder-friendly feature: each vector tile attribute is flattened to a
+  * one-byte tag followed by its payload (0 = VString as UTF-8, 1 = VFloat,
+  * 2 = VDouble, 3 = VInt64, 4 = VWord64, 5 = VSint64, 6 = VBool).
+  */
+case class TiledFeature(sk: SpatialKey, layer: String, geom: jts.Geometry, data: Map[String, Array[Byte]]) {
+  def feature: VectorTileFeature[Geometry] = Feature(geom, data.mapValues { b =>
+    val bb = ByteBuffer.wrap(b)
+    bb.get() match {
+      case 0 =>
+        val bytes = Array.ofDim[Byte](bb.remaining())
+        bb.get(bytes)
+        VString(new String(bytes, StandardCharsets.UTF_8))
+      case 1 => VFloat(bb.getFloat())
+      case 2 => VDouble(bb.getDouble())
+      case 3 => VInt64(bb.getLong())
+      case 4 => VWord64(bb.getLong())
+      case 5 => VSint64(bb.getLong())
+      case 6 => VBool(bb.get() == 1)
+      case t => throw new IllegalArgumentException(s"Unrecognized tag byte: $t")
+    }
+  })
+}
+
+object TiledFeature {
+  def apply(sk: SpatialKey, layer: String, feature: VectorTileFeature[Geometry]): TiledFeature =
+    TiledFeature(sk, layer, feature.geom.jtsGeom, feature.data.mapValues {
+      case VString(v) => (0: Byte) +: v.getBytes(StandardCharsets.UTF_8)
+      case VFloat(v) => ByteBuffer.allocate(5).put(1: Byte).putFloat(v).array()
+      case VDouble(v) => ByteBuffer.allocate(9).put(2: Byte).putDouble(v).array()
+      case VInt64(v) => ByteBuffer.allocate(9).put(3: Byte).putLong(v).array()
+      case VWord64(v) => ByteBuffer.allocate(9).put(4: Byte).putLong(v).array()
+      case VSint64(v) => ByteBuffer.allocate(9).put(5: Byte).putLong(v).array()
+      case VBool(v) if v => Array[Byte](6, 1)
+      case VBool(_) => Array[Byte](6, 0)
+    }.toMap) // materialize the lazy mapValues view so the Dataset encoder sees a concrete Map
+}
+
+case class Tile(sk: SpatialKey, bytes: Array[Byte], extent: Extent) {
+  def tile: VectorTile = VectorTile.fromBytes(bytes, extent)
+}
+
+object Tile {
+  def apply(sk: SpatialKey, tile: VectorTile): Tile = Tile(sk, tile.toBytes, tile.tileExtent)
+}
 
 object VectorPipe {
@@ -74,7 +124,10 @@ object VectorPipe {
       SpatialKey(k.col / 2, k.row / 2)
     }.toSeq
   }
 
-  def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): RDD[(SpatialKey, VectorTile)] = {
+  def generateVectorTiles(df: DataFrame, level: LayoutLevel): Dataset[Tile] = {
+    import df.sparkSession.implicits._
+    df.sparkSession.withJTS
+
     val zoom = level.zoom
     val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
       val k = getSpatialKey(key)
@@ -83,38 +136,38 @@ object VectorPipe {
     val selectedGeometry = pipeline
       .select(df, zoom, keyColumn)
-
-    val clipped = selectedGeometry
       .withColumn(keyColumn, explode(col(keyColumn)))
       .repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
-      .withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
 
-    pipeline.layerMultiplicity match {
+    val clipped = pipeline.layerMultiplicity match {
       case SingleLayer(layerName) =>
-        clipped
-          .rdd
-          .map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
-          .groupByKey
-          .map { case (key, feats) =>
-            val ex = level.layout.mapTransform.keyToExtent(key)
-            key -> buildVectorTile(feats, layerName, ex, options.tileResolution, options.orderAreas)
+        selectedGeometry
+          .withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
+          .map { row =>
+            TiledFeature(getSpatialKey(row, keyColumn), layerName, pipeline.pack(row, zoom))
           }
       case LayerNamesInColumn(layerNameCol) =>
         assert(selectedGeometry.schema(layerNameCol).dataType == StringType,
-               s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")
-        clipped
-          .rdd
-          .map { r => (getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol) -> pipeline.pack(r, zoom)) }
-          .groupByKey
-          .mapPartitions { iter: Iterator[(SpatialKey, Iterable[(String, VectorTileFeature[Geometry])])] =>
-            iter.map { case (key, groupedFeatures) =>
-              val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
-                groupedFeatures.groupBy(_._1).mapValues(_.map(_._2))
-              val ex = level.layout.mapTransform.keyToExtent(key)
-              key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas)
-            }
-          }
+               s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")
+
+        selectedGeometry
+          .withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
+          .map { row =>
+            TiledFeature(getSpatialKey(row, keyColumn), row.getAs[String](layerNameCol), pipeline.pack(row, zoom))
+          }
     }
+
+    // Group per tile key and assemble one vector tile per key; grouping by the
+    // layer field covers both the single- and multi-layer cases.
+    clipped
+      .groupByKey(r => r.sk)
+      .mapGroups { case (sk, groupedFeatures) =>
+        val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
+          groupedFeatures.toIterable.groupBy(_.layer).mapValues(_.map(_.feature))
+        val extent = level.layout.mapTransform.keyToExtent(sk)
+        Tile(sk, buildVectorTile(layerFeatures, extent, options.tileResolution, options.orderAreas))
+      }
   }
 
   // ITERATION:
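A note on the change above: Spark's Dataset encoders cannot represent geotrellis `Value` instances directly, which is why `TiledFeature` flattens each attribute to a tag byte plus payload and `feature` reverses the mapping. A minimal round-trip sketch; the geometry and attribute values here are hypothetical, not part of this patch:

    import geotrellis.spark.SpatialKey
    import geotrellis.vector._
    import geotrellis.vectortile._
    import vectorpipe._
    import vectorpipe.vectortile._

    val feat: VectorTileFeature[Geometry] =
      Feature(Point(0.0, 0.0), Map("name" -> VString("park"), "area" -> VDouble(1.5)))
    // Round-trip through the tagged byte-array representation.
    val tiled = TiledFeature(SpatialKey(0, 0), "default", feat)
    assert(tiled.feature.data("name") == VString("park"))
    assert(tiled.feature.data("area") == VDouble(1.5))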
diff --git a/src/main/scala/vectorpipe/vectortile/export/package.scala b/src/main/scala/vectorpipe/vectortile/export/package.scala
index e6d0c3a9..659294a2 100644
--- a/src/main/scala/vectorpipe/vectortile/export/package.scala
+++ b/src/main/scala/vectorpipe/vectortile/export/package.scala
@@ -7,20 +7,26 @@ import geotrellis.spark.io.index.zcurve.Z2
 import geotrellis.spark.io.s3._
 import geotrellis.vectortile._
 import org.apache.spark.rdd.RDD
-
 import java.net.URI
 import java.io.ByteArrayOutputStream
 import java.util.zip.{GZIPOutputStream, ZipEntry, ZipOutputStream}
 
+import org.apache.spark.sql.Dataset
+import vectorpipe.Tile
+
 package object export {
-  def saveVectorTiles(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int, uri: URI): Unit = {
+  def saveVectorTiles(vectorTiles: Dataset[Tile], zoom: Int, uri: URI): Unit = {
+    // Drop to an RDD before deserializing: VectorTile has no Dataset Encoder,
+    // so mapping on the Dataset itself would not compile.
+    val vts = vectorTiles.rdd.map(x => (x.sk, x.tile))
+
     uri.getScheme match {
       case "s3" =>
         val path = uri.getPath
         val prefix = path.stripPrefix("/").stripSuffix("/")
-        saveToS3(vectorTiles, zoom, uri.getAuthority, prefix)
+        saveToS3(vts, zoom, uri.getAuthority, prefix)
       case _ =>
-        saveHadoop(vectorTiles, zoom, uri)
+        saveHadoop(vts, zoom, uri)
     }
   }
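For reference, callers now hand `saveVectorTiles` the `Dataset[Tile]` produced by `generateVectorTiles`. A usage sketch; the bucket name and the `tiles` value are placeholders:

    import java.net.URI
    import vectorpipe.vectortile.export._

    // tiles: Dataset[vectorpipe.Tile], as returned by generateVectorTiles
    saveVectorTiles(tiles, zoom = 12, new URI("s3://sample-bucket/vectortiles"))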
diff --git a/src/main/scala/vectorpipe/vectortile/package.scala b/src/main/scala/vectorpipe/vectortile/package.scala
index e2c77340..cc47c3c6 100644
--- a/src/main/scala/vectorpipe/vectortile/package.scala
+++ b/src/main/scala/vectorpipe/vectortile/package.scala
@@ -118,21 +118,6 @@ package object vectortile {
     )
   }
 
-  def buildVectorTile[G <: Geometry](
-    features: Iterable[VectorTileFeature[G]],
-    layerName: String,
-    ex: Extent,
-    tileWidth: Int,
-    sorted: Boolean
-  ): VectorTile = {
-    val layer =
-      if (sorted)
-        buildSortedLayer(features, layerName, ex, tileWidth)
-      else
-        buildLayer(features, layerName, ex, tileWidth)
-    VectorTile(Map(layerName -> layer), ex)
-  }
-
   def buildVectorTile[G <: Geometry](
     layerFeatures: Map[String, Iterable[VectorTileFeature[G]]],
     ex: Extent,
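With the single-layer overload removed, one-layer tiles go through the remaining Map-based overload, which is what `generateVectorTiles` now does for both `SingleLayer` and `LayerNamesInColumn`. The equivalent of the old call, as a sketch with the same hypothetical arguments:

    // Previously: buildVectorTile(features, layerName, ex, tileWidth, sorted)
    buildVectorTile(Map(layerName -> features), ex, tileWidth, sorted)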