Commit 12b3845

#788 Unify processor builders between the parser and spark module.

1 parent: 35ab1b5

File tree: 7 files changed, +258 -24 lines

README.md (49 additions, 4 deletions)

@@ -1668,15 +1668,17 @@ The EBCDIC processor allows processing files by replacing value of fields withou
 
 The processing does not require Spark. A processing application can have only the COBOL parser as a dependency (`cobol-parser`).
 
-Here is an example usage:
+Here is an example usage (using streams of bytes):
 
 ```scala
 val is = new FSStream(inputFile)
 val os = new FileOutputStream(outputFile)
-val copybookContents = "...some copybook..."
 val builder = CobolProcessor.builder(copybookContents)
 
+val builder = CobolProcessor.builder
+  .withCopybookContents("...some copybook...")
+
 val processor = new RawRecordProcessor {
-  override def processRecord(copybook: Copybook, options: Map[String, String], record: Array[Byte], offset: Long): Array[Byte] = {
+  override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
     // The transformation logic goes here
     val value = copybook.getFieldValueByName("some_field", record, 0)
     // Change the field value
@@ -1688,10 +1690,53 @@ val processor = new RawRecordProcessor {
   }
 }
 
-builder.build().process(is, os)(processor)
+val count = builder.build().process(is, os)(processor)
+```
+
+Here is an example usage (using paths):
+```scala
+val count = CobolProcessor.builder
+  .withCopybookContents(copybook)
+  .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
+    // The transformation logic goes here
+    val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
+    // Change the field value
+    // val newValue = ...
+    // Write the changed value back
+    ctx.copybook.setFieldValueByName("some_field", record, newValue, 0)
+    // Return the changed record
+    record
+  }
+  .load(inputFile)
+  .save(outputFile)
 ```
 
 
+## EBCDIC Spark Processor (experimental)
+This allows in-place processing of data while retaining the original format, running in parallel using RDDs under the hood.
+
+Here is an example usage:
+```scala
+import za.co.absa.cobrix.spark.cobol.SparkCobolProcessor
+
+val copybookContents = "...some copybook..."
+
+SparkCobolProcessor.builder
+  .withCopybookContents(copybookContents)
+  .withRecordProcessor { (record: Array[Byte], ctx: CobolProcessorContext) =>
+    // The transformation logic goes here
+    val value = ctx.copybook.getFieldValueByName("some_field", record, 0)
+    // Change the field value
+    // val newValue = ...
+    // Write the changed value back
+    ctx.copybook.setFieldValueByName("some_field", record, newValue, 0)
+    // Return the changed record
+    record
+  }
+  .load(inputPath)
+  .save(outputPath)
+```
+
 ## EBCDIC Writer (experimental)
 
 Cobrix's EBCDIC writer is an experimental feature that allows writing Spark DataFrames as EBCDIC mainframe files.
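
For context on the README examples above: the unified builder also accepts reader options before `build()`. A minimal sketch, assuming only API calls visible in this commit; the `option("record_format", "D")` call and its mapping to the ASCII text record format come from the test suite changes further down:

```scala
// Sketch only: combines withCopybookContents and option(), both present in this commit.
val processor = CobolProcessor.builder
  .withCopybookContents("...some copybook...")
  .option("record_format", "D") // "D" maps to RecordFormat.AsciiText per the tests below
  .build()

// processor.process(inputStream, outputStream)(rawRecordProcessor) returns the record count.
```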

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/CobolProcessor.scala (102 additions, 7 deletions)

@@ -16,12 +16,13 @@
 
 package za.co.absa.cobrix.cobol.processor
 
+import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.processor.impl.CobolProcessorImpl
 import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters, ReaderParameters}
 import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
-import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream}
 
-import java.io.OutputStream
+import java.io.{BufferedInputStream, BufferedOutputStream, FileOutputStream, OutputStream}
 import scala.collection.mutable
 
 
@@ -45,14 +46,58 @@ trait CobolProcessor {
 }
 
 object CobolProcessor {
-  class CobolProcessorBuilder(copybookContents: String) {
+  class CobolProcessorBuilder {
     private val caseInsensitiveOptions = new mutable.HashMap[String, String]()
+    private var copybookContentsOpt: Option[String] = None
+    private var rawRecordProcessorOpt: Option[RawRecordProcessor] = None
 
     def build(): CobolProcessor = {
+      if (copybookContentsOpt.isEmpty) {
+        throw new IllegalArgumentException("Copybook contents must be provided.")
+      }
+
+      val readerParameters = getReaderParameters
+      val cobolSchema = getCobolSchema(readerParameters)
+
+      new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContentsOpt.get, caseInsensitiveOptions.toMap)
+    }
+
+    def load(path: String): CobolProcessorLoader = {
+      val file = new java.io.File(path)
+      if (!file.exists) {
+        throw new IllegalArgumentException(s"Path $path does not exist.")
+      }
+
+      if (file.isDirectory) {
+        throw new IllegalArgumentException(s"Path $path should be a file, not a directory.")
+      }
+
+      if (copybookContentsOpt.isEmpty) {
+        throw new IllegalArgumentException("Copybook contents must be provided.")
+      }
+
+      if (rawRecordProcessorOpt.isEmpty) {
+        throw new IllegalArgumentException("A RawRecordProcessor must be provided.")
+      }
+
       val readerParameters = getReaderParameters
       val cobolSchema = getCobolSchema(readerParameters)
 
-      new CobolProcessorImpl(readerParameters, cobolSchema.copybook, copybookContents, caseInsensitiveOptions.toMap)
+      new CobolProcessorLoader(path, copybookContentsOpt.get, cobolSchema.copybook, rawRecordProcessorOpt.get, readerParameters, caseInsensitiveOptions.toMap)
+    }
+
+    def withCopybookContents(copybookContents: String): CobolProcessorBuilder = {
+      copybookContentsOpt = Option(copybookContents)
+      this
+    }
+
+    def withRecordProcessor(processor: RawRecordProcessor): CobolProcessorBuilder = {
+      rawRecordProcessorOpt = Option(processor)
+      this
     }
 
     /**
@@ -80,7 +125,7 @@ object CobolProcessor {
     }
 
     private[processor] def getCobolSchema(readerParameters: ReaderParameters): CobolSchema = {
-      CobolSchema.fromReaderParameters(Seq(copybookContents), readerParameters)
+      CobolSchema.fromReaderParameters(Seq(copybookContentsOpt.get), readerParameters)
     }
 
     private[processor] def getReaderParameters: ReaderParameters = {
@@ -92,7 +137,57 @@ object CobolProcessor {
     private[processor] def getOptions: Map[String, String] = caseInsensitiveOptions.toMap
   }
 
-  def builder(copybookContent: String): CobolProcessorBuilder = {
-    new CobolProcessorBuilder(copybookContent)
+  class CobolProcessorLoader(fileToProcess: String,
+                             copybookContents: String,
+                             copybook: Copybook,
+                             rawRecordProcessor: RawRecordProcessor,
+                             readerParameters: ReaderParameters,
+                             options: Map[String, String]) {
+    def save(outputFile: String): Long = {
+      val processor = new CobolProcessorImpl(readerParameters, copybook, copybookContents, options)
+
+      val ifs = new FSStream(fileToProcess)
+      val ofs = new BufferedOutputStream(new FileOutputStream(outputFile))
+
+      var originalException: Throwable = null
+
+      val recordCount = try {
+        processor.process(ifs, ofs)(rawRecordProcessor)
+      } catch {
+        case ex: Throwable =>
+          originalException = ex
+          0L
+      } finally {
+        try {
+          ifs.close()
+        } catch {
+          case e: Throwable =>
+            if (originalException != null) {
+              originalException.addSuppressed(e)
+            } else {
+              originalException = e
+            }
+        }
+
+        try {
+          ofs.close()
+        } catch {
+          case e: Throwable =>
+            if (originalException != null) {
+              originalException.addSuppressed(e)
+            } else {
+              originalException = e
+            }
+        }
+      }
+
+      if (originalException != null) throw originalException
+
+      recordCount
+    }
+  }
+
+  def builder: CobolProcessorBuilder = {
+    new CobolProcessorBuilder
   }
 }
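
The builder now validates its inputs eagerly, as the code above shows: `build()` requires copybook contents, and `load()` additionally requires an existing input file and a record processor. A minimal sketch of what a caller sees (assumption: plain Scala, no test framework):

```scala
// Sketch: build() without withCopybookContents(...) fails fast.
try {
  CobolProcessor.builder.build()
} catch {
  case ex: IllegalArgumentException =>
    // Per the code above: "Copybook contents must be provided."
    println(ex.getMessage)
}
```

Note also that `save()` uses the suppressed-exception pattern: failures from closing either stream are attached to the original processing failure via `addSuppressed`, so neither error is silently lost.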
cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/base/BinaryFileFixture.scala (new file: 52 additions, 0 deletions)

@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.base
+
+import java.nio.file.{Files, Path, Paths}
+import java.util.Comparator
+import java.util.function.Consumer
+
+/**
+  * This fixture adds the ability for a unit test to create temporary files and use them in tests.
+  */
+trait BinaryFileFixture {
+  def withTempDirectory(prefix: String)(f: String => Unit): Unit = {
+    val tmpPath = Files.createTempDirectory(prefix)
+    val pathStr = tmpPath.toAbsolutePath.toString
+
+    f(pathStr)
+
+    Files.walk(tmpPath)
+      .sorted(Comparator.reverseOrder())
+      .forEach(new Consumer[Path] {
+        override def accept(f: Path): Unit = Files.delete(f)
+      })
+  }
+
+  def writeBinaryFile(filePath: String, content: Array[Byte]): Unit = {
+    Files.write(Paths.get(filePath), content)
+  }
+
+  def readBinaryFile(filePath: String): Array[Byte] = {
+    Files.readAllBytes(Paths.get(filePath))
+  }
+
+  private def hex2bytes(hex: String): Array[Byte] = {
+    val compactStr = hex.replaceAll("\\s", "")
+    compactStr.sliding(2, 2).toArray.map(Integer.parseInt(_, 16).toByte)
+  }
+}
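
A hedged usage sketch of the fixture above (the suite name is hypothetical; `withTempDirectory`, `writeBinaryFile`, and `readBinaryFile` are the fixture methods added in this commit):

```scala
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.cobol.base.BinaryFileFixture

import java.nio.file.Paths

class BinaryFileFixtureExampleSuite extends AnyWordSpec with BinaryFileFixture {
  "withTempDirectory" should {
    "round-trip a binary file and clean up afterwards" in {
      withTempDirectory("example") { tempDir =>
        val file = Paths.get(tempDir, "data.bin").toString
        writeBinaryFile(file, Array[Byte](0x01, 0x02))
        assert(readBinaryFile(file).sameElements(Array[Byte](0x01, 0x02)))
      } // the directory and its contents are deleted here via Files.walk
    }
  }
}
```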

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala (38 additions, 6 deletions)

@@ -17,15 +17,15 @@
 package za.co.absa.cobrix.cobol.processor
 
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.base.BinaryFileFixture
 import za.co.absa.cobrix.cobol.mock.ByteStreamMock
-import za.co.absa.cobrix.cobol.parser.Copybook
 import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
-import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedRecordLengthRawRecordExtractor, TextFullRecordExtractor}
 import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 
 import java.io.ByteArrayOutputStream
+import java.nio.file.Paths
 
-class CobolProcessorBuilderSuite extends AnyWordSpec {
+class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture {
   private val copybook =
     """      01 RECORD.
       |        05 T  PIC X.
@@ -35,7 +35,8 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
     "process an input data stream into an output stream" in {
       val is = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
       val os = new ByteArrayOutputStream(10)
-      val builder = CobolProcessor.builder(copybook)
+      val builder = CobolProcessor.builder
+        .withCopybookContents(copybook)
 
       val processor = new RawRecordProcessor {
         override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
@@ -55,9 +56,39 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
       }
     }
 
+  "load and save" should {
+    "process files as expected" in {
+      withTempDirectory("cobol_processor") { tempDir =>
+        val inputFile = Paths.get(tempDir, "input.dat").toString
+        val outputFile = Paths.get(tempDir, "output.dat").toString
+
+        writeBinaryFile(inputFile, Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
+
+        val count = CobolProcessor.builder
+          .withCopybookContents(copybook)
+          .withRecordProcessor(new RawRecordProcessor {
+            override def processRecord(record: Array[Byte], ctx: CobolProcessorContext): Array[Byte] = {
+              record.map(v => (v - 1).toByte)
+            }
+          })
+          .load(inputFile)
+          .save(outputFile)
+
+        val outputArray = readBinaryFile(outputFile)
+
+        assert(count == 4)
+        assert(outputArray.head == -16)
+        assert(outputArray(1) == -15)
+        assert(outputArray(2) == -14)
+        assert(outputArray(3) == -13)
+      }
+    }
+  }
+
   "getCobolSchema" should {
     "return the schema of the copybook provided" in {
-      val builder = CobolProcessor.builder(copybook)
+      val builder = CobolProcessor.builder
+        .withCopybookContents(copybook)
 
       val cobolSchema = builder.getCobolSchema(ReaderParameters())
 
@@ -67,7 +98,8 @@ class CobolProcessorBuilderSuite extends AnyWordSpec {
 
   "getReaderParameters" should {
     "return a reader according to passed options" in {
-      val builder = CobolProcessor.builder(copybook)
+      val builder = CobolProcessor.builder
+        .withCopybookContents(copybook)
         .option("record_format", "D")
 
       assert(builder.getReaderParameters.recordFormat == RecordFormat.AsciiText)
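
The asserted values in the "load and save" test follow from signed-byte arithmetic: the input bytes 0xF1 to 0xF4 are the EBCDIC digits '1' to '4', the record processor decrements each byte, and 0xF0 to 0xF3 interpreted as signed JVM bytes are -16 to -13. A quick standalone check:

```scala
// Why outputArray.head == -16: 0xF1 - 1 = 0xF0, and 0xF0 as a signed byte is 0xF0 - 256 = -16.
val in  = Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte)
val out = in.map(v => (v - 1).toByte) // same transformation as the record processor above
assert(out.sameElements(Array(-16, -15, -14, -13).map(_.toByte)))
```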

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorImplSuite.scala (9 additions, 3 deletions)

@@ -32,7 +32,9 @@ class CobolProcessorImplSuite extends AnyWordSpec {
   "getRecordExtractor" should {
     "work for fixed-record-length files" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]
+      val processor = CobolProcessor.builder
+        .withCopybookContents(copybook)
+        .build().asInstanceOf[CobolProcessorImpl]
 
       val ext = processor.getRecordExtractor(ReaderParameters(recordLength = Some(2), options = Map("test" -> "option")), stream)
 
@@ -46,7 +48,9 @@ class CobolProcessorImplSuite extends AnyWordSpec {
 
     "work for variable-record-length files" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]
+      val processor = CobolProcessor.builder
+        .withCopybookContents(copybook)
+        .build().asInstanceOf[CobolProcessorImpl]
 
       val ext = processor.getRecordExtractor(ReaderParameters(
         recordFormat = RecordFormat.VariableLength,
@@ -58,7 +62,9 @@ class CobolProcessorImplSuite extends AnyWordSpec {
 
     "throw an exception on a non-supported record format for processing" in {
       val stream = new ByteStreamMock(Array(0xF1, 0xF2, 0xF3, 0xF4).map(_.toByte))
-      val processor = CobolProcessor.builder(copybook).build().asInstanceOf[CobolProcessorImpl]
+      val processor = CobolProcessor.builder
+        .withCopybookContents(copybook)
+        .build().asInstanceOf[CobolProcessorImpl]
 
       val ex = intercept[IllegalArgumentException] {
         processor.getRecordExtractor(ReaderParameters(
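
For the fixed-record-length case above, a hedged sketch of what the extractor is expected to yield (assumption, not shown in this commit: Cobrix raw record extractors behave as an `Iterator[Array[Byte]]`, so with `recordLength = Some(2)` the 4-byte stream splits into two 2-byte records):

```scala
// Sketch only; not part of the commit.
val records = ext.toList // drain the extractor
assert(records.length == 2)
assert(records.head.sameElements(Array(0xF1, 0xF2).map(_.toByte)))
assert(records(1).sameElements(Array(0xF3, 0xF4).map(_.toByte)))
```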
