@@ -23,36 +23,50 @@ import java.util
2323 *
2424 * Record extractors are used in situations where the size of records in a file is not fixed and cannot be
2525 * determined either from the copybook or from record headers.
26+ *
27+ * Empty lines (ones that contain only LF / CRLF) are skipped.
28+ *
29+ * The implementation is optimized for performance, so it might not be immediately readable.
30+ * Hopefully, comments will help anyone reading this.
2631 */
2732class TextRecordExtractor (ctx : RawRecordContext ) extends Serializable with RawRecordExtractor {
33+ // Maximum possible record size is the size of the copybook record + maximum size of the delimiter (2 characters for CRLF).
2834 private val maxRecordSize = ctx.copybook.getRecordSize + 2
35+
36+ // This is the buffer to keep the part of the stream that will be split into records.
37+ // The size of the array is always the maximum record size. The number of bytes that contain useful payload is specified
38+ // in pendingBytesSize.
2939 private val pendingBytes = new Array [Byte ](maxRecordSize)
3040 private var pendingBytesSize = 0
31- private var recordBytes : Option [Array [Byte ]] = None
41+
42+ // If true, curRecordSize and curPayloadSize point to a record, otherwise the next record needs to be found
43+ private var isRawRecordFound = false
44+ // The number of bytes from pendingBytes that correspond to a record, including line break character(s)
3245 private var curRecordSize = 0
33- private var lastFooterSize = 1
46+ // The number of bytes from pendingBytes that correspond to a record, without line break character(s)
47+ private var curPayloadSize = 0
48+ // The number of bytes the line breaking character has taken for the last record. Can only be 1 (LF) or 2 (CR LF).
49+ private var lastLineBreakSize = 1
3450
3551 override def hasNext : Boolean = {
36- if (recordBytes.isEmpty ) {
52+ if (! isRawRecordFound ) {
3753 ensureBytesRead(maxRecordSize)
38- fetchNextRecord ()
54+ findNextRecord ()
3955 }
4056
41- recordBytes.get.length > 0
57+ curRecordSize > 0
4258 }
4359
4460 override def next (): Array [Byte ] = {
4561 if (! hasNext) {
4662 throw new NoSuchElementException
4763 }
48- val bytesToReturn = recordBytes.get
49- curRecordSize = 0
50- recordBytes = None
51- bytesToReturn
64+ fetchNextRecord()
5265 }
5366
5467 override def offset : Long = ctx.inputStream.offset - pendingBytesSize - curRecordSize
5568
69+ // This method ensures that pendingBytes contains the specified number of bytes read from the input stream
5670 private def ensureBytesRead (numOfBytes : Int ): Unit = {
5771 val bytesToRead = numOfBytes - pendingBytesSize
5872 if (bytesToRead > 0 ) {
@@ -64,6 +78,7 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
6478 }
6579 }
6680
81+ // This method skips empty lines by ignoring lines that begin with CR / LF
6782 private def skipEmptyLines (): Unit = {
6883 var i = 0
6984 while (i < pendingBytesSize && (pendingBytes(i) == 0x0D || pendingBytes(i) == 0x0A )) {
@@ -75,6 +90,9 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
7590 }
7691 }
7792
93+ // This method finds the location of the end of the next record by searching for line ending characters
94+ // The data in pendingBytes is expected to be the length of maxRecordSize, or can be smaller for the last
95+ // record in the file
7896 private def findNextNonEmptyRecord (): (Int , Int ) = {
7997 var recordLength = 0
8098 var recordPayload = 0
@@ -95,36 +113,49 @@ class TextRecordExtractor(ctx: RawRecordContext) extends Serializable with RawRe
95113 (recordLength, recordPayload)
96114 }
97115
98- private def fetchNextRecord (): Unit = {
116+ // This method finds the location of the end of the next record and adjusts curRecordSize and curPayloadSize
117+ // so that the next record can be fetched. Skips empty lines.
118+ private def findNextRecord (): Unit = {
99119 skipEmptyLines()
100120
101- var (recordLength, recordPayload) = findNextNonEmptyRecord()
121+ val (recordLength, recordPayload) = findNextNonEmptyRecord()
102122
103- recordBytes = if (recordLength > 0 ) {
123+ if (recordLength > 0 ) {
104124 curRecordSize = recordLength
105- Some (pendingBytes.take( recordPayload))
125+ curPayloadSize = recordPayload
106126 } else {
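107127 // Either this is the last record, or the record is too large (no line break was found).
108128 // In the latter case, a record's worth of data is returned without a line break.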
109129 if (ctx.inputStream.isEndOfStream) {
110130 // Last record
111- recordLength = pendingBytesSize
112- recordPayload = pendingBytesSize
131+ curRecordSize = pendingBytesSize
132+ curPayloadSize = pendingBytesSize
113133 } else {
114134 // This is an error situation - no line breaks between records
115135 // Return a record worth of data minus line break.
116- recordLength = pendingBytesSize - lastFooterSize
117- recordPayload = pendingBytesSize - lastFooterSize
136+ curRecordSize = pendingBytesSize - lastLineBreakSize
137+ curPayloadSize = pendingBytesSize - lastLineBreakSize
118138 }
119- curRecordSize = recordLength
120- Some (pendingBytes.take(recordLength))
121139 }
122140
123- advanceArray(recordLength)
141+ isRawRecordFound = true
142+
143+ lastLineBreakSize = recordLength - recordPayload
144+ }
124145
125- lastFooterSize = recordLength - recordPayload
146+ // This method extracts the current record from the buffer array.
147+ // It should only be called when curRecordSize and curPayloadSize are set properly.
148+ private def fetchNextRecord (): Array [Byte ] = {
149+ val bytes = pendingBytes.take(curPayloadSize)
150+ advanceArray(curRecordSize)
151+ isRawRecordFound = false
152+ curPayloadSize = 0
153+ curRecordSize = 0
154+ bytes
126155 }
127156
157+ // This method shifts the internal buffer pendingBytes to the left by the size of the record.
158+ // It also fills the rest of the array with 0x0 character.
128159 private def advanceArray (recordLength : Int ): Unit = {
129160 if (pendingBytesSize > recordLength) {
130161 System .arraycopy(pendingBytes, recordLength, pendingBytes, 0 , pendingBytesSize - recordLength)
0 commit comments