Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 82 additions & 29 deletions src/main/scala/vectorpipe/VectorPipe.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,69 @@
package vectorpipe

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import vectorpipe.vectortile._
import vectorpipe.vectortile.export._

import geotrellis.proj4.{CRS, LatLng, WebMercator}
import geotrellis.spark.SpatialKey
import geotrellis.spark.tiling.{ZoomedLayoutScheme, LayoutLevel}
import geotrellis.vector.Geometry
import geotrellis.spark.tiling.{LayoutLevel, ZoomedLayoutScheme}
import geotrellis.vector.{Extent, Feature, Geometry}
import geotrellis.vectortile._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StringType
import org.locationtech.jts.{geom => jts}
import org.locationtech.geomesa.spark.jts.SparkSessionWithJTS

/** A single clipped feature keyed to its tile, with attribute values held in a
  * tag-prefixed binary encoding (see the companion object's encoder).
  *
  * @param sk    spatial key of the tile this feature belongs to
  * @param layer name of the vector tile layer the feature goes into
  * @param geom  JTS geometry, already clipped to the tile
  * @param data  attribute map; each value is one tag byte followed by a payload
  */
case class TiledFeature(sk: SpatialKey, layer: String, geom: jts.Geometry, data: Map[String, Array[Byte]]) {
  /** Rebuilds the vector tile feature, decoding every attribute value back
    * into its geotrellis `Value` representation.
    */
  def feature: VectorTileFeature[Geometry] =
    Feature(geom, data.mapValues(decodeValue))

  // Decode one tag-prefixed byte array: the first byte selects the Value
  // subtype, the remaining bytes are the payload. An unknown tag raises a
  // MatchError, same as the original inline match.
  private def decodeValue(encoded: Array[Byte]): Value = {
    val buf = ByteBuffer.wrap(encoded)
    buf.get() match {
      case 0 =>
        val payload = Array.ofDim[Byte](buf.remaining())
        buf.get(payload)
        VString(new String(payload, StandardCharsets.UTF_8))
      case 1 => VFloat(buf.getFloat())
      case 2 => VDouble(buf.getDouble())
      case 3 => VInt64(buf.getLong())
      case 4 => VWord64(buf.getLong())
      case 5 => VSint64(buf.getLong())
      case 6 => VBool(buf.get() == 1)
    }
  }
}

object TiledFeature {
  /** Builds a [[TiledFeature]] by encoding each attribute value as a
    * tag-prefixed byte array. The one-byte tag identifies the `Value`
    * subtype so that `TiledFeature.feature` can decode it symmetrically:
    * 0=VString, 1=VFloat, 2=VDouble, 3=VInt64, 4=VWord64, 5=VSint64, 6=VBool.
    */
  def apply(sk: SpatialKey, layer: String, feature: VectorTileFeature[Geometry]): TiledFeature =
    TiledFeature(sk, layer, feature.geom.jtsGeom, feature.data.mapValues {
      case VString(v) => (0: Byte) +: v.getBytes(StandardCharsets.UTF_8)
      case VFloat(v) =>
        // BUG FIX: the tag must be 1 to round-trip — the decoder reads
        // `case 1 => VFloat(bb.getFloat())`. The previous tag of 2 made
        // floats decode as doubles from a 5-byte buffer (buffer underflow).
        ByteBuffer.allocate(5).put(1: Byte).putFloat(v).array()
      case VDouble(v) =>
        ByteBuffer.allocate(9).put(2: Byte).putDouble(v).array()
      case VInt64(v) =>
        ByteBuffer.allocate(9).put(3: Byte).putLong(v).array()
      case VWord64(v) =>
        ByteBuffer.allocate(9).put(4: Byte).putLong(v).array()
      case VSint64(v) =>
        ByteBuffer.allocate(9).put(5: Byte).putLong(v).array()
      case VBool(v) if v =>
        Array(6, 1)
      case VBool(_) =>
        Array(6, 0)
    })
}

/** A serialized vector tile paired with its spatial key, suitable for use
  * in a Spark `Dataset` (only stdlib/serializable field types).
  *
  * @param sk     spatial key identifying the tile's position in the layout
  * @param bytes  the tile in its wire (protobuf) form, as produced by `toBytes`
  * @param extent geographic extent the tile covers
  */
case class Tile(sk: SpatialKey, bytes: Array[Byte], extent: Extent) {
  // Re-materialize the VectorTile from its serialized form on demand.
  def tile: VectorTile = VectorTile.fromBytes(bytes, extent)
}

object Tile {
  // Serialize a live VectorTile into the Dataset-friendly wrapper, keeping
  // the tile's own extent so `Tile.tile` can reconstruct it exactly.
  def apply(sk: SpatialKey, tile: VectorTile): Tile = Tile(sk, tile.toBytes, tile.tileExtent)
}

object VectorPipe {

Expand Down Expand Up @@ -74,7 +124,10 @@ object VectorPipe {
SpatialKey(k.col / 2, k.row / 2) }.toSeq
}

