Skip to content

Commit 53eb3e5

Browse files
committed
#788 Implement SparkCobol processor that can run from RDDs.
1 parent 9873a3d commit 53eb3e5

File tree

5 files changed

+370
-80
lines changed

5 files changed

+370
-80
lines changed

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616

1717
package za.co.absa.cobrix.cobol.processor
1818

19-
import za.co.absa.cobrix.cobol.processor.impl.{ArrayOfAnyHandler, StreamProcessor}
20-
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
21-
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
19+
import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl
2220
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
2321
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
2422
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
@@ -54,25 +52,7 @@ object CobolProcessor {
5452
val readerParameters = getReaderParameters
5553
val cobolSchema = getCobolSchema(readerParameters)
5654

57-
new CobolProcessor {
58-
override def process(inputStream: SimpleStream,
59-
outputStream: OutputStream)
60-
(rawRecordProcessor: RawRecordProcessor): Long = {
61-
val recordExtractor = getRecordExtractor(readerParameters, inputStream)
62-
63-
val dataStream = inputStream.copyStream()
64-
try {
65-
StreamProcessor.processStream(cobolSchema.copybook,
66-
caseInsensitiveOptions.toMap,
67-
dataStream,
68-
recordExtractor,
69-
rawRecordProcessor,
70-
outputStream)
71-
} finally {
72-
dataStream.close()
73-
}
74-
}
75-
}
55+
new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContents, caseInsensitiveOptions.toMap)
7656
}
7757

7858
/**
@@ -109,21 +89,6 @@ object CobolProcessor {
10989
CobolParametersParser.getReaderProperties(cobolParameters, None)
11090
}
11191

112-
private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
113-
val dataStream = inputStream.copyStream()
114-
val headerStream = inputStream.copyStream()
115-
116-
val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)
117-
118-
reader.recordExtractor(0, dataStream, headerStream) match {
119-
case Some(extractor) => extractor
120-
case None =>
121-
throw new IllegalArgumentException(s"Cannot create a record extractor for the given reader parameters. " +
122-
"Please check the copybook and the reader parameters."
123-
)
124-
}
125-
}
126-
12792
private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap
12893
}
12994

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor.impl
18+
19+
import za.co.absa.cobrix.cobol.parser.Copybook
20+
import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor}
21+
import za.co.absa.cobrix.cobol.reader.VarLenNestedReader
22+
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
23+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
24+
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
25+
26+
import java.io.OutputStream
27+
28+
/**
  * Implementation of the [[CobolProcessor]] trait, responsible for processing COBOL data streams
  * by extracting records and applying a user-defined raw record processor.
  *
  * The processing can be done from inside an RDD, which is why this class is serializable.
  *
  * Please, do not use this class directly. Use `CobolProcessor.builder()` instead.
  *
  * @param readerParameters Configuration for record extraction and COBOL file parsing.
  * @param copybook         The copybook definition used for interpreting COBOL data structures.
  * @param copybookContents The raw textual representation of the copybook.
  * @param options          A map of processing options to customize the behavior of the processor (same as for `spark-cobol`).
  */
