
Can't write Dataset[SensorData] to Kafka without transformation #6

Open
konobey opened this issue Sep 11, 2019 · 5 comments

Comments


konobey commented Sep 11, 2019

Hello!

The code in notebook kafka-sensor-data-generator.snb.ipynb:

val query = sensorValues.writeStream.format("kafka")
  .queryName("kafkaWriter")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer) // comma-separated list of host:port
  .option("topic", targetTopic)
  .option("checkpointLocation", workDir+"/generator-checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()

doesn't work because sensorValues is of type Dataset[SensorData], while the Kafka sink expects a value attribute of type String, e.g. all the attributes of a SensorData row concatenated into a single string.
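In other words, something along these lines seems to be missing before writeStream. This is only a sketch of the idea; the column names here are guesses based on the sensor-data context:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Sketch only: flatten the SensorData columns into the (key, value) string
// shape the Kafka sink expects. Column names are assumptions.
val kafkaReady = sensorValues.select(
  $"sensorId".cast(StringType) as "key",
  concat_ws(",", $"sensorId", $"timestamp", $"value") as "value"
)
```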


Kiollpt commented May 26, 2020

@konobey
Did you figure it out?


maasg commented May 31, 2020

@Kiollpt I'm sorry for this error in the code.
The Kafka sink only accepts key-value datasets, so the data must be converted into the right shape before writing to Kafka.
This can be done, for example, by converting the records to JSON.

This should do the trick:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val kvStream = sensorValues.select(
  $"sensorId".cast(StringType) as "key", // Kafka message key
  to_json(struct("*")) as "value"        // all columns serialized as one JSON string
)

Then, the kvStream can be written to Kafka:

val query = kvStream.writeStream.format("kafka")
  .queryName("kafkaWriter")
  .outputMode("append")
  .option("kafka.bootstrap.servers", kafkaBootstrapServer) // comma-separated list of host:port
  .option("topic", targetTopic)
  .option("checkpointLocation", workDir+"/generator-checkpoint")
  .option("failOnDataLoss", "false") // use this option when testing
  .start()


Kiollpt commented Jun 2, 2020

@maasg Thank you for the fix


Kiollpt commented Jun 2, 2020

For the corresponding read side in Ch9, it would look like this:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions._

val kafkaSchema = Encoders.product[SensorData].schema
val iotData = rawData
  .select(from_json($"value".cast("string"), kafkaSchema) as "record")
  .select("record.*").as[SensorData]
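Worth noting: Encoders.product[SensorData].schema derives the Spark StructType directly from the case class fields, so the read-side schema automatically matches what to_json(struct("*")) wrote. A minimal sketch, assuming a SensorData definition along these lines (the real field names are in the book's repo):

```scala
import org.apache.spark.sql.Encoders

// Hypothetical shape of SensorData; the actual case class lives in the book's code.
case class SensorData(sensorId: Int, timestamp: Long, value: Double)

// Encoders.product derives a StructType from the case class fields,
// keeping the from_json read schema in sync with the case class.
val kafkaSchema = Encoders.product[SensorData].schema
```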


konobey commented Jun 4, 2020

@maasg Thanks! Could you fix the code in the notebook, please?
