Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Can't use case class in the Scala notebook #40

Open
leeivan opened this issue Jul 30, 2017 · 4 comments
Open

Can't use case class in the Scala notebook #40

leeivan opened this issue Jul 30, 2017 · 4 comments

Comments

@leeivan
Copy link

leeivan commented Jul 30, 2017

the version of docker:
jupyter/all-spark-notebook:lastest

the way to start docker:
docker run -it --rm -p 8888:8888 jupyter/all-spark-notebook:latest
or
docker ps -a
docker start -i containerID

the steps:

Visit http://localhost:8888
Start an spylon-kernal notebook
input code above

import spark.implicits._
val p = spark.sparkContext.textFile ("../Data/person.txt")
val pmap = p.map ( _.split (","))
pmap.collect()

the output:res0: Array[Array[String]] = Array(Array(Barack, Obama, 53), Array(George, Bush, 68), Array(Bill, Clinton, 68))

case class Persons (first_name:String,last_name: String,age:Int)
val personRDD = pmap.map ( p => Persons (p(0), p(1), p(2).toInt))
personRDD.take(1)

the error message:

org.apache.spark.SparkDriverExecutionException: Execution error
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1186)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
  ... 39 elided
Caused by: java.lang.ArrayStoreException: [LPersons;
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:90)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043)
  at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:2043)
  at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:59)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1182)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1711)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

The above code is working with the spark-shell. From error message, I speculated that the driver program didn't correctly handle case class Persons to RDD partition.

@parente
Copy link
Contributor

parente commented Jul 30, 2017

I have no insight into what's wrong after poking and doing a bit of research. There have been similar reports of issues like this in the past, with slight variance in the details:

Interestingly enough, the case class works fine when using DataFrames and Datasets:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val first_name = StructField("first_name", StringType)
val last_name = StructField("last_name", StringType)
val age = StructField("age", IntegerType)
val schema = StructType(Array(first_name, last_name, age))

val df = spark.read.schema(schema).csv("person.txt")

val ds = df.as[Persons]

ds.collect()

@leeivan
Copy link
Author

leeivan commented Aug 1, 2017

I used your example to test my spylon kernel env, it didn't work also. When I executed the code:
ds2.collect
the error message is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.ClassCastException: DataRow cannot be cast to DataRow
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2390)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2390)
  ... 37 elided
Caused by: java.lang.ClassCastException: DataRow cannot be cast to DataRow
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more

Do you meet such problem? I didn't figure out why the problem is produced, environment or others.
My the docker image:
docker push leeivan/spark-lab-spylon

@walmaaoui
Copy link

hello guys,
Any updates on the issue? this blocks the use of delta-lake (a table format from databricks) with spylon.
I am getting:

Caused by: java.lang.ArrayStoreException: org.apache.spark.sql.delta.actions.AddFile
  at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:75)
  at scala.Array$.slowcopy(Array.scala:152)
  at scala.Array$.copy(Array.scala:178)
  at scala.collection.mutable.ResizableArray.copyToArray(ResizableArray.scala:80)
  at scala.collection.mutable.ResizableArray.copyToArray$(ResizableArray.scala:78)
  at scala.collection.mutable.ArrayBuffer.copyToArray(ArrayBuffer.scala:49)
  at scala.collection.TraversableOnce.copyToArray(TraversableOnce.scala:316)
  at scala.collection.TraversableOnce.copyToArray$(TraversableOnce.scala:315)
  at scala.collection.AbstractTraversable.copyToArray(Traversable.scala:108)
  at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:324)
  at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:321)
  at scala.collection.AbstractTraversable.toArray(Traversable.scala:108)
  at org.apache.spark.sql.delta.files.DelayedCommitProtocol.commitJob(DelayedCommitProtocol.scala:59)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:215)

@zhangfiona
Copy link

zhangfiona commented Apr 29, 2022

I used Apache Toree-Scala kernel in jupyter lab with case class, and had the same problem. I found the jira: https://issues.apache.org/jira/browse/TOREE-428, but the bug also exists in the latest version. Any update now?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants