Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@ import org.apache.kafka.common.TopicPartition
* Objects that represent desired offset range limits for starting,
* ending, and specific offsets.
*/
private[kafka010] sealed trait KafkaOffsetRangeLimit
private /* [kafka010] */ sealed trait KafkaOffsetRangeLimit

/**
* Represents the desire to bind to the earliest offsets in Kafka
*/
private[kafka010] case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit
private /* [kafka010] */ case object EarliestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
* Represents the desire to bind to the latest offsets in Kafka
*/
private[kafka010] case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit
private /* [kafka010] */ case object LatestOffsetRangeLimit extends KafkaOffsetRangeLimit

/**
* Represents the desire to bind to specific offsets. A offset == -1 binds to the
* latest offset, and offset == -2 binds to the earliest offset.
*/
private[kafka010] case class SpecificOffsetRangeLimit(
private /* [kafka010] */ case class SpecificOffsetRangeLimit(
partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsetRangeLimit

private[kafka010] object KafkaOffsetRangeLimit {
private /* [kafka010] */ object KafkaOffsetRangeLimit {
/**
* Used to denote offset range limits that are resolved via Kafka
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
*
* Note: This class is not ThreadSafe
*/
private[kafka010] class KafkaOffsetReader(
private /* [kafka010] */ class KafkaOffsetReader(
consumerStrategy: ConsumerStrategy,
driverKafkaParams: ju.Map[String, Object],
readerOptions: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String


private[kafka010] class KafkaRelation(
private /* [kafka010] */ class KafkaRelation(
override val sqlContext: SQLContext,
kafkaReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
Expand All @@ -53,6 +53,7 @@ private[kafka010] class KafkaRelation(
override def schema: StructType = KafkaOffsetReader.kafkaSchema

override def buildScan(): RDD[Row] = {
if (true) throw new NullPointerException("hmm")
// Leverage the KafkaReader to obtain the relevant partition offsets
val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
* IllegalArgumentException when the Kafka Dataset is created, so that it can catch
* missing options even before the query is started.
*/
private[kafka010] class KafkaSourceProvider extends DataSourceRegister
private /* [kafka010] */ class KafkaSourceProvider extends DataSourceRegister
with StreamSourceProvider
with StreamSinkProvider
with RelationProvider
Expand Down Expand Up @@ -213,7 +213,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
}

private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
/* private */ def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
ConfigUpdater("source", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
Expand All @@ -233,7 +233,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
.build()

private def kafkaParamsForExecutors(
/* private */ def kafkaParamsForExecutors(
specifiedKafkaParams: Map[String, String], uniqueGroupId: String) =
ConfigUpdater("executor", specifiedKafkaParams)
.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
Expand All @@ -253,7 +253,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
.build()

private def strategy(caseInsensitiveParams: Map[String, String]) =
/* private */ def strategy(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
case ("assign", value) =>
AssignStrategy(JsonUtils.partitions(value))
Expand All @@ -267,7 +267,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
throw new IllegalArgumentException("Unknown option")
}

private def failOnDataLoss(caseInsensitiveParams: Map[String, String]) =
/* private */ def failOnDataLoss(caseInsensitiveParams: Map[String, String]) =
caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean

private def validateGeneralOptions(parameters: Map[String, String]): Unit = {
Expand Down Expand Up @@ -437,14 +437,18 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
}

private[kafka010] object KafkaSourceProvider {
private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
private /* [kafka010] */ object KafkaSourceProvider {
// private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
// private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
// private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
// private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", "assign")
val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
val TOPIC_OPTION_KEY = "topic"

private val deserClassName = classOf[ByteArrayDeserializer].getName
/* private */ val deserClassName = classOf[ByteArrayDeserializer].getName

def getKafkaOffsetRangeLimit(
params: Map[String, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object UnsupportedOperationChecker {
def checkForBatch(plan: LogicalPlan): Unit = {
plan.foreachUp {
case p if p.isStreaming =>
throwError("Queries with streaming sources must be executed with writeStream.start()")(p)
// throwError("Queries with streaming sources must be executed with writeStream.start()")(p)

case _ =>
}
Expand All @@ -42,8 +42,8 @@ object UnsupportedOperationChecker {
def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {

if (!plan.isStreaming) {
throwError(
"Queries without streaming sources cannot be executed with writeStream.start()")(plan)
// throwError(
// "Queries without streaming sources cannot be executed with writeStream.start()")(plan)
}

// Disallow multiple streaming aggregations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STATE_STORE_PROVIDER_CLASS = SQLConfigBuilder("spark.sql.streaming.stateStore.providerClass")
.internal()
.doc("The class used to manage state data in stateful streaming queries. This class must " +
"be a subclass of StateStoreProvider, and must have a zero-arg constructor.")
.stringConf
.createOptional

val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
SQLConfigBuilder("spark.sql.streaming.stateStore.minDeltasForSnapshot")
.internal()
Expand Down Expand Up @@ -670,6 +677,8 @@ class SQLConf extends Serializable with Logging {

def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)

def stateStoreProviderClass: Option[String] = getConf(STATE_STORE_PROVIDER_CLASS)

def stateStoreMinDeltasForSnapshot: Int = getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)

def checkpointLocation: Option[String] = getConf(CHECKPOINT_LOCATION)
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2690,8 +2690,8 @@ class Dataset[T] private[sql](
@InterfaceStability.Evolving
def writeStream: DataStreamWriter[T] = {
if (!isStreaming) {
logicalPlan.failAnalysis(
"'writeStream' can be called only on streaming Dataset/DataFrame")
// logicalPlan.failAnalysis(
// "'writeStream' can be called only on streaming Dataset/DataFrame")
}
new DataStreamWriter[T](this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
object StreamingRelationStrategy extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case s: StreamingRelation =>
println(s.isStreaming)
StreamingRelationExec(s.sourceName, s.output) :: Nil
case s: StreamingExecutionRelation =>
StreamingRelationExec(s.toString, s.output) :: Nil
case _ => Nil
// case _ => Nil
case p => println("StreamingRelationStrategy " + p); Nil
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ case class StateStoreRestoreExec(
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeName = "default",
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
Expand Down Expand Up @@ -141,9 +143,11 @@ case class StateStoreSaveExec(
child.execute().mapPartitionsWithStateStore(
getStateId.checkpointLocation,
operatorId = getStateId.operatorId,
storeName = "default",
storeVersion = getStateId.batchId,
keyExpressions.toStructType,
child.output.toStructType,
indexOrdinal = None,
sqlContext.sessionState,
Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,7 @@ import org.apache.spark.util.Utils
* to ensure re-executed RDD operations re-apply updates on the correct past version of the
* store.
*/
private[state] class HDFSBackedStateStoreProvider(
val id: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
storeConf: StateStoreConf,
hadoopConf: Configuration
) extends StateStoreProvider with Logging {
private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging {

type MapType = java.util.HashMap[UnsafeRow, UnsafeRow]

Expand Down Expand Up @@ -224,6 +218,22 @@ private[state] class HDFSBackedStateStoreProvider(
store
}

override def init(stateStoreId: StateStoreId,
keySchema: StructType,
valueSchema: StructType,
indexOrdinal: Option[Int], // for sorting the data
storeConf: StateStoreConf,
hadoopConf: Configuration): Unit = {
this.stateStoreId = stateStoreId
this.keySchema = keySchema
this.valueSchema = valueSchema
this.storeConf = storeConf
this.hadoopConf = hadoopConf
fs.mkdirs(baseDir)
}

override def id: StateStoreId = stateStoreId

/** Do maintenance backing data files, including creating snapshots and cleaning up old files */
override def doMaintenance(): Unit = {
try {
Expand All @@ -239,16 +249,19 @@ private[state] class HDFSBackedStateStoreProvider(
s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
}

/* Internal classes and methods */
/* Internal fields and methods */

private val loadedMaps = new mutable.HashMap[Long, MapType]
private val baseDir =
new Path(id.checkpointLocation, s"${id.operatorId}/${id.partitionId.toString}")
private val fs = baseDir.getFileSystem(hadoopConf)
private val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)

initialize()
@volatile private var stateStoreId: StateStoreId = _
@volatile private var keySchema: StructType = _
@volatile private var valueSchema: StructType = _
@volatile private var storeConf: StateStoreConf = _
@volatile private var hadoopConf: Configuration = _

private lazy val loadedMaps = new mutable.HashMap[Long, MapType]
private lazy val baseDir =
new Path(id.checkpointLocation, s"${id.operatorId}/${id.partitionId.toString}")
private lazy val fs = baseDir.getFileSystem(hadoopConf)
private lazy val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
private case class StoreFile(version: Long, path: Path, isSnapshot: Boolean)

/** Commit a set of updates to the store with the given new version */
Expand Down
Loading