class CobolProcessorImpl(readerParameters: ReaderParameters,
                         copybook: Copybook,
                         copybookContents: String,
                         options: Map[String, String]) extends CobolProcessor with Serializable {
  /**
    * Processes records from the input stream and writes results to the output stream.
    *
    * A fresh copy of the input stream is used for the data pass and is always closed,
    * even if processing fails.
    *
    * @param inputStream        the stream containing raw COBOL data.
    * @param outputStream       the stream to which processed output is written.
    * @param rawRecordProcessor the user-defined processor applied to each raw record.
    * @return the number of records processed (as returned by the underlying stream processor).
    */
  override def process(inputStream: SimpleStream,
                       outputStream: OutputStream)
                      (rawRecordProcessor: RawRecordProcessor): Long = {
    val recordExtractor = getRecordExtractor(readerParameters, inputStream)

    val dataStream = inputStream.copyStream()
    try {
      StreamProcessor.processStream(copybook,
        options,
        dataStream,
        recordExtractor,
        rawRecordProcessor,
        outputStream)
    } finally {
      dataStream.close()
    }
  }

  /**
    * Creates a raw record extractor suitable for the given reader parameters.
    *
    * Two independent copies of the input stream are created: one for the data and one
    * for reading file headers. If no extractor can be constructed, both copies are
    * closed before an exception is thrown (previously they leaked on this path).
    *
    * @param readerParameters reader configuration that determines the extractor type.
    * @param inputStream      the stream to copy for data and header access.
    * @return a record extractor for the input data.
    * @throws IllegalArgumentException if no extractor supports the given parameters.
    */
  private[processor] def getRecordExtractor(readerParameters: ReaderParameters, inputStream: SimpleStream): RawRecordExtractor = {
    val dataStream = inputStream.copyStream()
    val headerStream = inputStream.copyStream()

    val reader = new VarLenNestedReader[Array[Any]](Seq(copybookContents), readerParameters, new ArrayOfAnyHandler)

    reader.recordExtractor(0, dataStream, headerStream) match {
      case Some(extractor) => extractor
      case None =>
        // Release the stream copies created above — they are only owned by the extractor
        // when one is successfully created.
        dataStream.close()
        headerStream.close()
        // No interpolation is needed here, so the plain string literal is used (the original
        // used an 's' interpolator with nothing to interpolate).
        throw new IllegalArgumentException("Cannot create a record extractor for the given reader parameters. " +
          "Please check the copybook and the reader parameters."
        )
    }
  }
}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
3030
""" 01 RECORD.
3131
| 05 T PIC X.
3232
|""".stripMargin
33+
3334
"process" should {
3435
"process an input data stream into an output stream" in {
3536
val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
@@ -74,47 +75,4 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
7475
assert(builder.getOptions.contains("record_format"))
7576
}
7677
}
77-
78-
"getRecordExtractor" should {
79-
"work for an fixed-record-length files" in {
80-
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
81-
val builder = CobolProcessor.builder(copybook)
82-
83-
val ext = builder.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream)
84-
85-
assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])
86-
87-
assert(ext.hasNext)
88-
assert(ext.next().sameElements(Array(0xF1, 0xF2).map(_.toByte)))
89-
assert(ext.next().sameElements(Array(0xF3, 0xF4).map(_.toByte)))
90-
assert(!ext.hasNext)
91-
}
92-
93-
"work for an variable-record-length files" in {
94-
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
95-
val builder = CobolProcessor.builder(copybook)
96-
97-
val ext = builder.getRecordExtractor(ReaderParameters(
98-
recordFormat = RecordFormat.VariableLength,
99-
isText = true
100-
), stream)
101-
102-
assert(ext.isInstanceOf[TextFullRecordExtractor])
103-
}
104-
105-
"throw an exception on a non-supported record format for processing" in {
106-
val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
107-
val builder = CobolProcessor.builder(copybook)
108-
109-
val ex = intercept[IllegalArgumentException] {
110-
builder.getRecordExtractor(ReaderParameters(
111-
recordFormat = RecordFormat.VariableLength,
112-
isRecordSequence = true
113-
), stream)
114-
}
115-
116-
assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters."))
117-
}
118-
}
119-
12078
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2018 ABSA Group Limited
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package za.co.absa.cobrix.cobol.processor.impl
18+
19+
import org.scalatest.wordspec.AnyWordSpec
20+
import za.co.absa.cobrix.cobol.mock.ByteStreamMock
21+
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
22+
import za.co.absa.cobrix.cobol.processor.CobolProcessor
23+
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor}
24+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
25+
26+
/**
  * Unit tests for the record-extractor selection logic of `CobolProcessorImpl`.
  *
  * Each test builds a processor via `CobolProcessor.builder()` and checks which
  * `RawRecordExtractor` implementation is chosen for a given set of reader parameters.
  */
class CobolProcessorImplSuite extends AnyWordSpec {
  // Minimal single-field copybook: one record with a single 1-byte character field.
  private val copybook =
    """      01 RECORD.
      |        05 T PIC X.
      |""".stripMargin

  "getRecordExtractor" should {
    // Note: test descriptions fixed for grammar ("an fixed-record-length files" -> "a fixed-record-length file").
    "work for a fixed-record-length file" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]

      val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream)

      assert(ext.isInstanceOf[FixedRecordLengthRawRecordExtractor])

      // With a record length of 2, the 4 input bytes must split into exactly two records.
      assert(ext.hasNext)
      assert(ext.next().sameElements(Array(0xF1, 0xF2).map(_.toByte)))
      assert(ext.next().sameElements(Array(0xF3, 0xF4).map(_.toByte)))
      assert(!ext.hasNext)
    }

    "work for a variable-record-length file" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]

      val ext = processor.getRecordExtractor(ReaderParameters(
        recordFormat = RecordFormat.VariableLength,
        isText = true
      ), stream)

      assert(ext.isInstanceOf[TextFullRecordExtractor])
    }

    "throw an exception on a non-supported record format for processing" in {
      val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]

      // Variable-length + record-sequence has no extractor in this processing mode.
      val ex = intercept[IllegalArgumentException] {
        processor.getRecordExtractor(ReaderParameters(
          recordFormat = RecordFormat.VariableLength,
          isRecordSequence = true
        ), stream)
      }

      assert(ex.getMessage.contains("Cannot create a record extractor for the given reader parameters."))
    }
  }
}

0 commit comments

Comments
 (0)