Class AbstractDataWriterSparkJob

java.lang.Object
com.linkedin.venice.jobs.DataWriterComputeJob
com.linkedin.venice.spark.datawriter.jobs.AbstractDataWriterSparkJob
All Implemented Interfaces:
ComputeJob, Closeable, AutoCloseable
Direct Known Subclasses:
DataWriterSparkJob

public abstract class AbstractDataWriterSparkJob extends DataWriterComputeJob
The implementation of DataWriterComputeJob for the Spark engine.
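A minimal usage sketch, assuming the concrete DataWriterSparkJob subclass exposes a no-arg constructor and that the caller has already built its VeniceProperties and PushJobSetting; imports for the Venice types are elided because their packages are not shown on this page.

      import java.io.IOException;

      public final class SparkDataWriterExample {
        public static void runPush(VeniceProperties props, PushJobSetting pushJobSetting) throws IOException {
          // The job is Closeable, so try-with-resources will call close() when the push finishes.
          try (DataWriterSparkJob job = new DataWriterSparkJob()) {
            job.configure(props, pushJobSetting);                  // wire up the SparkSession and job settings
            job.runComputeJob();                                   // execute the Spark data writer pipeline
            DataWriterTaskTracker tracker = job.getTaskTracker();  // inspect counters after the run
            // ... surface tracker state to the push job driver ...
          }
        }
      }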
  • Constructor Details

    • AbstractDataWriterSparkJob

      public AbstractDataWriterSparkJob()
  • Method Details

    • configure

      public void configure(VeniceProperties props, PushJobSetting pushJobSetting)
      Specified by:
      configure in class DataWriterComputeJob
    • getSparkSession

      protected org.apache.spark.sql.SparkSession getSparkSession()
    • getUserInputDataFrame

      protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getUserInputDataFrame()
      Get the DataFrame built from the user's input data (a sketch of a conforming subclass appears below). The schema of each Row must satisfy the following constraints:
      • Must contain a field "key" with the schema DataTypes.BinaryType. This is the record's key, serialized as Avro binary.
      • Must contain a field "value" with the schema DataTypes.BinaryType. This is the record's value, serialized as Avro binary.
      • Must not contain fields with names beginning with "_". These are reserved for internal use.
      • May contain additional fields that do not violate the above constraints.
      Returns:
      The DataFrame built from the user's input data
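      A hedged illustration of a conforming subclass. The subclass name and the inlined rows are hypothetical; a real implementation would read Avro-serialized key/value bytes from its configured input source. Since getKafkaInputDataFrame() is also abstract, the sketch stubs it out.

      import java.util.Arrays;
      import java.util.List;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.RowFactory;
      import org.apache.spark.sql.types.DataTypes;
      import org.apache.spark.sql.types.StructType;

      // Hypothetical subclass; a real implementation would read Avro-serialized
      // key/value bytes from the configured input source instead of inlining rows.
      public class InMemoryDataWriterSparkJob extends AbstractDataWriterSparkJob {
        @Override
        protected Dataset<Row> getUserInputDataFrame() {
          // "key" and "value" must be BinaryType; no field name may start with "_".
          StructType schema = new StructType()
              .add("key", DataTypes.BinaryType, false)
              .add("value", DataTypes.BinaryType, true);
          List<Row> rows = Arrays.asList(
              RowFactory.create(new byte[] { 1 }, new byte[] { 10 }),
              RowFactory.create(new byte[] { 2 }, new byte[] { 20 }));
          return getSparkSession().createDataFrame(rows, schema);
        }

        @Override
        protected Dataset<Row> getKafkaInputDataFrame() {
          throw new UnsupportedOperationException("Repush input is not implemented in this sketch");
        }
      }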
    • validateRmdSchema

      protected void validateRmdSchema(PushJobSetting pushJobSetting)
    • getKafkaInputDataFrame

      protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getKafkaInputDataFrame()
      Get the input DataFrame from the Kafka/PubSub source for repush workloads. This method reads from a Venice version topic (Kafka) and returns a DataFrame with the RAW_PUBSUB_INPUT_TABLE_SCHEMA.
      Returns:
      DataFrame containing the Kafka input data
    • applyTTLFilter

      protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyTTLFilter(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
      Apply TTL filtering to the Kafka input DataFrame. This method filters out records that are older than the configured TTL threshold. The input DataFrame must have the RAW_PUBSUB_INPUT_TABLE_SCHEMA (region, partition, offset, message_type, schema_id, key, value, rmd_version_id, rmd_payload).
      Parameters:
      dataFrame - Input DataFrame with RAW_PUBSUB_INPUT_TABLE_SCHEMA
      Returns:
      Filtered DataFrame (with stale records removed if TTL filtering is enabled)
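      The general filtering pattern, as a sketch only: the record_timestamp_ms column and the caller-supplied TTL are hypothetical, since the actual method presumably derives each record's timestamp from the replication metadata (rmd_payload) rather than from a dedicated column.

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import static org.apache.spark.sql.functions.col;
      import static org.apache.spark.sql.functions.lit;

      // General pattern only: drop rows whose (hypothetical) timestamp column is older
      // than the TTL cutoff.
      public final class TtlFilterSketch {
        public static Dataset<Row> dropStaleRows(Dataset<Row> dataFrame, long ttlMs) {
          long cutoffMs = System.currentTimeMillis() - ttlMs;
          return dataFrame.filter(col("record_timestamp_ms").geq(lit(cutoffMs)));
        }
      }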
    • applyChunkAssembly

      protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyChunkAssembly(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
      Apply chunk assembly if chunking is enabled. Groups records by key, sorts them by offset in descending order, and assembles the chunks into complete values and replication metadata (RMD) payloads. If TTL filtering is enabled, the assembler also filters the assembled records post-assembly.
      Parameters:
      dataFrame - Input DataFrame with SCHEMA_FOR_CHUNK_ASSEMBLY (7 columns)
      Returns:
      DataFrame with DEFAULT_SCHEMA_WITH_SCHEMA_ID (5 columns of assembled records)
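      A sketch of the grouping and ordering step that the description implies: co-locate all rows for a key in one partition and order them by offset descending, so an assembler walking the partition sees the latest write for each key first. The byte-level assembly of chunked values and RMDs is omitted, and this is not the method's actual implementation.

      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import static org.apache.spark.sql.functions.col;

      // Sketch of the "group by key, sort by offset DESC" step only; assembling the
      // chunk bytes into complete values/RMDs is intentionally left out.
      public final class ChunkAssemblyOrderingSketch {
        public static Dataset<Row> groupAndOrder(Dataset<Row> dataFrame) {
          return dataFrame
              .repartition(col("key"))
              .sortWithinPartitions(col("key"), col("offset").desc());
        }
      }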
    • setInputConf

      protected void setInputConf(org.apache.spark.sql.SparkSession session, org.apache.spark.sql.DataFrameReader dataFrameReader, String key, String value)
    • getTaskTracker

      public DataWriterTaskTracker getTaskTracker()
      Specified by:
      getTaskTracker in class DataWriterComputeJob
    • getJobProperties

      protected VeniceProperties getJobProperties()
    • getPushJobSetting

      public PushJobSetting getPushJobSetting()
      Specified by:
      getPushJobSetting in class DataWriterComputeJob
    • runComputeJob

      public void runComputeJob()
      Specified by:
      runComputeJob in class DataWriterComputeJob
    • kill

      public void kill()
      Specified by:
      kill in interface ComputeJob
      Overrides:
      kill in class DataWriterComputeJob
    • createPartitionWriterFactory

      protected org.apache.spark.api.java.function.MapPartitionsFunction<org.apache.spark.sql.Row,org.apache.spark.sql.Row> createPartitionWriterFactory(org.apache.spark.broadcast.Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators)
      Creates the partition writer factory. Can be overridden for testing purposes.
      Parameters:
      broadcastProperties - the broadcast job properties
      accumulators - the data writer accumulators
      Returns:
      the partition writer factory
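      A hedged sketch of such an override for testing: a hypothetical subclass that swaps in a pass-through partition function so the Spark pipeline can be exercised without the real partition writer. Whether a pass-through satisfies the schema the downstream encoder expects depends on the surrounding pipeline, so treat this as a shape illustration only; the import for DataWriterAccumulators is elided because its package is not shown on this page.

      import java.util.Properties;
      import org.apache.spark.api.java.function.MapPartitionsFunction;
      import org.apache.spark.broadcast.Broadcast;
      import org.apache.spark.sql.Row;

      // Hypothetical test subclass: returns rows unchanged instead of handing them to
      // the real partition writer.
      public class PassThroughDataWriterSparkJob extends DataWriterSparkJob {
        @Override
        protected MapPartitionsFunction<Row, Row> createPartitionWriterFactory(
            Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators) {
          return rows -> rows;
        }
      }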
    • close

      public void close() throws IOException
      Throws:
      IOException