File tree Expand file tree Collapse file tree 3 files changed +7
-4
lines changed
core/raydp-main/src/main/scala/org/apache/spark/sql/raydp Expand file tree Collapse file tree 3 files changed +7
-4
lines changed Original file line number Diff line number Diff line change 74 74 run: |
7575 python -m pip install --upgrade pip
7676 pip install wheel
77- pip install "numpy<1.24"
77+ pip install "numpy<1.24" "click<8.3.0"
7878 SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
7979 if [ "$(uname -s)" == "Linux" ]
8080 then
Original file line number Diff line number Diff line change 7575 python -m pip install --upgrade pip
7676 pip install wheel
7777 pip install "numpy<1.24"
78- pip install "pydantic<2.0"
78+ pip install "pydantic<2.0" "click<8.3.0"
7979 SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
8080 if [ "$(uname -s)" == "Linux" ]
8181 then
Original file line number Diff line number Diff line change @@ -93,8 +93,8 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
9393 batchSize = 0
9494 }
9595 val schema = df.schema
96- val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(
97- schema, timeZoneId, sparkSession)
96+ val arrowSchemaJson = SparkShimLoader.getSparkShims.toArrowSchema(
97+ schema, timeZoneId, sparkSession).toJson
9898
9999 val objectIds = df.queryExecution.toRdd.mapPartitions{ iter =>
100100 val queue = ObjectRefHolder.getQueue(uuid)
@@ -105,6 +105,9 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
105105 } else {
106106 Iterator(iter)
107107 }
108+
109+ // Reconstruct arrow schema from JSON on executor
110+ val arrowSchema = Schema .fromJSON(arrowSchemaJson)
108111 val allocator = ArrowUtils.rootAllocator.newChildAllocator(
109112 s"ray object store writer", 0, Long.MaxValue)
110113 val root = VectorSchemaRoot.create(arrowSchema, allocator)
You can’t perform that action at this time.
0 commit comments