Skip to content

Commit 72b116b

Browse files
committed
#788 Add test suites for SparkCobolProcessor.
1 parent 53eb3e5 commit 72b116b

File tree

4 files changed

+141
-6
lines changed

4 files changed

+141
-6
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor
18+
19+
/**
  * A serializable version of [[RawRecordProcessor]] for distributed processing in Spark.
  *
  * Usage patterns:
  *  - For standalone JVM applications: use `CobolProcessor` with `RawRecordProcessor`.
  *  - For Spark applications: use `SparkCobolProcessor` with `SerializableRawRecordProcessor`.
  *
  * This trait extends `Serializable` since Spark serializes processing code and ships it
  * across the network to worker nodes, requiring all captured components (including the
  * record processor) to be serializable.
  */
trait SerializableRawRecordProcessor extends RawRecordProcessor with Serializable
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
* limitations under the License.
1515
*/
1616

17-
package za.co.absa.cobrix.spark.cobol.builder
17+
package za.co.absa.cobrix.spark.cobol
1818

1919
import org.apache.hadoop.fs.Path
2020
import org.apache.spark.rdd.RDD
2121
import org.apache.spark.sql.SparkSession
2222
import org.slf4j.LoggerFactory
23-
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
23+
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, SerializableRawRecordProcessor}
2424
import za.co.absa.cobrix.spark.cobol.source.SerializableConfiguration
2525
import za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer
2626

@@ -47,7 +47,7 @@ object SparkCobolProcessor {
4747
class SparkCobolProcessorBuilder(implicit spark: SparkSession) {
4848
private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
4949
private var copybookContentsOpt: Option[String] = None
50-
private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None
50+
private var rawRecordProcessorOpt: Option[SerializableRawRecordProcessor] = None
5151
private var numberOfThreads: Int = 1
5252

5353
def build(): SparkCobolProcessor = {
@@ -82,7 +82,7 @@ object SparkCobolProcessor {
8282
this
8383
}
8484

85-
def withRecordProcessor(processor: RawRecordProcessor): SparkCobolProcessorBuilder = {
85+
def withRecordProcessor(processor: SerializableRawRecordProcessor): SparkCobolProcessorBuilder = {
8686
rawRecordProcessorOpt = Option(processor)
8787
this
8888
}
@@ -130,7 +130,7 @@ object SparkCobolProcessor {
130130
outputPath: String,
131131
copybookContents: String,
132132
cobolProcessor: CobolProcessor,
133-
rawRecordProcessor: RawRecordProcessor,
133+
rawRecordProcessor: SerializableRawRecordProcessor,
134134
sconf: SerializableConfiguration,
135135
numberOfThreads: Int
136136
)(implicit spark: SparkSession): RDD[Long] = {
@@ -145,7 +145,7 @@ object SparkCobolProcessor {
145145
outputPath: String,
146146
copybookContents: String,
147147
cobolProcessor: CobolProcessor,
148-
rawRecordProcessor: RawRecordProcessor,
148+
rawRecordProcessor: SerializableRawRecordProcessor,
149149
sconf: SerializableConfiguration,
150150
numberOfThreads: Int
151151
): Long = {
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.spark.cobol
18+
19+
import org.apache.hadoop.fs.Path
20+
import org.scalatest.wordspec.AnyWordSpec
21+
import za.co.absa.cobrix.cobol.parser.Copybook
22+
import za.co.absa.cobrix.cobol.processor.SerializableRawRecordProcessor
23+
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
24+
import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
25+
26+
class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
  // Minimal copybook: each record consists of a single 1-byte field.
  private val copybook =
    """ 01 RECORD.
      | 05 T PIC X.
      |""".stripMargin

  // A processor that decrements every byte of each record by one,
  // making the expected output trivially derivable from the input.
  private val rawRecordProcessor = new SerializableRawRecordProcessor {
    override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] =
      record.map(b => (b - 1).toByte)
  }

  "SparkCobolProcessor" should {
    "fail to create when a copybook is not specified" in {
      val ex = intercept[IllegalArgumentException] {
        SparkCobolProcessor.builder.build()
      }

      assert(ex.getMessage.contains("Copybook contents must be provided."))
    }

    "fail to create when a record processor is not provided" in {
      val builder = SparkCobolProcessor.builder
        .withCopybookContents(copybook)

      val ex = intercept[IllegalArgumentException](builder.build())

      assert(ex.getMessage.contains("A RawRecordProcessor must be provided."))
    }

    "fail to create when the number of threads is less than 0" in {
      val ex = intercept[IllegalArgumentException] {
        SparkCobolProcessor.builder
          .withCopybookContents(copybook)
          .withRecordProcessor(rawRecordProcessor)
          .withMultithreaded(0)
          .build()
      }

      assert(ex.getMessage.contains("Number of threads must be at least 1."))
    }

    "create a processor that processes files via an RDD" in {
      withTempDirectory("spark_cobol_processor") { tempDir =>
        val rawBytes = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)

        val inputPath = new Path(tempDir, "input.dat").toString
        val outputPath = new Path(tempDir, "output").toString
        // The processor writes each input file under the output dir with the same name.
        val outputFile = new Path(outputPath, "input.dat").toString

        writeBinaryFile(inputPath, rawBytes)

        val processor = SparkCobolProcessor.builder
          .withCopybookContents(copybook)
          .withRecordProcessor(rawRecordProcessor)
          .build()

        processor.process(Seq(inputPath), outputPath)

        // Every output byte should be the corresponding input byte minus one.
        val expected = Array(0xF0, 0xF1, 0xF2, 0xF3).map(_.toByte)
        val actual = readBinaryFile(outputFile)

        assert(actual.length == rawBytes.length)
        assert(actual.sameElements(expected))
      }
    }
  }
}

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/fixtures/BinaryFileFixture.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ trait BinaryFileFixture {
113113
tempFile
114114
}
115115

116+
/**
  * Writes the given bytes to a file at the specified path, overwriting any existing file.
  *
  * @param filePath path of the file to write
  * @param content  the bytes to write
  */
def writeBinaryFile(filePath: String, content: Array[Byte]): Unit = {
  val ostream = new DataOutputStream(new FileOutputStream(filePath))
  try {
    ostream.write(content)
  } finally {
    // Close in 'finally' so the file handle is not leaked if the write fails.
    ostream.close()
  }
}
121+
122+
/**
  * Reads the entire contents of a file as a byte array.
  *
  * @param filePath path of the file to read
  * @return the file's contents as bytes
  */
def readBinaryFile(filePath: String): Array[Byte] = {
  FileUtils.readFileToByteArray(new File(filePath))
}
125+
116126
private def hex2bytes(hex: String): Array[Byte] = {
117127
val compactStr = hex.replaceAll("\\s", "")
118128
compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)

0 commit comments

Comments (0)