From d2b457579aa80952826b4479f700725ad35985ae Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Fri, 26 Sep 2025 16:26:52 -0700 Subject: [PATCH 1/8] Add Python worker log definitions for BlockManager. --- .../org/apache/spark/storage/BlockId.scala | 29 +++++++++-- .../spark/storage/LogBlockIdGenerator.scala | 11 ++++ .../org/apache/spark/storage/LogLine.scala | 5 ++ .../spark/storage/BlockManagerSuite.scala | 51 +++++++++++++++++++ 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index a15426783ebec..b9dd0930efc23 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -179,6 +179,7 @@ case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { object LogBlockType extends Enumeration { type LogBlockType = Value val TEST = Value + val PYTHON_WORKER = Value } /** @@ -188,9 +189,9 @@ object LogBlockType extends Enumeration { * and log management. * @param executorId the ID of the executor that produced this log block. */ -abstract sealed class LogBlockId( - val lastLogTime: Long, - val executorId: String) extends BlockId { +abstract sealed class LogBlockId extends BlockId { + def lastLogTime: Long + def executorId: String def logBlockType: LogBlockType } @@ -198,20 +199,35 @@ object LogBlockId { def empty(logBlockType: LogBlockType): LogBlockId = { logBlockType match { case LogBlockType.TEST => TestLogBlockId(0L, "") + case LogBlockType.PYTHON_WORKER => PythonWorkerLogBlockId(0L, "", "", "") case _ => throw new SparkException(s"Unsupported log block type: $logBlockType") } } } // Used for test purpose only. -case class TestLogBlockId(override val lastLogTime: Long, override val executorId: String) - extends LogBlockId(lastLogTime, executorId) { +case class TestLogBlockId(lastLogTime: Long, executorId: String) + extends LogBlockId { override def name: String = "test_log_" + lastLogTime + "_" + executorId override def logBlockType: LogBlockType = LogBlockType.TEST } +@DeveloperApi +case class PythonWorkerLogBlockId( + lastLogTime: Long, + executorId: String, + sessionId: String, + workerId: String) + extends LogBlockId { + override def name: String = { + s"python_worker_log_${lastLogTime}_${executorId}_${sessionId}_$workerId" + } + + override def logBlockType: LogBlockType = LogBlockType.PYTHON_WORKER +} + /** Id associated with temporary local data managed as blocks. Not serializable. 
*/ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id @@ -260,6 +276,7 @@ object BlockId { val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r val TEST = "test_(.*)".r val TEST_LOG_BLOCK = "test_log_([0-9]+)_(.*)".r + val PYTHON_WORKER_LOG_BLOCK = "python_worker_log_([0-9]+)_([^_]*)_([^_]*)_([^_]*)".r def apply(name: String): BlockId = name match { case RDD(rddId, splitIndex) => @@ -302,6 +319,8 @@ object BlockId { TempShuffleBlockId(UUID.fromString(uuid)) case TEST_LOG_BLOCK(lastLogTime, executorId) => TestLogBlockId(lastLogTime.toLong, executorId) + case PYTHON_WORKER_LOG_BLOCK(lastLogTime, executorId, sessionId, workerId) => + PythonWorkerLogBlockId(lastLogTime.toLong, executorId, sessionId, workerId) case TEST(value) => TestBlockId(value) case _ => throw SparkCoreErrors.unrecognizedBlockIdError(name) diff --git a/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala index 4a2b90677ba3e..fced080970c21 100644 --- a/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala +++ b/core/src/main/scala/org/apache/spark/storage/LogBlockIdGenerator.scala @@ -47,3 +47,14 @@ trait LogBlockIdGenerator { blockId } } + +class PythonWorkerLogBlockIdGenerator( + sessionId: String, + workerId: String) + extends LogBlockIdGenerator { + + override def logBlockType: LogBlockType = LogBlockType.PYTHON_WORKER + + override protected def genUniqueBlockId(lastLogTime: Long, executorId: String): LogBlockId = + PythonWorkerLogBlockId(lastLogTime, executorId, sessionId, workerId) +} diff --git a/core/src/main/scala/org/apache/spark/storage/LogLine.scala b/core/src/main/scala/org/apache/spark/storage/LogLine.scala index dc646f289e37f..ba5a452083934 100644 --- a/core/src/main/scala/org/apache/spark/storage/LogLine.scala +++ b/core/src/main/scala/org/apache/spark/storage/LogLine.scala @@ -38,6 +38,8 @@ object LogLine { logBlockType match { case LogBlockType.TEST => classTag[TestLogLine] + case LogBlockType.PYTHON_WORKER => + classTag[PythonWorkerLogLine] case unsupportedLogBlockType => throw new RuntimeException("Not supported log type " + unsupportedLogBlockType) } @@ -46,3 +48,6 @@ object LogLine { case class TestLogLine(eventTime: Long, sequenceId: Long, message: String) extends LogLine { } + +case class PythonWorkerLogLine(eventTime: Long, sequenceId: Long, message: String) + extends LogLine diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 95a315a486ff0..94ba6cb4ac0fd 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.{File, InputStream, IOException} import java.nio.ByteBuffer import java.nio.file.Files +import java.util.UUID import java.util.concurrent.ThreadLocalRandom import scala.collection.mutable @@ -2583,6 +2584,56 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe assert(logBlockIds.contains(logBlockId1) && logBlockIds.contains(logBlockId2)) } + test("PythonWorkerLog block write/read") { + val store = makeBlockManager(8000, "executor1") + val logBlockWriter = store.getLogBlockWriter(LogBlockType.PYTHON_WORKER) + val logBlockId = PythonWorkerLogBlockId( + 1L, store.executorId, UUID.randomUUID.toString, "1234") + + 
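+    // The generated name ("python_worker_log_<lastLogTime>_<executorId>_<sessionId>_<workerId>")
+    // should parse back to the same block id via BlockId.apply.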
assert(BlockId(logBlockId.name) == logBlockId) + + val log1 = PythonWorkerLogLine(0L, 1L, "json1") + val log2 = PythonWorkerLogLine(1L, 2L, "json2") + + logBlockWriter.writeLog(log1) + logBlockWriter.writeLog(log2) + logBlockWriter.save(logBlockId) + + val status = store.getStatus(logBlockId) + assert(status.isDefined) + + assert(store.getMatchingBlockIds(b => logBlockId.equals(b)).nonEmpty) + + val data = store.get[PythonWorkerLogLine](logBlockId).get.data.toSeq + assert(data === Seq(log1, log2)) + } + + test("rolling python worker log block write/read") { + val store = makeBlockManager(8000, "executor1") + + val sessionId = UUID.randomUUID.toString + val workerId = "1234" + val logBlockIdGenerator = new PythonWorkerLogBlockIdGenerator(sessionId, workerId) + + val logBlockWriter = store.getRollingLogWriter(logBlockIdGenerator, 100) + val log1 = PythonWorkerLogLine(0L, 1L, "json 1") + val log2 = PythonWorkerLogLine(1L, 2L, "json 2") + val log3 = PythonWorkerLogLine(2L, 3L, "json 3") + val log4 = PythonWorkerLogLine(3L, 4L, "json 4") + logBlockWriter.writeLog(log1) + logBlockWriter.writeLog(log2) + logBlockWriter.flush() + logBlockWriter.writeLog(log3) + logBlockWriter.writeLog(log4) + logBlockWriter.close() + + val logBlockId1 = PythonWorkerLogBlockId(2L, store.executorId, sessionId, workerId) + val logBlockId2 = PythonWorkerLogBlockId(3L, store.executorId, sessionId, workerId) + val logBlockIds = store.getMatchingBlockIds(_.isInstanceOf[PythonWorkerLogBlockId]).distinct + assert(logBlockIds.size === 2) + assert(logBlockIds.contains(logBlockId1) && logBlockIds.contains(logBlockId2)) + } + private def createKryoSerializerWithDiskCorruptedInputStream(): KryoSerializer = { class TestDiskCorruptedInputStream extends InputStream { override def read(): Int = throw new IOException("Input/output error") From 26826fd3b296c20cf77b8d6135767708e0179e0a Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Fri, 26 Sep 2025 18:37:19 -0700 Subject: [PATCH 2/8] Introduce python_worker_logs system session view. 
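
The view exposes, for the current Spark session only, the structured logs captured from
Python workers; the raw JSON lines are parsed with from_json against
LogUtils.SPARK_LOG_SCHEMA, so columns such as level, logger, msg, context and exception
are available. A minimal usage sketch (illustrative only; it assumes an active spark
session and that some worker logs have already been captured for this session):

    # Query the per-session system view and inspect a few columns.
    logs = spark.table("system.session.python_worker_logs")
    logs.select("level", "logger", "msg").show(truncate=False)
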
--- .../analysis/RelationResolution.scala | 20 ++++- .../logical/pythonLogicalOperators.scala | 46 +++++++++++- .../spark/sql/execution/SparkStrategies.scala | 2 + .../python/PythonWorkerLogsExec.scala | 63 ++++++++++++++++ .../python/PythonWorkerLogsSuite.scala | 73 +++++++++++++++++++ 5 files changed, 201 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsExec.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala index 08be456f090e2..e4ee461ef8b9b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.{ TemporaryViewRelation, UnresolvedCatalogRelation } -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, PythonWorkerLogs, SubqueryAlias} import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2 import org.apache.spark.sql.connector.catalog.{ CatalogManager, @@ -105,6 +105,8 @@ class RelationResolution(override val catalogManager: CatalogManager) u.isStreaming, finalTimeTravelSpec.isDefined ).orElse { + resolveSystemSessionView(u.multipartIdentifier) + }.orElse { expandIdentifier(u.multipartIdentifier) match { case CatalogAndIdentifier(catalog, ident) => val key = @@ -242,4 +244,20 @@ class RelationResolution(override val catalogManager: CatalogManager) } } } + + private def isSystemSessionIdentifier(identifier: Seq[String]): Boolean = { + identifier.length > 2 && + identifier(0).equalsIgnoreCase(CatalogManager.SYSTEM_CATALOG_NAME) && + identifier(1).equalsIgnoreCase(CatalogManager.SESSION_NAMESPACE) + } + + private def resolveSystemSessionView( + identifier: Seq[String]): Option[LogicalPlan] = { + if (isSystemSessionIdentifier(identifier)) { + Option(identifier.drop(2)).collect { + case Seq(viewName) if viewName.equalsIgnoreCase(PythonWorkerLogs.ViewName) => + PythonWorkerLogs.viewDefinition() + } + } else None + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index 0c0ea24434892..802678bc2c6ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -18,11 +18,15 @@ package org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.resource.ResourceProfile -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression, PythonUDF, PythonUDTF} +import org.apache.spark.sql.catalyst.SQLConfHelper +import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedAttribute, UnresolvedStar} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, JsonToStructs, PythonUDF, PythonUDTF} import org.apache.spark.sql.catalyst.trees.TreePattern._ +import org.apache.spark.sql.catalyst.types.DataTypeUtils import 
org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, TimeMode} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.util.LogUtils /** * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame. @@ -381,3 +385,41 @@ case class AttachDistributedSequence( s"$nodeName$truncatedOutputString $indexColumn" } } + +case class PythonWorkerLogs(jsonAttr: Attribute) + extends LeafNode with MultiInstanceRelation with SQLConfHelper { + + override def output: Seq[Attribute] = Seq(jsonAttr) + + override def newInstance(): PythonWorkerLogs = + copy(jsonAttr = jsonAttr.newInstance()) + + override protected def stringArgs: Iterator[Any] = Iterator(output) + + override def computeStats(): Statistics = Statistics( + // TODO: Instead of returning a default value here, find a way to return a meaningful size + // estimate for RDDs. See PR 1238 for more discussions. + sizeInBytes = BigInt(conf.defaultSizeInBytes) + ) +} + +object PythonWorkerLogs { + val ViewName = "python_worker_logs" + + def apply(): LogicalPlan = { + PythonWorkerLogs(DataTypeUtils.toAttribute(StructField("message", StringType))) + } + + def viewDefinition(): LogicalPlan = { + Project( + Seq(UnresolvedStar(Some(Seq("from_json")))), + Project( + Seq(Alias( + JsonToStructs( + schema = StructType.fromDDL(LogUtils.SPARK_LOG_SCHEMA), + options = Map.empty, + child = UnresolvedAttribute("message")), + "from_json")()), + PythonWorkerLogs())) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index f76bc911bef8f..96f97a6d45996 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -962,6 +962,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil case logical.AttachDistributedSequence(attr, child) => execution.python.AttachDistributedSequenceExec(attr, planLater(child)) :: Nil + case logical.PythonWorkerLogs(jsonAttr) => + execution.python.PythonWorkerLogsExec(jsonAttr) :: Nil case logical.MapElements(f, _, _, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, _, _, in, out, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsExec.scala new file mode 100644 index 0000000000000..4dc23b841cc52 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsExec.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import org.apache.spark.SparkEnv +import org.apache.spark.rdd.{BlockRDD, RDD} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.encoderFor +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} +import org.apache.spark.storage.{BlockId, PythonWorkerLogBlockId, PythonWorkerLogLine} + +case class PythonWorkerLogsExec(jsonAttr: Attribute) + extends LeafExecNode { + + override def output: Seq[Attribute] = Seq(jsonAttr) + + override lazy val metrics: Map[String, SQLMetric] = Map( + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) + + override protected def doExecute(): RDD[InternalRow] = { + import session.implicits._ + + val blockIds = getBlockIds(session.sessionUUID) + val rdd = new BlockRDD[PythonWorkerLogLine](session.sparkContext, blockIds.toArray) + + val encoder = encoderFor[String] + val toRow = encoder.createSerializer() + + val numOutputRows = longMetric("numOutputRows") + rdd.mapPartitionsInternal { iter => + iter.map { value => + numOutputRows += 1 + toRow(value.message).copy() + } + } + } + + private def getBlockIds(sessionId: String): Seq[BlockId] = { + val blockManager = SparkEnv.get.blockManager.master + blockManager.getMatchingBlockIds( + id => id.isInstanceOf[PythonWorkerLogBlockId] && + id.asInstanceOf[PythonWorkerLogBlockId].sessionId == sessionId, + askStorageEndpoints = true + ).distinct + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsSuite.scala new file mode 100644 index 0000000000000..2f363d3e89540 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonWorkerLogsSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import java.util.UUID + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType +import org.apache.spark.storage.{PythonWorkerLogBlockId, PythonWorkerLogBlockIdGenerator, PythonWorkerLogLine} +import org.apache.spark.util.LogUtils + +class PythonWorkerLogsSuite extends QueryTest with SharedSparkSession { + import testImplicits._ + + protected override def afterEach(): Unit = { + try { + val blockManager = spark.sparkContext.env.blockManager + blockManager.getMatchingBlockIds(_.isInstanceOf[PythonWorkerLogBlockId]) + .foreach(blockManager.removeBlock(_)) + } finally { + super.afterEach() + } + } + + test("schema") { + val schema = spark.table("system.session.python_worker_logs").schema + assert(schema == StructType.fromDDL(LogUtils.SPARK_LOG_SCHEMA)) + } + + private def prepareLogs(sessionId: String): Unit = { + val blockManager = spark.sparkContext.env.blockManager + val logBlockWriter = blockManager.getRollingLogWriter( + new PythonWorkerLogBlockIdGenerator(sessionId, "1234")) + logBlockWriter.writeLog( + PythonWorkerLogLine(0L, 1L, """{"level":"INFO","msg":"msg1"}""")) + logBlockWriter.writeLog( + PythonWorkerLogLine(1L, 2L, """{"level":"ERROR","msg":"msg2"}""")) + logBlockWriter.close() + } + + test("read logs") { + prepareLogs(spark.sessionUUID) + + val df = spark.table("system.session.python_worker_logs") + assert(df.count() == 2) + checkAnswer( + df.select($"level", $"msg"), + Seq(Row("INFO", "msg1"), Row("ERROR", "msg2"))) + } + + test("can't read logs for another session") { + prepareLogs(UUID.randomUUID.toString) + + val df = spark.table("system.session.python_worker_logs") + assert(df.count() == 0) + } +} From a2074f5925859d28f493255e91f409fe45cc735d Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Mon, 29 Sep 2025 18:28:50 -0700 Subject: [PATCH 3/8] Add basic logging support. --- .../api/python/PythonWorkerFactory.scala | 7 +- .../api/python/PythonWorkerLogCapture.scala | 193 +++++++++++++++ python/pyspark/logger/logger.py | 6 +- python/pyspark/logger/worker_io.py | 230 ++++++++++++++++++ .../sql/tests/arrow/test_arrow_python_udf.py | 8 + python/pyspark/sql/tests/test_udf.py | 70 ++++++ python/pyspark/testing/connectutils.py | 16 +- python/pyspark/testing/sqlutils.py | 6 + python/pyspark/worker.py | 10 +- .../apache/spark/sql/internal/SQLConf.scala | 11 + .../sql/connect/service/SessionHolder.scala | 2 + .../spark/sql/api/python/PythonSQLUtils.scala | 18 ++ .../spark/sql/classic/SparkSession.scala | 10 + .../python/BatchEvalPythonExec.scala | 11 +- .../execution/python/PythonUDFRunner.scala | 19 +- 15 files changed, 602 insertions(+), 15 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala create mode 100644 python/pyspark/logger/worker_io.py diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index e02f10cc3fe69..6e56281b425ab 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -395,12 +395,16 @@ private[spark] class PythonWorkerFactory( } } + private val workerLogCapture = + envVars.get("SPARK_SESSION_UUID").map(new PythonWorkerLogCapture(_)) + /** * Redirect the given streams to our stderr in separate threads. 
*/ private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream): Unit = { try { - new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start() + new RedirectThread(workerLogCapture.map(_.wrapInputStream(stdout)).getOrElse(stdout), + System.err, "stdout reader for " + pythonExec).start() new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start() } catch { case e: Exception => @@ -460,6 +464,7 @@ private[spark] class PythonWorkerFactory( } def stop(): Unit = { + workerLogCapture.foreach(_.closeAllWriters()) stopDaemon() } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala new file mode 100644 index 0000000000000..71fc00546ef6e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerLogCapture.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.{BufferedReader, InputStream, InputStreamReader} +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicLong + +import scala.jdk.CollectionConverters._ + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.storage.{PythonWorkerLogBlockIdGenerator, PythonWorkerLogLine, RollingLogWriter} + +/** + * Manages Python UDF log capture and routing to per-worker log writers. + * + * This class handles the parsing of Python worker output streams and routes + * log messages to appropriate rolling log writers based on worker PIDs. + * Works for both daemon and non-daemon modes. + */ +private[python] class PythonWorkerLogCapture( + sessionId: String, + logMarker: String = "PYTHON_WORKER_LOGGING") extends Logging { + + // Map to track per-worker log writers: workerId(PID) -> (writer, sequenceId) + private val workerLogWriters = new ConcurrentHashMap[String, (RollingLogWriter, AtomicLong)]() + + /** + * Creates an InputStream wrapper that captures Python UDF logs from the given stream. + * + * @param inputStream The input stream to wrap (typically daemon stdout or worker stdout) + * @return A wrapped InputStream that captures and routes log messages + */ + def wrapInputStream(inputStream: InputStream): InputStream = { + new CaptureWorkerLogsInputStream(inputStream) + } + + /** + * Removes and closes the log writer for a specific worker. 
+ * + * @param workerId The worker ID (typically PID as string) + */ + def removeAndCloseWorkerLogWriter(workerId: String): Unit = { + Option(workerLogWriters.remove(workerId)).foreach { case (writer, _) => + try { + writer.close() + } catch { + case e: Exception => + logWarning(s"Failed to close log writer for worker $workerId", e) + } + } + } + + /** + * Closes all active worker log writers. + */ + def closeAllWriters(): Unit = { + workerLogWriters.values().asScala.foreach { case (writer, _) => + try { + writer.close() + } catch { + case e: Exception => + logWarning("Failed to close log writer", e) + } + } + workerLogWriters.clear() + } + + /** + * Gets or creates a log writer for the specified worker. + * + * @param workerId Unique identifier for the worker (typically PID) + * @return Tuple of (RollingLogWriter, AtomicLong sequence counter) + */ + private def getOrCreateLogWriter(workerId: String): (RollingLogWriter, AtomicLong) = { + workerLogWriters.computeIfAbsent(workerId, _ => { + val logWriter = SparkEnv.get.blockManager.getRollingLogWriter( + new PythonWorkerLogBlockIdGenerator(sessionId, workerId) + ) + (logWriter, new AtomicLong()) + }) + } + + /** + * Processes a log line from a Python worker. + * + * @param line The complete line containing the log marker and JSON + * @return The prefix (non-log content) that should be passed through + */ + private def processLogLine(line: String): String = { + val markerIndex = line.indexOf(s"$logMarker:") + if (markerIndex >= 0) { + val prefix = line.substring(0, markerIndex) + val markerAndJson = line.substring(markerIndex) + + // Parse: "PYTHON_UDF_LOGGING:12345:{json}" + val parts = markerAndJson.split(":", 3) + if (parts.length >= 3) { + val workerId = parts(1) // This is the PID from Python worker + val json = parts(2) + + try { + if (json.isEmpty) { + removeAndCloseWorkerLogWriter(workerId) + } else { + val (writer, seqId) = getOrCreateLogWriter(workerId) + writer.writeLog( + PythonWorkerLogLine(System.currentTimeMillis(), seqId.getAndIncrement(), json) + ) + } + } catch { + case e: Exception => + logWarning(s"Failed to write log for worker $workerId", e) + } + } + prefix + } else { + line + System.lineSeparator() + } + } + + /** + * InputStream wrapper that captures and processes Python UDF logs. 
+ */ + private class CaptureWorkerLogsInputStream(in: InputStream) extends InputStream { + + private[this] val reader = new BufferedReader( + new InputStreamReader(in, StandardCharsets.ISO_8859_1)) + private[this] val temp = new Array[Byte](1) + private[this] var buffer = ByteBuffer.allocate(0) + + override def read(): Int = { + val n = read(temp) + if (n <= 0) { + -1 + } else { + // Signed byte to unsigned integer + temp(0) & 0xff + } + } + + override def read(b: Array[Byte], off: Int, len: Int): Int = { + if (buffer.hasRemaining) { + val buf = ByteBuffer.wrap(b, off, len) + val remaining = Math.min(buffer.remaining(), buf.remaining()) + buf.put(buf.position(), buffer, buffer.position(), remaining) + buffer.position(buffer.position() + remaining) + remaining + } else { + val line = reader.readLine() + if (line == null) { + closeAllWriters() + -1 + } else { + val processedContent = if (line.contains(s"$logMarker:")) { + processLogLine(line) + } else { + line + System.lineSeparator() + } + + buffer = ByteBuffer.wrap(processedContent.getBytes(StandardCharsets.ISO_8859_1)) + read(b, off, len) + } + } + } + + override def close(): Unit = { + try { + reader.close() + } finally { + closeAllWriters() + } + } + } +} diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py index 1c8505e2e3e43..2d7416bcfdcf6 100644 --- a/python/pyspark/logger/logger.py +++ b/python/pyspark/logger/logger.py @@ -50,6 +50,10 @@ class JSONFormatter(logging.Formatter): default_msec_format = "%s.%03d" + def __init__(self, ensure_ascii: bool = False): + super().__init__() + self._ensure_ascii = ensure_ascii + def format(self, record: logging.LogRecord) -> str: """ Format the specified record as a JSON string. @@ -89,7 +93,7 @@ def format(self, record: logging.LogRecord) -> str: "msg": str(exc_value), "stacktrace": structured_stacktrace, } - return json.dumps(log_entry, ensure_ascii=False) + return json.dumps(log_entry, ensure_ascii=self._ensure_ascii) class PySparkLogger(logging.Logger): diff --git a/python/pyspark/logger/worker_io.py b/python/pyspark/logger/worker_io.py new file mode 100644 index 0000000000000..a123c2e4e5379 --- /dev/null +++ b/python/pyspark/logger/worker_io.py @@ -0,0 +1,230 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from contextlib import contextmanager +import io +import logging +import os +import sys +import time +from typing import BinaryIO, Generator, Iterable, Iterator, Optional, TextIO, Union +from types import TracebackType + +from pyspark.logger.logger import JSONFormatter + + +class DelegatingTextIOWrapper(TextIO): + """A TextIO that delegates all operations to another TextIO object.""" + + def __init__(self, delegate: TextIO): + self._delegate = delegate + + # Required TextIO properties + @property + def encoding(self) -> str: + return self._delegate.encoding + + @property + def errors(self) -> Optional[str]: + return self._delegate.errors + + @property + def newlines(self) -> Optional[Union[str, tuple[str, ...]]]: + return self._delegate.newlines + + @property + def buffer(self) -> BinaryIO: + return self._delegate.buffer + + @property + def mode(self) -> str: + return self._delegate.mode + + @property + def name(self) -> str: + return self._delegate.name + + @property + def line_buffering(self) -> int: + return self._delegate.line_buffering + + @property + def closed(self) -> bool: + return self._delegate.closed + + # Iterator protocol + def __iter__(self) -> Iterator[str]: + return iter(self._delegate) + + def __next__(self) -> str: + return next(self._delegate) + + # Context manager protocol + def __enter__(self) -> TextIO: + return self._delegate.__enter__() + + def __exit__( + self, + exc_type: Optional[type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + return self._delegate.__exit__(exc_type, exc_val, exc_tb) + + # Core I/O methods + def write(self, s: str) -> int: + return self._delegate.write(s) + + def writelines(self, lines: Iterable[str]) -> None: + return self._delegate.writelines(lines) + + def read(self, size: int = -1) -> str: + return self._delegate.read(size) + + def readline(self, size: int = -1) -> str: + return self._delegate.readline(size) + + def readlines(self, hint: int = -1) -> list[str]: + return self._delegate.readlines(hint) + + # Stream control methods + def close(self) -> None: + return self._delegate.close() + + def flush(self) -> None: + return self._delegate.flush() + + def seek(self, offset: int, whence: int = io.SEEK_SET) -> int: + return self._delegate.seek(offset, whence) + + def tell(self) -> int: + return self._delegate.tell() + + def truncate(self, size: Optional[int] = None) -> int: + return self._delegate.truncate(size) + + # Stream capability methods + def fileno(self) -> int: + return self._delegate.fileno() + + def isatty(self) -> bool: + return self._delegate.isatty() + + def readable(self) -> bool: + return self._delegate.readable() + + def seekable(self) -> bool: + return self._delegate.seekable() + + def writable(self) -> bool: + return self._delegate.writable() + + +class JSONFormatterWithMarker(JSONFormatter): + default_microsec_format = "%s.%06d" + + def __init__(self, marker: str): + super().__init__(ensure_ascii=True) + self._marker = marker + + def format(self, record: logging.LogRecord) -> str: + return f"{self._marker}:{os.getpid()}:{super().format(record)}" + + def formatTime(self, record: logging.LogRecord, datefmt: Optional[str] = None) -> str: + ct = self.converter(record.created) + if datefmt: + s = time.strftime(datefmt, ct) + else: + s = time.strftime(self.default_time_format, ct) + if self.default_microsec_format: + s = self.default_microsec_format % ( + s, + int((record.created - int(record.created)) * 1000000), + ) + elif self.default_msec_format: + s = 
self.default_msec_format % (s, record.msecs) + return s + + +class JsonOutput(DelegatingTextIOWrapper): + def __init__( + self, + delegate: TextIO, + json_out: TextIO, + logger_name: str, + log_level: int, + marker: str, + ): + super().__init__(delegate) + self._json_out = json_out + self._logger_name = logger_name + self._log_level = log_level + self._formatter = JSONFormatterWithMarker(marker) + + def write(self, s: str) -> int: + if s.strip(): + log_record = logging.LogRecord( + name=self._logger_name, + level=self._log_level, + pathname=None, # type: ignore[arg-type] + lineno=None, # type: ignore[arg-type] + msg=s.strip(), + args=None, + exc_info=None, + func=None, + sinfo=None, + ) + self._json_out.write(f"{self._formatter.format(log_record)}\n") + self._json_out.flush() + return self._delegate.write(s) + + def writelines(self, lines: Iterable[str]) -> None: + # Process each line through our JSON logging logic + for line in lines: + self.write(line) + + def close(self) -> None: + pass + + +@contextmanager +def capture_outputs() -> Generator[None, None, None]: + if "SPARK_SESSION_UUID" in os.environ: + marker: str = "PYTHON_WORKER_LOGGING" + json_out = original_stdout = sys.stdout + delegate = original_stderr = sys.stderr + + handler = logging.StreamHandler(json_out) + handler.setFormatter(JSONFormatterWithMarker(marker)) + logger = logging.getLogger() + try: + sys.stdout = JsonOutput(delegate, json_out, "stdout", logging.INFO, marker) + sys.stderr = JsonOutput(delegate, json_out, "stderr", logging.ERROR, marker) + logger.addHandler(handler) + try: + yield + finally: + # Send an empty line to indicate the end of the outputs. + json_out.write(f"{marker}:{os.getpid()}:\n") + json_out.flush() + finally: + sys.stdout = original_stdout + sys.stderr = original_stderr + logger.removeHandler(handler) + handler.close() + else: + yield diff --git a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py index ba1fd3e4e0a9b..0e70f525fe0f3 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py @@ -60,6 +60,14 @@ def test_register_java_function(self): def test_register_java_udaf(self): super(ArrowPythonUDFTests, self).test_register_java_udaf() + @unittest.skip("TODO: Python worker logging is not supported for Arrow Python UDFs.") + def test_udf_with_logging(self): + super().test_udf_with_logging() + + @unittest.skip("TODO: Python worker logging is not supported for Arrow Python UDFs.") + def test_multiple_udfs_with_logging(self): + super().test_multiple_udfs_with_logging() + def test_complex_input_types(self): row = ( self.spark.range(1) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index b1fb42ad11ece..939a92b8acd89 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -25,6 +25,8 @@ import io import time from contextlib import redirect_stdout +import logging +import sys from pyspark.sql import SparkSession, Column, Row from pyspark.sql.functions import col, udf, assert_true, lit, rand @@ -53,6 +55,7 @@ test_not_compiled_message, ) from pyspark.testing.utils import assertDataFrameEqual, timeout +from pyspark.util import is_remote_only class BaseUDFTestsMixin(object): @@ -1551,6 +1554,73 @@ def check_struct_binary_type(s): expected = self.spark.createDataFrame([Row(type_name=expected_type)]) assertDataFrameEqual(result, expected) + @unittest.skipIf(is_remote_only(), "Requires JVM 
access") + def test_udf_with_logging(self): + @udf + def my_udf(): + logger = logging.getLogger("test") + print("print to stdout ❤", file=sys.stdout) + print("print to stderr 😀", file=sys.stderr) + try: + 1 / 0 + except Exception: + logger.exception("exception") + return "x" + + # Logging is disabled by default + assertDataFrameEqual( + self.spark.range(1).select(my_udf().alias("result")), [Row(result="x")] + ) + self.assertEqual(self.spark.table("system.session.python_worker_logs").count(), 0) + + with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}): + assertDataFrameEqual( + self.spark.range(1).select(my_udf().alias("result")), [Row(result="x")] + ) + + logs = self.spark.table("system.session.python_worker_logs") + + assertDataFrameEqual( + logs.select("level", "msg", "logger"), + [ + Row(level="INFO", msg="print to stdout ❤", logger="stdout"), + Row(level="ERROR", msg="print to stderr 😀", logger="stderr"), + Row(level="ERROR", msg="exception", logger="test"), + ], + ) + + self.assertEqual(logs.where("exception is not null").select("exception").count(), 1) + + @unittest.skipIf(is_remote_only(), "Requires JVM access") + def test_multiple_udfs_with_logging(self): + @udf + def my_udf1(): + logger = logging.getLogger("test1") + logger.warning("test1") + return "x" + + @udf + def my_udf2(): + logger = logging.getLogger("test2") + logger.warning("test2") + return "y" + + with self.sql_conf({"spark.sql.pyspark.worker.logging.enabled": "true"}): + assertDataFrameEqual( + self.spark.range(1).select(my_udf1().alias("result"), my_udf2().alias("result2")), + [Row(result="x", result2="y")], + ) + + logs = self.spark.table("system.session.python_worker_logs") + + assertDataFrameEqual( + logs.select("level", "msg", "logger"), + [ + Row(level="WARNING", msg="test1", logger="test1"), + Row(level="WARNING", msg="test2", logger="test2"), + ], + ) + class UDFTests(BaseUDFTestsMixin, ReusedSQLTestCase): @classmethod diff --git a/python/pyspark/testing/connectutils.py b/python/pyspark/testing/connectutils.py index b409bd12ae379..bfcb886e1c912 100644 --- a/python/pyspark/testing/connectutils.py +++ b/python/pyspark/testing/connectutils.py @@ -176,6 +176,7 @@ def setUpClass(cls): .remote(cls.master()) .getOrCreate() ) + cls._client = cls.spark.client cls._legacy_sc = None if not is_remote_only(): cls._legacy_sc = PySparkSession._instantiatedSession._sc @@ -191,7 +192,16 @@ def tearDownClass(cls): def setUp(self) -> None: # force to clean up the ML cache before each test - self.spark.client._cleanup_ml_cache() + self._client._cleanup_ml_cache() + + def tearDown(self): + try: + if self._legacy_sc is not None and self._client._server_session_id is not None: + self._legacy_sc._jvm.PythonSQLUtils.cleanupPythonWorkerLogs( + self._client._server_session_id, self._legacy_sc._jsc.sc() + ) + finally: + super().tearDown() def test_assert_remote_mode(self): from pyspark.sql import is_remote @@ -232,10 +242,6 @@ def tearDownClass(cls): finally: super(ReusedMixedTestCase, cls).tearDownClass() - def setUp(self) -> None: - # force to clean up the ML cache before each test - self.connect.client._cleanup_ml_cache() - def compare_by_show(self, df1, df2, n: int = 20, truncate: int = 20): from pyspark.sql.classic.dataframe import DataFrame as SDF from pyspark.sql.connect.dataframe import DataFrame as CDF diff --git a/python/pyspark/testing/sqlutils.py b/python/pyspark/testing/sqlutils.py index 645bf1f2ea80f..22f75bb931b10 100644 --- a/python/pyspark/testing/sqlutils.py +++ b/python/pyspark/testing/sqlutils.py @@ 
-221,3 +221,9 @@ def tearDownClass(cls): super(ReusedSQLTestCase, cls).tearDownClass() cls.spark.stop() shutil.rmtree(cls.tempdir.name, ignore_errors=True) + + def tearDown(self): + try: + self.spark._jsparkSession.cleanupPythonWorkerLogs() + finally: + super().tearDown() diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 8ab32b4312bb7..0119e7c67a1de 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -96,6 +96,7 @@ setup_spark_files, utf8_deserializer, ) +from pyspark.logger.worker_io import capture_outputs try: import memory_profiler # noqa: F401 @@ -3261,10 +3262,11 @@ def process(): if hasattr(out_iter, "close"): out_iter.close() - if profiler: - profiler.profile(process) - else: - process() + with capture_outputs(): + if profiler: + profiler.profile(process) + else: + process() # Reset task context to None. This is a guard code to avoid residual context when worker # reuse. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dd0dbe36d69a7..283ce9782196b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3843,6 +3843,15 @@ object SQLConf { .version("4.1.0") .fallbackConf(Python.PYTHON_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS) + val PYTHON_WORKER_LOGGING_ENABLED = + buildConf("spark.sql.pyspark.worker.logging.enabled") + .doc("When set to true, this configuration enables comprehensive logging within " + + "Python worker processes that execute User-Defined Functions (UDFs), " + + "User-Defined Table Functions (UDTFs), and other Python-based operations in Spark SQL.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val PYSPARK_PLOT_MAX_ROWS = buildConf("spark.sql.pyspark.plotting.max_rows") .doc("The visual limit on plots. 
If set to 1000 for top-n-based plots (pie, bar, barh), " + @@ -7186,6 +7195,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def pythonUDFWorkerTracebackDumpIntervalSeconds: Long = getConf(PYTHON_UDF_WORKER_TRACEBACK_DUMP_INTERVAL_SECONDS) + def pythonWorkerLoggingEnabled: Boolean = getConf(PYTHON_WORKER_LOGGING_ENABLED) + def pythonUDFArrowConcurrencyLevel: Option[Int] = getConf(PYTHON_UDF_ARROW_CONCURRENCY_LEVEL) def pythonUDFArrowFallbackOnUDT: Boolean = getConf(PYTHON_UDF_ARROW_FALLBACK_ON_UDT) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala index 064e9cdc2279a..f3128ce508404 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala @@ -380,6 +380,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio mlCache.clear() + session.cleanupPythonWorkerLogs() + eventManager.postClosed() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala index 471e376e1c22d..5607c98bf29e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala @@ -22,6 +22,7 @@ import java.nio.channels.{Channels, SocketChannel} import net.razorvine.pickle.{Pickler, Unpickler} +import org.apache.spark.SparkContext import org.apache.spark.api.python.DechunkedInputStream import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys.CLASS_LOADER @@ -39,6 +40,7 @@ import org.apache.spark.sql.execution.arrow.ArrowConverters import org.apache.spark.sql.execution.python.EvaluatePython import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.storage.PythonWorkerLogBlockId import org.apache.spark.util.{MutableURLClassLoader, Utils} private[sql] object PythonSQLUtils extends Logging { @@ -190,6 +192,22 @@ private[sql] object PythonSQLUtils extends Logging { @scala.annotation.varargs def internalFn(name: String, inputs: Column*): Column = Column.internalFn(name, inputs: _*) + + def cleanupPythonWorkerLogs(sessionUUID: String, sparkContext: SparkContext): Unit = { + if (!sparkContext.isStopped) { + try { + val blockManager = sparkContext.env.blockManager.master + blockManager.getMatchingBlockIds( + id => id.isInstanceOf[PythonWorkerLogBlockId] && + id.asInstanceOf[PythonWorkerLogBlockId].sessionId == sessionUUID, + askStorageEndpoints = true) + .distinct + .foreach(blockManager.removeBlock) + } catch { + case _ if sparkContext.isStopped => // Ignore when SparkContext is stopped. 
+ } + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala index c6af76564b767..21cfdaf16eda6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala @@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql import org.apache.spark.sql.{AnalysisException, Artifact, DataSourceRegistration, Encoder, Encoders, ExperimentalMethods, Row, SparkSessionBuilder, SparkSessionCompanion, SparkSessionExtensions, SparkSessionExtensionsProvider, UDTFRegistration} +import org.apache.spark.sql.api.python.PythonSQLUtils import org.apache.spark.sql.artifact.ArtifactManager import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.{GeneralParameterizedQuery, NameParameterizedQuery, PosParameterizedQuery, UnresolvedRelation} @@ -887,6 +888,15 @@ class SparkSession private( private[sql] lazy val observationManager = new ObservationManager(this) override private[sql] def isUsable: Boolean = !sparkContext.isStopped + + /** + * Cleans up Python worker logs. + * + * It is used by PySpark tests or Spark Connect when the session is closed. + */ + private[sql] def cleanupPythonWorkerLogs(): Unit = { + PythonSQLUtils.cleanupPythonWorkerLogs(sessionUUID, sparkContext) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala index 866719122ec4c..daec9d0652d20 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala @@ -38,6 +38,12 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute] extends EvalPythonExec with PythonSQLMetrics { private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + private[this] val sessionUUID = { + Option(session).collect { + case session if session.sessionState.conf.pythonWorkerLoggingEnabled => + session.sessionUUID + } + } override protected def evaluatorFactory: EvalPythonEvaluatorFactory = { val batchSize = conf.getConf(SQLConf.PYTHON_UDF_MAX_RECORDS_PER_BATCH) @@ -49,6 +55,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute] batchSize, pythonMetrics, jobArtifactUUID, + sessionUUID, conf.pythonUDFProfiler, binaryAsBytes) } @@ -64,6 +71,7 @@ class BatchEvalPythonEvaluatorFactory( batchSize: Int, pythonMetrics: Map[String, SQLMetric], jobArtifactUUID: Option[String], + sessionUUID: Option[String], profiler: Option[String], binaryAsBytes: Boolean) extends EvalPythonEvaluatorFactory(childOutput, udfs, output) { @@ -82,7 +90,8 @@ class BatchEvalPythonEvaluatorFactory( // Output iterator for results from Python. 
val outputIterator = new PythonUDFWithNamedArgumentsRunner( - funcs, PythonEvalType.SQL_BATCHED_UDF, argMetas, pythonMetrics, jobArtifactUUID, profiler) + funcs, PythonEvalType.SQL_BATCHED_UDF, argMetas, pythonMetrics, + jobArtifactUUID, sessionUUID, profiler) .compute(inputIterator, context.partitionId(), context) val unpickle = new Unpickler diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index 8ff7e57d9421e..b8af6f2c420b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.python import java.io._ +import java.util import java.util.concurrent.atomic.AtomicBoolean import org.apache.spark._ @@ -34,10 +35,18 @@ abstract class BasePythonUDFRunner( evalType: Int, argOffsets: Array[Array[Int]], pythonMetrics: Map[String, SQLMetric], - jobArtifactUUID: Option[String]) + jobArtifactUUID: Option[String], + sessionUUID: Option[String] = None) extends BasePythonRunner[Array[Byte], Array[Byte]]( funcs.map(_._1), evalType, argOffsets, jobArtifactUUID, pythonMetrics) { + override val envVars: util.Map[String, String] = { + val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars) + sessionUUID.foreach { uuid => + envVars.put("SPARK_SESSION_UUID", uuid) + } + envVars + } override val pythonExec: String = SQLConf.get.pysparkWorkerPythonExecutable.getOrElse( funcs.head._1.funcs.head.pythonExec) @@ -126,8 +135,10 @@ class PythonUDFRunner( argOffsets: Array[Array[Int]], pythonMetrics: Map[String, SQLMetric], jobArtifactUUID: Option[String], + sessionUUID: Option[String], profiler: Option[String]) - extends BasePythonUDFRunner(funcs, evalType, argOffsets, pythonMetrics, jobArtifactUUID) { + extends BasePythonUDFRunner(funcs, evalType, argOffsets, pythonMetrics, + jobArtifactUUID, sessionUUID) { override protected def writeUDF(dataOut: DataOutputStream): Unit = { PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets, profiler) @@ -140,9 +151,11 @@ class PythonUDFWithNamedArgumentsRunner( argMetas: Array[Array[ArgumentMetadata]], pythonMetrics: Map[String, SQLMetric], jobArtifactUUID: Option[String], + sessionUUID: Option[String], profiler: Option[String]) extends BasePythonUDFRunner( - funcs, evalType, argMetas.map(_.map(_.offset)), pythonMetrics, jobArtifactUUID) { + funcs, evalType, argMetas.map(_.map(_.offset)), pythonMetrics, + jobArtifactUUID, sessionUUID) { override protected def writeUDF(dataOut: DataOutputStream): Unit = { PythonUDFRunner.writeUDFs(dataOut, funcs, argMetas, profiler) From 384a1a0848e51e32e85ee38037e64c872b3f63de Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Fri, 17 Oct 2025 15:24:51 -0700 Subject: [PATCH 4/8] Provide logging contexts. 
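
Each captured record now carries a context field populated by a context provider that
inspects the Python stack and picks the user frame just outside the pyspark modules, so a
log line can be attributed to the UDF that produced it (func_name, plus class_name when
available). A minimal end-to-end sketch, assuming an active spark session; the UDF and
logger names below are illustrative:

    import logging
    from pyspark.sql.functions import udf

    @udf
    def my_udf():
        # The worker-side handler is expected to attach context such as
        # {'func_name': 'my_udf'} to this record.
        logging.getLogger("my_logger").warning("hello")
        return "x"

    spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
    spark.range(1).select(my_udf()).collect()
    spark.table("system.session.python_worker_logs") \
        .select("level", "msg", "context", "logger") \
        .show(truncate=False)
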
--- python/pyspark/logger/logger.py | 4 +- python/pyspark/logger/worker_io.py | 79 +++++++++++++++++++++++++--- python/pyspark/sql/tests/test_udf.py | 39 +++++++++++--- python/pyspark/worker.py | 4 +- 4 files changed, 107 insertions(+), 19 deletions(-) diff --git a/python/pyspark/logger/logger.py b/python/pyspark/logger/logger.py index 2d7416bcfdcf6..b60561f24c99c 100644 --- a/python/pyspark/logger/logger.py +++ b/python/pyspark/logger/logger.py @@ -73,7 +73,7 @@ def format(self, record: logging.LogRecord) -> str: "level": record.levelname, "logger": record.name, "msg": record.getMessage(), - "context": record.__dict__.get("kwargs", {}), + "context": record.__dict__.get("context", {}), } if record.exc_info: exc_type, exc_value, exc_tb = record.exc_info @@ -295,7 +295,7 @@ def _log( msg=msg, args=args, exc_info=exc_info, - extra={"kwargs": kwargs}, + extra={"context": kwargs}, stack_info=stack_info, stacklevel=stacklevel, ) diff --git a/python/pyspark/logger/worker_io.py b/python/pyspark/logger/worker_io.py index a123c2e4e5379..31587e8e89231 100644 --- a/python/pyspark/logger/worker_io.py +++ b/python/pyspark/logger/worker_io.py @@ -16,13 +16,14 @@ # from contextlib import contextmanager +import inspect import io import logging import os import sys import time -from typing import BinaryIO, Generator, Iterable, Iterator, Optional, TextIO, Union -from types import TracebackType +from typing import BinaryIO, Callable, Generator, Iterable, Iterator, Optional, TextIO, Union +from types import FrameType, TracebackType from pyspark.logger.logger import JSONFormatter @@ -137,11 +138,16 @@ def writable(self) -> bool: class JSONFormatterWithMarker(JSONFormatter): default_microsec_format = "%s.%06d" - def __init__(self, marker: str): + def __init__(self, marker: str, context_provider: Callable[[], dict[str, str]]): super().__init__(ensure_ascii=True) self._marker = marker + self._context_provider = context_provider def format(self, record: logging.LogRecord) -> str: + context = self._context_provider() + if context: + context.update(record.__dict__.get("context", {})) + record.__dict__["context"] = context return f"{self._marker}:{os.getpid()}:{super().format(record)}" def formatTime(self, record: logging.LogRecord, datefmt: Optional[str] = None) -> str: @@ -168,12 +174,13 @@ def __init__( logger_name: str, log_level: int, marker: str, + context_provider: Callable[[], dict[str, str]], ): super().__init__(delegate) self._json_out = json_out self._logger_name = logger_name self._log_level = log_level - self._formatter = JSONFormatterWithMarker(marker) + self._formatter = JSONFormatterWithMarker(marker, context_provider) def write(self, s: str) -> int: if s.strip(): @@ -201,19 +208,75 @@ def close(self) -> None: pass +def context_provider() -> dict[str, str]: + """ + Provides context information for logging, including caller function name. + Finds the function name from the bottom of the stack, ignoring Python builtin + libraries and PySpark modules. Test packages are included. + + Returns: + dict[str, str]: A dictionary containing context information including: + - func_name: Name of the function that initiated the logging + - class_name: Name of the class that initiated the logging if available + """ + + def is_pyspark_module(module_name: str) -> bool: + return module_name.startswith("pyspark.") and ".tests." 
not in module_name + + bottom: Optional[FrameType] = None + + # Get caller function information using inspect + try: + frame = inspect.currentframe() + is_in_pyspark_module = False + + if frame: + while frame.f_back: + f_back = frame.f_back + module_name = f_back.f_globals.get("__name__", "") + + if is_pyspark_module(module_name): + if not is_in_pyspark_module: + bottom = frame + is_in_pyspark_module = True + else: + is_in_pyspark_module = False + + frame = f_back + except Exception: + # If anything goes wrong with introspection, don't fail the logging + # Just continue without caller information + pass + + context = {} + if bottom: + context["func_name"] = bottom.f_code.co_name + if "self" in bottom.f_locals: + context["class_name"] = bottom.f_locals["self"].__class__.__name__ + elif "cls" in bottom.f_locals: + context["class_name"] = bottom.f_locals["cls"].__name__ + return context + + @contextmanager -def capture_outputs() -> Generator[None, None, None]: +def capture_outputs( + context_provider: Callable[[], dict[str, str]] = context_provider +) -> Generator[None, None, None]: if "SPARK_SESSION_UUID" in os.environ: marker: str = "PYTHON_WORKER_LOGGING" json_out = original_stdout = sys.stdout delegate = original_stderr = sys.stderr handler = logging.StreamHandler(json_out) - handler.setFormatter(JSONFormatterWithMarker(marker)) + handler.setFormatter(JSONFormatterWithMarker(marker, context_provider)) logger = logging.getLogger() try: - sys.stdout = JsonOutput(delegate, json_out, "stdout", logging.INFO, marker) - sys.stderr = JsonOutput(delegate, json_out, "stderr", logging.ERROR, marker) + sys.stdout = JsonOutput( + delegate, json_out, "stdout", logging.INFO, marker, context_provider + ) + sys.stderr = JsonOutput( + delegate, json_out, "stderr", logging.ERROR, marker, context_provider + ) logger.addHandler(handler) try: yield diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 939a92b8acd89..f17e4088b1535 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -1581,11 +1581,26 @@ def my_udf(): logs = self.spark.table("system.session.python_worker_logs") assertDataFrameEqual( - logs.select("level", "msg", "logger"), + logs.select("level", "msg", "context", "logger"), [ - Row(level="INFO", msg="print to stdout ❤", logger="stdout"), - Row(level="ERROR", msg="print to stderr 😀", logger="stderr"), - Row(level="ERROR", msg="exception", logger="test"), + Row( + level="INFO", + msg="print to stdout ❤", + context={"func_name": my_udf.__name__}, + logger="stdout", + ), + Row( + level="ERROR", + msg="print to stderr 😀", + context={"func_name": my_udf.__name__}, + logger="stderr", + ), + Row( + level="ERROR", + msg="exception", + context={"func_name": my_udf.__name__}, + logger="test", + ), ], ) @@ -1614,10 +1629,20 @@ def my_udf2(): logs = self.spark.table("system.session.python_worker_logs") assertDataFrameEqual( - logs.select("level", "msg", "logger"), + logs.select("level", "msg", "context", "logger"), [ - Row(level="WARNING", msg="test1", logger="test1"), - Row(level="WARNING", msg="test2", logger="test2"), + Row( + level="WARNING", + msg="test1", + context={"func_name": my_udf1.__name__}, + logger="test1", + ), + Row( + level="WARNING", + msg="test2", + context={"func_name": my_udf2.__name__}, + logger="test2", + ), ], ) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 0119e7c67a1de..b232b30c5420e 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -2117,7 +2117,7 @@ 
def evaluate(*args: pd.Series, num_rows=1): if len(args) == 0: for _ in range(num_rows): yield ( - verify_result(pd.DataFrame(check_return_value(func()))), + verify_result(pd.DataFrame(list(check_return_value(func())))), arrow_return_type, return_type, ) @@ -2127,7 +2127,7 @@ def evaluate(*args: pd.Series, num_rows=1): row_tuples = zip(*args) for row in row_tuples: yield ( - verify_result(pd.DataFrame(check_return_value(func(*row)))), + verify_result(pd.DataFrame(list(check_return_value(func(*row))))), arrow_return_type, return_type, ) From 0eed72644332e49de682b31cbcbfe13ff4931756 Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Tue, 21 Oct 2025 16:54:44 -0700 Subject: [PATCH 5/8] Fix. --- python/pyspark/sql/tests/arrow/test_arrow_python_udf.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py index 0e70f525fe0f3..f16dae381d7f4 100644 --- a/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/arrow/test_arrow_python_udf.py @@ -60,11 +60,15 @@ def test_register_java_function(self): def test_register_java_udaf(self): super(ArrowPythonUDFTests, self).test_register_java_udaf() - @unittest.skip("TODO: Python worker logging is not supported for Arrow Python UDFs.") + @unittest.skip( + "TODO(SPARK-53976): Python worker logging is not supported for Arrow Python UDFs." + ) def test_udf_with_logging(self): super().test_udf_with_logging() - @unittest.skip("TODO: Python worker logging is not supported for Arrow Python UDFs.") + @unittest.skip( + "TODO(SPARK-53976): Python worker logging is not supported for Arrow Python UDFs." + ) def test_multiple_udfs_with_logging(self): super().test_multiple_udfs_with_logging() From 399f591a2001afb644b114b269ff95140fb34f13 Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Wed, 22 Oct 2025 15:32:12 -0700 Subject: [PATCH 6/8] Fix. --- .../org/apache/spark/storage/BlockId.scala | 9 +++++++++ python/pyspark/logger/worker_io.py | 17 ++++++++++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index b9dd0930efc23..3e46a53ee082c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -214,6 +214,15 @@ case class TestLogBlockId(lastLogTime: Long, executorId: String) override def logBlockType: LogBlockType = LogBlockType.TEST } +/** + * Identifies a block of Python worker log data. + * + * @param lastLogTime the timestamp of the last log entry in this block, used for filtering + * and log management. + * @param executorId the ID of the executor that produced this log block. + * @param sessionId the session ID to isolate the logs. + * @param workerId the worker ID to distinguish the Python worker process. 
+ */ @DeveloperApi case class PythonWorkerLogBlockId( lastLogTime: Long, diff --git a/python/pyspark/logger/worker_io.py b/python/pyspark/logger/worker_io.py index 31587e8e89231..f8198f519e215 100644 --- a/python/pyspark/logger/worker_io.py +++ b/python/pyspark/logger/worker_io.py @@ -138,9 +138,10 @@ def writable(self) -> bool: class JSONFormatterWithMarker(JSONFormatter): default_microsec_format = "%s.%06d" - def __init__(self, marker: str, context_provider: Callable[[], dict[str, str]]): + def __init__(self, marker: str, worker_id: str, context_provider: Callable[[], dict[str, str]]): super().__init__(ensure_ascii=True) self._marker = marker + self._worker_id = worker_id self._context_provider = context_provider def format(self, record: logging.LogRecord) -> str: @@ -148,7 +149,7 @@ def format(self, record: logging.LogRecord) -> str: if context: context.update(record.__dict__.get("context", {})) record.__dict__["context"] = context - return f"{self._marker}:{os.getpid()}:{super().format(record)}" + return f"{self._marker}:{self._worker_id}:{super().format(record)}" def formatTime(self, record: logging.LogRecord, datefmt: Optional[str] = None) -> str: ct = self.converter(record.created) @@ -174,13 +175,14 @@ def __init__( logger_name: str, log_level: int, marker: str, + worker_id: str, context_provider: Callable[[], dict[str, str]], ): super().__init__(delegate) self._json_out = json_out self._logger_name = logger_name self._log_level = log_level - self._formatter = JSONFormatterWithMarker(marker, context_provider) + self._formatter = JSONFormatterWithMarker(marker, worker_id, context_provider) def write(self, s: str) -> int: if s.strip(): @@ -264,25 +266,26 @@ def capture_outputs( ) -> Generator[None, None, None]: if "SPARK_SESSION_UUID" in os.environ: marker: str = "PYTHON_WORKER_LOGGING" + worker_id: str = str(os.getpid()) json_out = original_stdout = sys.stdout delegate = original_stderr = sys.stderr handler = logging.StreamHandler(json_out) - handler.setFormatter(JSONFormatterWithMarker(marker, context_provider)) + handler.setFormatter(JSONFormatterWithMarker(marker, worker_id, context_provider)) logger = logging.getLogger() try: sys.stdout = JsonOutput( - delegate, json_out, "stdout", logging.INFO, marker, context_provider + delegate, json_out, "stdout", logging.INFO, marker, worker_id, context_provider ) sys.stderr = JsonOutput( - delegate, json_out, "stderr", logging.ERROR, marker, context_provider + delegate, json_out, "stderr", logging.ERROR, marker, worker_id, context_provider ) logger.addHandler(handler) try: yield finally: # Send an empty line to indicate the end of the outputs. - json_out.write(f"{marker}:{os.getpid()}:\n") + json_out.write(f"{marker}:{worker_id}:\n") json_out.flush() finally: sys.stdout = original_stdout From d070c4bd54db2eb316d06de293e70c47da3fb33e Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Thu, 23 Oct 2025 13:27:22 -0700 Subject: [PATCH 7/8] Fix. 
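
Rename the gating environment variable from SPARK_SESSION_UUID to
PYSPARK_SPARK_SESSION_UUID on both sides of the protocol: the JVM
(PythonWorkerFactory, PythonUDFRunner) and the Python worker
(capture_outputs in pyspark/logger/worker_io.py), so the producer and the
consumer of the variable stay in sync.

For illustration only, and not part of this change: a minimal sketch of how
the renamed gate behaves inside a worker process. The UUID literal and the
printed message are made up, and in a real worker the JVM sets the variable
when it launches the process rather than the worker setting it itself.

    import os

    from pyspark.logger.worker_io import capture_outputs

    # capture_outputs() only installs the capture when the session UUID is
    # present in the environment.
    os.environ["PYSPARK_SPARK_SESSION_UUID"] = "00000000-0000-0000-0000-000000000000"

    with capture_outputs():
        # While captured, stdout/stderr writes and root-logger records are
        # rewritten to the original stdout as lines of the form
        #     PYTHON_WORKER_LOGGING:<worker_id>:<JSON log record>
        # where <worker_id> is the worker's pid (see PATCH 6/8). On the JVM
        # side, PythonWorkerLogCapture presumably strips the marker and stores
        # the JSON payload as Python worker log blocks.
        print("hello from a Python worker")

    # On exit, an empty record "PYTHON_WORKER_LOGGING:<worker_id>:" is written
    # to mark the end of the captured output, and stdout/stderr are restored.
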
--- .../scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 2 +- python/pyspark/logger/worker_io.py | 2 +- .../org/apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 6e56281b425ab..f3e364a4be5e3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -396,7 +396,7 @@ private[spark] class PythonWorkerFactory( } private val workerLogCapture = - envVars.get("SPARK_SESSION_UUID").map(new PythonWorkerLogCapture(_)) + envVars.get("PYSPARK_SPARK_SESSION_UUID").map(new PythonWorkerLogCapture(_)) /** * Redirect the given streams to our stderr in separate threads. diff --git a/python/pyspark/logger/worker_io.py b/python/pyspark/logger/worker_io.py index f8198f519e215..2e5ced2e84ad3 100644 --- a/python/pyspark/logger/worker_io.py +++ b/python/pyspark/logger/worker_io.py @@ -264,7 +264,7 @@ def is_pyspark_module(module_name: str) -> bool: def capture_outputs( context_provider: Callable[[], dict[str, str]] = context_provider ) -> Generator[None, None, None]: - if "SPARK_SESSION_UUID" in os.environ: + if "PYSPARK_SPARK_SESSION_UUID" in os.environ: marker: str = "PYTHON_WORKER_LOGGING" worker_id: str = str(os.getpid()) json_out = original_stdout = sys.stdout diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index b8af6f2c420b3..ccc4bec257925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -43,7 +43,7 @@ abstract class BasePythonUDFRunner( override val envVars: util.Map[String, String] = { val envVars = new util.HashMap(funcs.head._1.funcs.head.envVars) sessionUUID.foreach { uuid => - envVars.put("SPARK_SESSION_UUID", uuid) + envVars.put("PYSPARK_SPARK_SESSION_UUID", uuid) } envVars } From f0fb846ff6941986878557b9d19bb1e8d605a461 Mon Sep 17 00:00:00 2001 From: Takuya Ueshin Date: Fri, 24 Oct 2025 16:33:15 -0700 Subject: [PATCH 8/8] Fix. --- python/pyspark/sql/tests/test_udf.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 48d7573299c62..d83bfc54a2958 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -1562,6 +1562,7 @@ def my_udf(): logger = logging.getLogger("test") print("print to stdout ❤", file=sys.stdout) print("print to stderr 😀", file=sys.stderr) + logger.warning("custom context", extra={"context": dict(abc=123)}) try: 1 / 0 except Exception: @@ -1596,6 +1597,12 @@ def my_udf(): context={"func_name": my_udf.__name__}, logger="stderr", ), + Row( + level="WARNING", + msg="custom context", + context={"func_name": my_udf.__name__, "abc": "123"}, + logger="test", + ), Row( level="ERROR", msg="exception",