5 changes: 4 additions & 1 deletion README.md
@@ -39,6 +39,9 @@ Among the motivations for this project, it is possible to highlight:

- The COBOL copybooks parser doesn't have a Spark dependency and can be reused for integrating into other data processing engines.

- Supports reading files compressed in a Hadoop-compatible way (gzip, bzip2, etc.), but with limited parallelism.
  Uncompressed files are preferred for performance.
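  A minimal usage sketch for compressed input, assuming an existing `SparkSession` named `spark`; the paths are placeholders and `record_format` is taken from the general Cobrix options rather than from this diff:

  ```scala
  // Sketch only: the Hadoop-compatible codec is assumed to be inferred from the file extension,
  // so no extra option is needed for gzip input, but the read has limited parallelism.
  val df = spark.read
    .format("cobol")
    .option("copybook", "/path/to/copybook.cpy")   // placeholder path
    .option("record_format", "V")                  // variable-length records
    .load("/data/transactions.dat.gz")             // gzip-compressed input
  ```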

## Videos

We have presented Cobrix at DataWorks Summit 2019 and Spark Summit 2019 conferences. The screencasts are available here:
@@ -1602,7 +1605,7 @@ The output looks like this:
| .option("redefine-segment-id-map:0", "REDEFINED_FIELD1 => SegmentId1,SegmentId2,...") | Specifies a mapping between redefined field names and segment id values. Each option specifies a mapping for a single segment. The numeric value for each mapping option must be incremented so the option keys are unique. |
| .option("segment-children:0", "COMPANY => EMPLOYEE,DEPARTMENT") | Specifies a mapping between segment redefined fields and their children. Each option specifies a mapping for a single parent field. The numeric value for each mapping option must be incremented so the option keys are unique. If such mapping is specified hierarchical record structure will be automatically reconstructed. This require `redefine-segment-id-map` to be set. |
| .option("enable_indexes", "true") | Turns on indexing of multisegment variable length files (on by default). |
| .option("enable_index_cache", "false") | When true, calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. |
| .option("enable_index_cache", "true") | When true (default), calculated indexes are cached in memory for later use. This improves performance of processing when same files are processed more than once. |
| .option("input_split_records", 50000) | Specifies how many records will be allocated to each split/partition. It will be processed by Spark tasks. (The default is not set and the split will happen according to size, see the next option) |
| .option("input_split_size_mb", 100) | Specify how many megabytes to allocate to each partition/split. (The default is 100 MB) |

@@ -42,6 +42,8 @@ import scala.reflect.ClassTag
class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
readerProperties: ReaderParameters,
handler: RecordHandler[T]) extends VarLenReader with Logging with Serializable {
private val DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB = 1024
private val DEFAULT_FS_INDEX_SIZE_MULTIPLIER = 4

protected val cobolSchema: CobolSchema = loadCopyBook(copybookContents)

@@ -144,7 +146,7 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
fileNumber: Int,
isRdwBigEndian: Boolean): ArrayBuffer[SparseIndexEntry] = {
val inputSplitSizeRecords: Option[Int] = readerProperties.inputSplitRecords
val inputSplitSizeMB: Option[Int] = getSplitSizeMB
val inputSplitSizeMB: Option[Int] = getSplitSizeMB(dataStream.isCompressed)

if (inputSplitSizeRecords.isDefined) {
if (inputSplitSizeRecords.get < 1 || inputSplitSizeRecords.get > 1000000000) {
@@ -153,8 +155,9 @@
logger.info(s"Input split size = ${inputSplitSizeRecords.get} records")
} else {
if (inputSplitSizeMB.nonEmpty) {
if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > 2000) {
throw new IllegalArgumentException(s"Invalid input split size of ${inputSplitSizeMB.get} MB.")
val maxSplitSizeMB = if (dataStream.isCompressed) 200000 else 2000
if (inputSplitSizeMB.get < 1 || inputSplitSizeMB.get > maxSplitSizeMB) {
throw new IllegalArgumentException(s"Invalid input split size of ${inputSplitSizeMB.get} MB (max allowed: $maxSplitSizeMB MB).")
}
logger.info(s"Input split size = ${inputSplitSizeMB.get} MB")
}
@@ -214,11 +217,11 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
}
}

private def getSplitSizeMB: Option[Int] = {
if (readerProperties.inputSplitSizeMB.isDefined) {
readerProperties.inputSplitSizeMB
private def getSplitSizeMB(isCompressed: Boolean): Option[Int] = {
if (isCompressed) {
readerProperties.inputSplitSizeCompressedMB.orElse(Some(DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB))
} else {
readerProperties.hdfsDefaultBlockSize
readerProperties.inputSplitSizeMB.orElse(readerProperties.fsDefaultBlockSize).map(_ * DEFAULT_FS_INDEX_SIZE_MULTIPLIER)
}
}
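For reviewers, a standalone sketch of the fallback order implemented above; the helper name is hypothetical, and the constant values are the ones defined at the top of this class:

```scala
// Mirrors getSplitSizeMB: compressed streams use the explicit option or a 1024 MB default;
// uncompressed streams use the explicit option or the FS default block size, with the multiplier applied.
def effectiveSplitSizeMB(isCompressed: Boolean,
                         inputSplitSizeMB: Option[Int],
                         inputSplitSizeCompressedMB: Option[Int],
                         fsDefaultBlockSize: Option[Int]): Option[Int] =
  if (isCompressed)
    inputSplitSizeCompressedMB.orElse(Some(1024))            // DEFAULT_INDEX_SIZE_COMPRESSED_FILES_MB
  else
    inputSplitSizeMB.orElse(fsDefaultBlockSize).map(_ * 4)   // DEFAULT_FS_INDEX_SIZE_MULTIPLIER
```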

@@ -118,13 +118,14 @@ object CobolParametersParser extends Logging {
val PARAM_SEGMENT_REDEFINE_PREFIX_ALT = "redefine-segment-id-map"

// Indexed multisegment file processing
val PARAM_ENABLE_INDEXES = "enable_indexes"
val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"
val PARAM_INPUT_SPLIT_RECORDS = "input_split_records"
val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb"
val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix"
val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation"
val PARAM_IMPROVE_LOCALITY = "improve_locality"
val PARAM_ENABLE_INDEXES = "enable_indexes"
val PARAM_ENABLE_INDEX_CACHE = "enable_index_cache"
val PARAM_INPUT_SPLIT_RECORDS = "input_split_records"
val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb"
val PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB = "input_split_size_compressed_mb"
val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix"
val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation"
val PARAM_IMPROVE_LOCALITY = "improve_locality"
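A hedged usage sketch for the new `input_split_size_compressed_mb` option defined above, assuming an existing `SparkSession` named `spark`; paths are placeholders, and the 1024 MB fallback comes from this PR's reader changes:

```scala
// Overriding the split/index size used for Hadoop-compressed input.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")     // placeholder
  .option("input_split_size_compressed_mb", 512)   // if omitted, the reader falls back to 1024 MB
  .load("/data/archive.dat.bz2")                   // placeholder, bzip2-compressed
```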

// Parameters for debugging
val PARAM_DEBUG_LAYOUT_POSITIONS = "debug_layout_positions"
@@ -385,6 +386,7 @@ object CobolParametersParser extends Logging {
isIndexCachingAllowed = false,
inputSplitRecords = None,
inputSplitSizeMB = None,
inputSplitSizeCompressedMB = None,
improveLocality = false,
optimizeAllocation = false,
inputFileNameColumn = "",
@@ -421,7 +423,8 @@
isIndexCachingAllowed = varLenParams.isIndexCachingAllowed,
inputSplitRecords = varLenParams.inputSplitRecords,
inputSplitSizeMB = varLenParams.inputSplitSizeMB,
hdfsDefaultBlockSize = defaultBlockSize,
inputSplitSizeCompressedMB = varLenParams.inputSplitSizeCompressedMB,
fsDefaultBlockSize = defaultBlockSize,
startOffset = parameters.recordStartOffset,
endOffset = parameters.recordEndOffset,
fileStartOffset = varLenParams.fileStartOffset,
@@ -505,9 +508,10 @@ object CobolParametersParser extends Logging {
fileEndOffset,
isRecordIdGenerationEnabled,
params.getOrElse(PARAM_ENABLE_INDEXES, "true").toBoolean,
params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "false").toBoolean,
params.getOrElse(PARAM_ENABLE_INDEX_CACHE, "true").toBoolean,
Contributor review comment:

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# First, find the file and examine its current state
find . -name "CobolParametersParser.scala" -type f 2>/dev/null

# Check if the file exists and examine lines around 511
if [ -f "cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala" ]; then
  echo "=== File found, showing context around line 511 ==="
  sed -n '505,520p' cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
fi

# Search for PARAM_ENABLE_INDEX_CACHE throughout the codebase
echo ""
echo "=== Searching for PARAM_ENABLE_INDEX_CACHE references ==="
rg -n "PARAM_ENABLE_INDEX_CACHE" --type scala

# Check git history for changes to this parameter
echo ""
echo "=== Checking git log for CobolParametersParser changes ==="
git log --oneline --all -- "cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala" 2>/dev/null | head -20

Repository: AbsaOSS/cobrix

Length of output: 3226


🏁 Script executed:

#!/bin/bash
# Check for release notes, changelog, or README documentation
echo "=== Checking for README or documentation about index cache ==="
rg -n "index_cache|index cache" -i -g "*.md" -g "README*"

# Look at the specific commit to understand the intent
echo ""
echo "=== Checking commit details ==="
git show --stat 8000d431 2>/dev/null | head -50

# Check if there are tests specifically for this default
echo ""
echo "=== Looking for tests related to index cache defaults ==="
rg -n "enable_index_cache.*true" --type scala -C 2

Repository: AbsaOSS/cobrix

Length of output: 5366


Index cache now enabled by default — verify memory impact guidance.

The default value for PARAM_ENABLE_INDEX_CACHE has changed from "false" to "true". This is a behavioral change documented in README.md that will affect all users who don't explicitly set this option.

Impact: Index caching improves performance by reusing generated indexes across jobs, but increases memory usage. While the README documents this feature as "true (default)", it lacks explicit guidance on:

  • Memory consumption for large files or multiple concurrent jobs
  • Whether memory bounds or limits are enforced
  • Recommendations for resource-constrained environments

Recommendation: Enhance documentation to include:

  1. Memory impact guidance (e.g., approximate memory usage patterns)
  2. Configuration advice for environments with limited resources
  3. Information about memory bounds or cleanup mechanisms for long-running jobs
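For example, a short, hedged snippet like the following could accompany the documentation (paths are placeholders; the option name is the one changed in this PR):

```scala
// Explicitly disabling index caching for memory-constrained environments.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")   // placeholder
  .option("enable_index_cache", "false")         // restores the previous default behaviour
  .load("/data/large_input.dat")                 // placeholder
```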
🤖 Prompt for AI Agents
In
cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
around line 511, the default for PARAM_ENABLE_INDEX_CACHE was changed to true
which increases memory usage by caching indexes; update the project
documentation (README.md) to explicitly document the memory impact and
operational guidance: add a new section describing approximate memory usage
characteristics for index caches (e.g., per-file or per-part memory patterns and
how size scales with record count and schema complexity), describe behavior for
multiple concurrent jobs, note whether the cache has any enforced bounds or
eviction/cleanup mechanisms, and provide practical recommendations for
resource-constrained environments (how to disable the cache, tune related
settings, and monitor/limit memory use), plus a short example config snippet
showing how to explicitly disable the cache when needed.

params.get(PARAM_INPUT_SPLIT_RECORDS).map(v => v.toInt),
params.get(PARAM_INPUT_SPLIT_SIZE_MB).map(v => v.toInt),
params.get(PARAM_INPUT_SPLIT_SIZE_COMPRESSED_MB).map(v => v.toInt),
params.getOrElse(PARAM_IMPROVE_LOCALITY, "true").toBoolean,
params.getOrElse(PARAM_OPTIMIZE_ALLOCATION, "false").toBoolean,
params.getOrElse(PARAM_INPUT_FILE_COLUMN, ""),
@@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, FillerNamingPolicy, MetadataPolicy, StringTrimmingPolicy}
import za.co.absa.cobrix.cobol.parser.policies._
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
@@ -50,7 +50,8 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param isIndexGenerationNeeded Specifies whether the input file should be indexed before processing
* @param inputSplitRecords The number of records to include in each partition. Note that mainframe records may have variable size; inputSplitSizeMB is the recommended option
* @param inputSplitSizeMB A partition size to target. In certain circumstances this size may not be matched exactly, but the library will make a best effort to target that size
* @param hdfsDefaultBlockSize Default HDFS block size for the HDFS filesystem used. This value is used as the default split size if inputSplitSizeMB is not specified
* @param inputSplitSizeCompressedMB A partition size to target for compressed files.
* @param fsDefaultBlockSize Default block size of the filesystem used (e.g. HDFS). This value is used as the default split size if inputSplitSizeMB is not specified
* @param startOffset An offset to the start of the record in each binary data block.
* @param endOffset An offset from the end of the record to the end of the binary data block.
* @param fileStartOffset A number of bytes to skip at the beginning of each file
@@ -102,7 +103,8 @@ case class ReaderParameters(
isIndexCachingAllowed: Boolean = false,
inputSplitRecords: Option[Int] = None,
inputSplitSizeMB: Option[Int] = None,
hdfsDefaultBlockSize: Option[Int] = None,
inputSplitSizeCompressedMB: Option[Int] = None,
fsDefaultBlockSize: Option[Int] = None,
startOffset: Int = 0,
endOffset: Int = 0,
fileStartOffset: Int = 0,
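A hedged construction sketch for the renamed and added fields, assuming (as the visible parameters suggest) that all fields of `ReaderParameters` have defaults; the package path is taken from the parser path quoted in the review comment above:

```scala
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters

// Only the split-size related fields are set; everything else keeps its default value.
val readerParams = ReaderParameters(
  inputSplitSizeMB           = Some(100),   // target split size for uncompressed input
  inputSplitSizeCompressedMB = Some(1024),  // new field from this PR, used for compressed streams
  fsDefaultBlockSize         = Some(128)    // renamed from hdfsDefaultBlockSize
)
```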