Class CountingIterator

java.lang.Object
com.linkedin.venice.spark.datawriter.task.CountingIterator
All Implemented Interfaces:
Iterator<org.apache.spark.sql.Row>

public class CountingIterator extends Object implements Iterator<org.apache.spark.sql.Row>
Wraps an Iterator<Row> and, as each record passes through, counts it and adds its byte size to a running total. Used to instrument Venice Push Job (VPJ) Spark pipeline stages with per-stage I/O tracking.

Byte size per record is computed as: key.length + (value != null ? value.length : 0) + (rmd != null ? rmd.length : 0). This matches the three BinaryType columns in SparkConstants.DEFAULT_SCHEMA.

Overhead is minimal: one accumulator increment and one byte size computation per record.
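The wrapping pattern and the byte size formula above can be sketched without a Spark dependency. This is an illustration only, not the Venice implementation: RecordSketch is a hypothetical stand-in for a Row with three binary columns, AtomicLong stands in for org.apache.spark.util.LongAccumulator, and the choice to update the counters inside next() is an assumption.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a Row with key, value and rmd binary columns. */
final class RecordSketch {
  final byte[] key;
  final byte[] value; // may be null
  final byte[] rmd;   // may be null

  RecordSketch(byte[] key, byte[] value, byte[] rmd) {
    this.key = key;
    this.value = value;
    this.rmd = rmd;
  }

  /** The documented formula: key is required, value and rmd contribute 0 when null. */
  long byteSize() {
    return key.length + (value != null ? value.length : 0) + (rmd != null ? rmd.length : 0);
  }
}

/** Illustrative counting wrapper; AtomicLong replaces Spark's LongAccumulator here. */
final class CountingIteratorSketch implements Iterator<RecordSketch> {
  private final Iterator<RecordSketch> delegate;
  private final AtomicLong recordCounter;
  private final AtomicLong byteCounter;

  CountingIteratorSketch(Iterator<RecordSketch> delegate, AtomicLong recordCounter, AtomicLong byteCounter) {
    this.delegate = delegate;
    this.recordCounter = recordCounter;
    this.byteCounter = byteCounter;
  }

  @Override
  public boolean hasNext() {
    // Pure pass-through: checking for more records does not touch the counters.
    return delegate.hasNext();
  }

  @Override
  public RecordSketch next() {
    RecordSketch record = delegate.next();
    recordCounter.incrementAndGet();          // one increment per record
    byteCounter.addAndGet(record.byteSize()); // one byte size computation per record
    return record;
  }

  /** Consumes all records through the wrapper and returns {recordCount, byteCount}. */
  static long[] drain(List<RecordSketch> records) {
    AtomicLong recordCounter = new AtomicLong();
    AtomicLong byteCounter = new AtomicLong();
    Iterator<RecordSketch> it = new CountingIteratorSketch(records.iterator(), recordCounter, byteCounter);
    while (it.hasNext()) {
      it.next();
    }
    return new long[] { recordCounter.get(), byteCounter.get() };
  }
}
```

Because the counters are only updated as records are pulled, a stage that stops consuming early reports only the records it actually read.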

  • Constructor Details

    • CountingIterator

      public CountingIterator(Iterator<org.apache.spark.sql.Row> delegate, org.apache.spark.util.LongAccumulator recordCounter, org.apache.spark.util.LongAccumulator byteCounter, org.apache.spark.sql.types.StructType schema, String keyColumnName, String valueColumnName, String rmdColumnName)
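      Parameters:
      delegate - the underlying iterator whose rows are passed through unchanged
      recordCounter - accumulator that receives the record count
      byteCounter - accumulator that receives the summed byte sizes
      schema - optional schema used to resolve column indices once, at construction time. If null, no bytes are counted (the byte size of every record is 0); record counting still works.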
      keyColumnName - column name for key (e.g., "key")
      valueColumnName - column name for value (e.g., "value")
      rmdColumnName - column name for RMD (e.g., "rmd" or "__replication_metadata_payload__"). If null, RMD bytes are not counted.
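The null handling described for schema and rmdColumnName can be pictured as a one-time index lookup. The sketch below is an assumption about how such a lookup might behave, not the class's actual code: a list of field names stands in for StructType, and -1 is used as a hypothetical "not tracked" sentinel, chosen because computeByteSizeByIndices is documented to skip negative indices. With a real StructType, fieldIndex(name) would be the analogous lookup; whether CountingIterator uses it is not stated here.

```java
import java.util.List;

/** Illustrative one-time column index resolution; names and sentinel are hypothetical. */
final class ColumnIndexSketch {
  static final int NOT_TRACKED = -1;

  /**
   * Returns {keyIndex, valueIndex, rmdIndex}.
   * A null schema disables byte tracking entirely; a null column name disables that column only.
   */
  static int[] resolve(List<String> fieldNames, String keyColumn, String valueColumn, String rmdColumn) {
    if (fieldNames == null) {
      return new int[] { NOT_TRACKED, NOT_TRACKED, NOT_TRACKED };
    }
    return new int[] {
        indexOf(fieldNames, keyColumn),
        indexOf(fieldNames, valueColumn),
        indexOf(fieldNames, rmdColumn)
    };
  }

  private static int indexOf(List<String> fieldNames, String name) {
    // List.indexOf returns -1 for a missing name, which matches the sentinel.
    return name == null ? NOT_TRACKED : fieldNames.indexOf(name);
  }
}
```

Resolving indices once keeps the per-record path free of name lookups, which fits the stated goal of minimal overhead.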
  • Method Details

    • hasNext

      public boolean hasNext()
      Specified by:
      hasNext in interface Iterator<org.apache.spark.sql.Row>
    • next

      public org.apache.spark.sql.Row next()
      Specified by:
      next in interface Iterator<org.apache.spark.sql.Row>
    • computeByteSizeByIndices

      public static long computeByteSizeByIndices(org.apache.spark.sql.Row row, int... indices)
      Static helper for computing byte size from a Row using pre-resolved column indices. Used in groupByKey.flatMapGroups lambdas where a CountingIterator instance is not available.
      Parameters:
      row - the row whose columns are measured
      indices - column indices whose byte lengths are summed. Negative indices are skipped.
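A minimal sketch of the summing behavior follows, using an Object[] as a hypothetical stand-in for a Row so that it runs without Spark. Skipping negative indices comes from the parameter description above; treating a null column as 0 bytes is an assumption taken from the class-level formula, not from this method's documentation.

```java
/** Illustrative index-based byte size helper; Object[] replaces org.apache.spark.sql.Row. */
final class ByteSizeByIndicesSketch {
  static long computeByteSize(Object[] row, int... indices) {
    long total = 0L;
    for (int index : indices) {
      if (index < 0) {
        continue; // negative index: column is not tracked
      }
      Object column = row[index];
      // instanceof is false for null, so null columns contribute 0 bytes.
      if (column instanceof byte[]) {
        total += ((byte[]) column).length;
      }
    }
    return total;
  }
}
```

A caller inside a lambda would resolve the indices once outside the lambda and pass them in for every row, for example computeByteSize(row, keyIndex, valueIndex, rmdIndex).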