Class AbstractDataWriterSparkJob
java.lang.Object
com.linkedin.venice.jobs.DataWriterComputeJob
com.linkedin.venice.spark.datawriter.jobs.AbstractDataWriterSparkJob
- All Implemented Interfaces:
ComputeJob, Closeable, AutoCloseable
- Direct Known Subclasses:
DataWriterSparkJob
The implementation of DataWriterComputeJob for the Spark engine.
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
DataWriterComputeJob.ConfigSetter
Nested classes/interfaces inherited from interface com.linkedin.venice.jobs.ComputeJob
ComputeJob.Status
-
Field Summary
Fields inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
PASS_THROUGH_CONFIG_PREFIXES
-
Constructor Summary
Constructors
AbstractDataWriterSparkJob()
-
Method Summary
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyChunkAssembly(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply chunk assembly if chunking is enabled.
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyCompaction(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply compaction to the Kafka input dataframe.
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyCompressionReEncoding(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply compression re-encoding if the source and destination compression strategies are different.
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyTTLFilter(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply TTL filtering to the Kafka input dataframe.
- void close()
- void configure(VeniceProperties props, PushJobSetting pushJobSetting)
- protected org.apache.spark.api.java.function.MapPartitionsFunction<org.apache.spark.sql.Row,org.apache.spark.sql.Row> createPartitionWriterFactory(org.apache.spark.broadcast.Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators)
Creates the partition writer factory.
- protected DataWriterAccumulators getAccumulatorsForDataWriterJob()
- protected VeniceProperties getJobProperties()
- protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getKafkaInputDataFrame()
Get the input DataFrame from the Kafka/PubSub source for repush workloads.
- protected org.apache.spark.sql.SparkSession getSparkSession()
- protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getUserInputDataFrame()
Get the data frame based on the user's input data.
- void kill()
- void runComputeJob()
- protected void setInputConf(org.apache.spark.sql.SparkSession session, org.apache.spark.sql.DataFrameReader dataFrameReader, String key, String value)
- protected void validateRmdSchema(PushJobSetting pushJobSetting)
Methods inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
configure, getFailureReason, getStatus, populateWithPassThroughConfigs, populateWithPassThroughConfigs, runJob, validateJob
-
Constructor Details
-
AbstractDataWriterSparkJob
public AbstractDataWriterSparkJob()
-
-
Method Details
-
configure
public void configure(VeniceProperties props, PushJobSetting pushJobSetting)
- Specified by:
configure in class DataWriterComputeJob
-
getSparkSession
protected org.apache.spark.sql.SparkSession getSparkSession()
-
getUserInputDataFrame
protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getUserInputDataFrame()
Get the data frame based on the user's input data. The schema of the Row has the following constraints:
- Must contain a field "key" with the schema DataTypes.BinaryType. This is the key of the record, represented as serialized Avro.
- Must contain a field "value" with the schema DataTypes.BinaryType. This is the value of the record, represented as serialized Avro.
- Must not contain fields with names beginning with "_". These are reserved for internal use.
- Can contain additional fields that do not violate the above constraints.
- Returns:
- The data frame based on the user's input data
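For illustration, a minimal sketch of a DataFrame that satisfies this contract, assuming a locally available SparkSession and hard-coded byte arrays standing in for real Avro-serialized records (the class and method names below are hypothetical):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public final class UserInputDataFrameSketch {
  // Builds a DataFrame that honors the documented contract: binary "key" and "value"
  // columns and no field names starting with "_".
  public static Dataset<Row> buildExample(SparkSession spark) {
    StructType schema = new StructType(new StructField[] {
        new StructField("key", DataTypes.BinaryType, false, Metadata.empty()),
        new StructField("value", DataTypes.BinaryType, true, Metadata.empty())
    });
    // In a real implementation these byte arrays would be Avro-serialized records;
    // plain bytes are used here only to illustrate the shape.
    List<Row> rows = Arrays.asList(
        RowFactory.create(new byte[] {1}, new byte[] {10}),
        RowFactory.create(new byte[] {2}, new byte[] {20}));
    return spark.createDataFrame(rows, schema);
  }
}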
-
validateRmdSchema
protected void validateRmdSchema(PushJobSetting pushJobSetting)
-
getKafkaInputDataFrame
protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getKafkaInputDataFrame()
Get the input DataFrame from the Kafka/PubSub source for repush workloads. This method reads from a Venice version topic (Kafka) and returns a DataFrame with the RAW_PUBSUB_INPUT_TABLE_SCHEMA.
- Returns:
- DataFrame containing the Kafka input data
-
applyTTLFilter
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyTTLFilter(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply TTL filtering to the Kafka input dataframe. This method filters out records that are older than the configured TTL threshold. The input dataframe must have the RAW_PUBSUB_INPUT_TABLE_SCHEMA (region, partition, offset, message_type, schema_id, key, value, rmd_version_id, rmd_payload).
- Parameters:
dataFrame - Input dataframe with RAW_PUBSUB_INPUT_TABLE_SCHEMA
- Returns:
- Filtered dataframe (with stale records removed if TTL filtering is enabled)
-
applyCompaction
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyCompaction(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply compaction to the Kafka input dataframe. For each key, keep only the record with the highest offset.
- Parameters:
dataFrame - Input dataframe with RAW_PUBSUB_INPUT_TABLE_SCHEMA
- Returns:
- Compacted dataframe with duplicate keys removed
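As an illustration of the described behavior (not this class's actual implementation), the same "highest offset wins per key" semantics can be expressed with a Spark window function. The key and offset column names come from the RAW_PUBSUB_INPUT_TABLE_SCHEMA above; the temporary rn column and the class/method names are assumptions of this sketch:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

public final class CompactionSketch {
  // Keeps only the highest-offset row per key, mirroring what applyCompaction describes.
  public static Dataset<Row> keepLatestPerKey(Dataset<Row> dataFrame) {
    WindowSpec newestFirstPerKey = Window.partitionBy(col("key")).orderBy(col("offset").desc());
    return dataFrame
        .withColumn("rn", row_number().over(newestFirstPerKey))
        .filter(col("rn").equalTo(1))
        .drop("rn");
  }
}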
-
applyChunkAssembly
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyChunkAssembly(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply chunk assembly if chunking is enabled. Groups records by key, sorts them by offset in descending order, and assembles chunks into complete values/RMDs. If TTL filtering is enabled, the assembler also filters assembled records post-assembly.
- Parameters:
dataFrame - Input with SCHEMA_FOR_CHUNK_ASSEMBLY (7 columns)
- Returns:
- DataFrame with DEFAULT_SCHEMA_WITH_SCHEMA_ID (5 columns, assembled records)
-
applyCompressionReEncoding
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyCompressionReEncoding(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply compression re-encoding if the source and destination compression strategies are different. This is only applicable for repush workloads (isSourceKafka = true).
- Parameters:
dataFrame - Input dataframe
- Returns:
- Dataframe with values re-compressed if needed
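Taken together, these helpers describe a repush processing pipeline. The sketch below shows one way a subclass could chain them; it assumes the column projections between RAW_PUBSUB_INPUT_TABLE_SCHEMA, SCHEMA_FOR_CHUNK_ASSEMBLY, and the output schema are handled elsewhere (the real job handles this internally), and the buildRepushDataFrame method name is hypothetical:

// Inside a hypothetical AbstractDataWriterSparkJob subclass (Venice and Spark imports omitted):
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> buildRepushDataFrame() {
  org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df = getKafkaInputDataFrame(); // raw version-topic records
  df = applyTTLFilter(df);                // drop records older than the TTL threshold
  df = applyCompaction(df);               // keep only the highest-offset record per key
  df = applyChunkAssembly(df);            // reassemble chunked values/RMDs when chunking is enabled
  return applyCompressionReEncoding(df);  // re-compress values if source/destination strategies differ
}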
-
setInputConf
protected void setInputConf(org.apache.spark.sql.SparkSession session, org.apache.spark.sql.DataFrameReader dataFrameReader, String key, String value)
-
getTaskTracker
- Specified by:
getTaskTracker in class DataWriterComputeJob
-
getAccumulatorsForDataWriterJob
protected DataWriterAccumulators getAccumulatorsForDataWriterJob()
-
getJobProperties
protected VeniceProperties getJobProperties()
-
getPushJobSetting
- Specified by:
getPushJobSetting in class DataWriterComputeJob
-
runComputeJob
public void runComputeJob()
- Specified by:
runComputeJob in class DataWriterComputeJob
-
kill
public void kill()
- Specified by:
kill in interface ComputeJob
- Overrides:
kill in class DataWriterComputeJob
-
createPartitionWriterFactory
protected org.apache.spark.api.java.function.MapPartitionsFunction<org.apache.spark.sql.Row,org.apache.spark.sql.Row> createPartitionWriterFactory(org.apache.spark.broadcast.Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators)
Creates the partition writer factory. Can be overridden for testing purposes.
- Parameters:
broadcastProperties - the broadcast job properties
accumulators - the data writer accumulators
- Returns:
- the partition writer factory
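Since this factory can be overridden for testing, one possible (hypothetical) test double is a pass-through function that emits rows unchanged, so the surrounding Spark plan can be exercised without writing to Venice. The NoOpWriterSparkJob class name is an assumption of this sketch:

import java.util.Iterator;
import java.util.Properties;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;
// Venice imports (DataWriterSparkJob, DataWriterAccumulators) omitted for brevity.

public class NoOpWriterSparkJob extends DataWriterSparkJob {
  @Override
  protected MapPartitionsFunction<Row, Row> createPartitionWriterFactory(
      Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators) {
    // Pass-through: emit input rows unchanged instead of writing them to Venice.
    return (Iterator<Row> rows) -> rows;
  }
}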
-
close
public void close() throws IOException
- Throws:
IOException
-