@@ -67,9 +67,9 @@ class AsyncProgressTrackingMicroBatchExecutionSuite

 class MemoryStreamCapture[A: Encoder](
     id: Int,
-    sqlContext: SQLContext,
+    sparkSession: SparkSession,
     numPartitions: Option[Int] = None)
-  extends MemoryStream[A](id, sqlContext, numPartitions = numPartitions) {
+  extends MemoryStream[A](id, sparkSession: SparkSession, numPartitions = numPartitions) {

   val commits = new ListBuffer[streaming.Offset]()
   val commitThreads = new ListBuffer[Thread]()
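Note on the `+` line in the `extends` clause above: `sparkSession: SparkSession` inside the superclass call is a type ascription on the argument, not a second parameter declaration; it compiles identically to passing `sparkSession` alone. A minimal standalone sketch of the same shape (all names here are illustrative, not from this PR):

```scala
// Type ascription in a superclass constructor call: `n: Int` below ascribes
// the argument's type, exactly as `extends Base(n)` would pass it unascribed.
class Base(val n: Int)
class Derived(n: Int) extends Base(n: Int)

object AscriptionSketch extends App {
  println(new Derived(42).n) // prints 42
}
```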
@@ -136,7 +136,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("async WAL commits recovery") {
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     var index = 0
@@ -204,7 +204,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("async WAL commits turn on and off") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -308,7 +308,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("Fail with once trigger") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     val e = intercept[IllegalArgumentException] {
@@ -323,7 +323,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite

   test("Fail with available now trigger") {

-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     val e = intercept[IllegalArgumentException] {
@@ -339,7 +339,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("switching between async wal commit enabled and trigger once") {
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     var index = 0
@@ -500,7 +500,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("switching between async wal commit enabled and available now") {
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     var index = 0
@@ -669,7 +669,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   def testAsyncWriteErrorsAlreadyExists(path: String): Unit = {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

@@ -720,7 +720,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   def testAsyncWriteErrorsPermissionsIssue(path: String): Unit = {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDS()
     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val commitDir = new File(checkpointLocation + path)
@@ -778,7 +778,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath

-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)

     val ds = inputData.toDF()

@@ -852,7 +852,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("interval commits and recovery") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -934,7 +934,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("recovery when first offset is not zero and not commit log entries") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -961,7 +961,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
@@ -995,7 +995,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("recovery non-contiguous log") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -1088,7 +1088,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("Fail on pipelines using unsupported sinks") {
-    val inputData = new MemoryStream[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStream[Int](id = 0, spark)
     val ds = inputData.toDF()

     val e = intercept[IllegalArgumentException] {
@@ -1109,7 +1109,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite

     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", SQLConf.ASYNC_LOG_PURGE.key -> "false") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+        val inputData = new MemoryStreamCapture[Int](id = 0, spark)
         val ds = inputData.toDS()

         val clock = new StreamManualClock
@@ -1243,7 +1243,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   test("with async log purging") {
     withSQLConf(SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2", SQLConf.ASYNC_LOG_PURGE.key -> "true") {
       withTempDir { checkpointLocation =>
-        val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+        val inputData = new MemoryStreamCapture[Int](id = 0, spark)
         val ds = inputData.toDS()

         val clock = new StreamManualClock
@@ -1381,7 +1381,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("test multiple gaps in offset and commit logs") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -1427,7 +1427,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
@@ -1460,7 +1460,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
   }

   test("recovery when gaps exist in offset and commit log") {
-    val inputData = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData = new MemoryStreamCapture[Int](id = 0, spark)
     val ds = inputData.toDS()

     val checkpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
@@ -1494,7 +1494,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
     /**
      * start new stream
      */
-    val inputData2 = new MemoryStreamCapture[Int](id = 0, sqlContext = sqlContext)
+    val inputData2 = new MemoryStreamCapture[Int](id = 0, spark)
     val ds2 = inputData2.toDS()
     testStream(ds2, extraOptions = Map(
       ASYNC_PROGRESS_TRACKING_ENABLED -> "true",
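Overall the call-site migration is mechanical: every `new MemoryStream[Int](id = 0, sqlContext = sqlContext)` becomes `new MemoryStream[Int](id = 0, spark)`, and likewise for `MemoryStreamCapture`. A minimal sketch of the new pattern, assuming `spark` is the shared `SparkSession` the suite inherits from Spark's test harness (replaced below by a local session so the snippet is self-contained) and that `MemoryStream`'s constructor accepts a `SparkSession` after this change, as the diff shows:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object MemoryStreamMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the suite's shared `spark` session.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("memory-stream-migration-sketch")
      .getOrCreate()
    import spark.implicits._ // supplies the implicit Encoder[Int]

    // New style: pass the session directly instead of sqlContext = sqlContext.
    val inputData = new MemoryStream[Int](id = 0, spark)
    val ds = inputData.toDS()

    inputData.addData(1, 2, 3)
    // MemoryStream is a testing source; in the suite the query itself is
    // driven by testStream(...), not started here.
    println(ds.isStreaming) // true

    spark.stop()
  }
}
```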