def generateVectorTiles[G <: Geometry](df: DataFrame, level: LayoutLevel): RDD[(SpatialKey, VectorTile)] = {
def generateVectorTiles(df: DataFrame, level: LayoutLevel): Dataset[Tile] = {
import df.sparkSession.implicits._
df.sparkSession.withJTS

val zoom = level.zoom
val clip = udf { (g: jts.Geometry, key: GenericRowWithSchema) =>
val k = getSpatialKey(key)
Expand All @@ -83,38 +136,38 @@ object VectorPipe {

val selectedGeometry = pipeline
.select(df, zoom, keyColumn)

val clipped = selectedGeometry
.withColumn(keyColumn, explode(col(keyColumn)))
.repartition(col(keyColumn)) // spread copies of possibly ill-tempered geometries around cluster prior to clipping
.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))

pipeline.layerMultiplicity match {
val clipped = pipeline.layerMultiplicity match {
case SingleLayer(layerName) =>
clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), pipeline.pack(r, zoom)) }
.groupByKey
.map { case (key, feats) =>
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(feats, layerName, ex, options.tileResolution, options.orderAreas)
selectedGeometry
.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
.map {
row =>
TiledFeature(getSpatialKey(row, keyColumn), layerName, pipeline.pack(row, zoom))
}
case LayerNamesInColumn(layerNameCol) =>
assert(selectedGeometry.schema(layerNameCol).dataType == StringType,
s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")
clipped
.rdd
.map { r => (getSpatialKey(r, keyColumn), r.getAs[String](layerNameCol) -> pipeline.pack(r, zoom)) }
.groupByKey
.mapPartitions{ iter: Iterator[(SpatialKey, Iterable[(String, VectorTileFeature[Geometry])])] =>
iter.map{ case (key, groupedFeatures) => {
val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
groupedFeatures.groupBy(_._1).mapValues(_.map(_._2))
val ex = level.layout.mapTransform.keyToExtent(key)
key -> buildVectorTile(layerFeatures, ex, options.tileResolution, options.orderAreas)
}}
}
s"layerMultiplicity=${pipeline.layerMultiplicity} requires String-type column of name ${layerNameCol}")

selectedGeometry
.withColumn(geomColumn, clip(col(geomColumn), col(keyColumn)))
.map {
row =>
TiledFeature(getSpatialKey(row, keyColumn), row.getAs[String](layerNameCol), pipeline.pack(row, zoom))
}
}

clipped
.groupByKey(r => r.sk)
.mapGroups {
case (sk, groupedFeatures) =>
val layerFeatures: Map[String, Iterable[VectorTileFeature[Geometry]]] =
groupedFeatures.toIterable.groupBy(_.layer).mapValues(_.map(_.feature))
val extent = level.layout.mapTransform.keyToExtent(sk)
Tile(sk, buildVectorTile(layerFeatures, extent, options.tileResolution, options.orderAreas))
}
}

// ITERATION:
Expand Down
14 changes: 10 additions & 4 deletions src/main/scala/vectorpipe/vectortile/export/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,26 @@ import geotrellis.spark.io.index.zcurve.Z2
import geotrellis.spark.io.s3._
import geotrellis.vectortile._
import org.apache.spark.rdd.RDD

import java.net.URI
import java.io.ByteArrayOutputStream
import java.util.zip.{GZIPOutputStream, ZipEntry, ZipOutputStream}

import org.apache.spark.sql.Dataset
import vectorpipe.Tile

package object export {
def saveVectorTiles(vectorTiles: RDD[(SpatialKey, VectorTile)], zoom: Int, uri: URI): Unit = {
def saveVectorTiles(vectorTiles: Dataset[Tile], zoom: Int, uri: URI): Unit = {
import vectorTiles.sparkSession.implicits._

val vts = vectorTiles.map(x => (x.sk, x.tile)).rdd

uri.getScheme match {
case "s3" =>
val path = uri.getPath
val prefix = path.stripPrefix("/").stripSuffix("/")
saveToS3(vectorTiles, zoom, uri.getAuthority, prefix)
saveToS3(vts, zoom, uri.getAuthority, prefix)
case _ =>
saveHadoop(vectorTiles, zoom, uri)
saveHadoop(vts, zoom, uri)
}
}

Expand Down
15 changes: 0 additions & 15 deletions src/main/scala/vectorpipe/vectortile/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,6 @@ package object vectortile {
)
}

/** Assembles a single-layer [[VectorTile]] from a collection of features.
  *
  * @param features  the features to place in the layer
  * @param layerName name of the (only) layer in the resulting tile
  * @param ex        geographic extent of the tile
  * @param tileWidth tile resolution in cells
  * @param sorted    when true, build the layer with area-ordered features
  */
def buildVectorTile[G <: Geometry](
  features: Iterable[VectorTileFeature[G]],
  layerName: String,
  ex: Extent,
  tileWidth: Int,
  sorted: Boolean
): VectorTile = {
  // Pick the layer builder according to the ordering flag, then wrap the
  // single layer into a one-entry tile.
  val singleLayer =
    if (sorted) buildSortedLayer(features, layerName, ex, tileWidth)
    else buildLayer(features, layerName, ex, tileWidth)
  VectorTile(Map(layerName -> singleLayer), ex)
}

def buildVectorTile[G <: Geometry](
layerFeatures: Map[String, Iterable[VectorTileFeature[G]]],
ex: Extent,
Expand Down