Class AbstractDataWriterSparkJob
java.lang.Object
com.linkedin.venice.jobs.DataWriterComputeJob
com.linkedin.venice.spark.datawriter.jobs.AbstractDataWriterSparkJob
- All Implemented Interfaces:
ComputeJob, Closeable, AutoCloseable
- Direct Known Subclasses:
DataWriterSparkJob
The implementation of DataWriterComputeJob for the Spark engine.
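For orientation, here is a minimal usage sketch. It assumes a concrete subclass such as DataWriterSparkJob and prebuilt VeniceProperties and PushJobSetting objects; import paths not shown on this page are assumptions.

import com.linkedin.venice.hadoop.PushJobSetting;            // import path assumed
import com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob;
import com.linkedin.venice.utils.VeniceProperties;           // import path assumed

public final class SparkDataWriterUsageSketch {
  public static void runPush(VeniceProperties props, PushJobSetting pushJobSetting) throws Exception {
    DataWriterSparkJob job = new DataWriterSparkJob();  // concrete subclass of AbstractDataWriterSparkJob
    job.configure(props, pushJobSetting);               // see configure(VeniceProperties, PushJobSetting)
    try {
      job.runJob();  // inherited from DataWriterComputeJob; drives runComputeJob()
    } finally {
      job.close();   // releases the resources held by the job
    }
  }
}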
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
DataWriterComputeJob.ConfigSetter
Nested classes/interfaces inherited from interface com.linkedin.venice.jobs.ComputeJob
ComputeJob.Status
-
Field Summary
Fields inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
PASS_THROUGH_CONFIG_PREFIXES -
Constructor Summary
Constructors -
Method Summary
Modifier and Type / Method / Description
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyChunkAssembly(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply chunk assembly if chunking is enabled.
- protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyTTLFilter(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply TTL filtering to the Kafka input dataframe.
- void close()
- void configure(VeniceProperties props, PushJobSetting pushJobSetting)
- protected org.apache.spark.api.java.function.MapPartitionsFunction<org.apache.spark.sql.Row,org.apache.spark.sql.Row> createPartitionWriterFactory(org.apache.spark.broadcast.Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators)
Creates the partition writer factory.
- protected VeniceProperties getJobProperties()
- protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getKafkaInputDataFrame()
Get the input DataFrame from the Kafka/PubSub source for repush workloads.
- protected org.apache.spark.sql.SparkSession getSparkSession()
- protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getUserInputDataFrame()
Get the data frame based on the user's input data.
- void kill()
- void runComputeJob()
- protected void setInputConf(org.apache.spark.sql.SparkSession session, org.apache.spark.sql.DataFrameReader dataFrameReader, String key, String value)
- protected void validateRmdSchema(PushJobSetting pushJobSetting)
Methods inherited from class com.linkedin.venice.jobs.DataWriterComputeJob
configure, getFailureReason, getStatus, populateWithPassThroughConfigs, populateWithPassThroughConfigs, runJob, validateJob
-
Constructor Details
-
AbstractDataWriterSparkJob
public AbstractDataWriterSparkJob()
-
-
Method Details
-
configure
public void configure(VeniceProperties props, PushJobSetting pushJobSetting)
- Specified by:
configure in class DataWriterComputeJob
-
getSparkSession
protected org.apache.spark.sql.SparkSession getSparkSession()
-
getUserInputDataFrame
protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getUserInputDataFrame()
Get the data frame based on the user's input data. The schema of the Row has the following constraints:
- Must contain a field "key" with the schema DataTypes.BinaryType. This is the key of the record, represented in serialized Avro.
- Must contain a field "value" with the schema DataTypes.BinaryType. This is the value of the record, represented in serialized Avro.
- Must not contain fields with names beginning with "_". These are reserved for internal use.
- Can contain fields that do not violate the above constraints.
- Returns:
- The data frame based on the user's input data
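As an illustration of the contract above, a hedged sketch of a subclass follows. The input path, the source column names (keyBytes, valueBytes), and the use of the spark-avro data source are assumptions, not part of this API; the projected bytes are expected to already be Avro-serialized.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;

import com.linkedin.venice.spark.datawriter.jobs.AbstractDataWriterSparkJob;

public class ExampleInputDataWriterSparkJob extends AbstractDataWriterSparkJob {
  @Override
  protected Dataset<Row> getUserInputDataFrame() {
    // Placeholder input source and path; any source works if the projected columns meet the contract.
    Dataset<Row> raw = getSparkSession().read().format("avro").load("path/to/input");
    // Project to the required shape: binary "key" and "value" columns, no "_"-prefixed fields.
    return raw.select(
        col("keyBytes").alias("key"),      // Avro-serialized key bytes (assumed source column name)
        col("valueBytes").alias("value")); // Avro-serialized value bytes (assumed source column name)
  }

  @Override
  protected Dataset<Row> getKafkaInputDataFrame() {
    // Repush input is out of scope for this sketch.
    throw new UnsupportedOperationException("repush not supported in this example");
  }
}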
-
validateRmdSchema
protected void validateRmdSchema(PushJobSetting pushJobSetting)
-
getKafkaInputDataFrame
protected abstract org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> getKafkaInputDataFrame()
Get the input DataFrame from the Kafka/PubSub source for repush workloads. This method reads from a Venice version topic (Kafka) and returns a DataFrame with the RAW_PUBSUB_INPUT_TABLE_SCHEMA.
- Returns:
- DataFrame containing the Kafka input data
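The column names below mirror RAW_PUBSUB_INPUT_TABLE_SCHEMA as described on this page; the Spark data types in this sketch are assumptions chosen for illustration and may differ from the actual schema definition.

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

final class RawPubSubSchemaSketch {
  // Shape of the repush input DataFrame; the types are illustrative assumptions.
  static final StructType SCHEMA = new StructType()
      .add("region", DataTypes.StringType)
      .add("partition", DataTypes.IntegerType)
      .add("offset", DataTypes.LongType)
      .add("message_type", DataTypes.IntegerType)
      .add("schema_id", DataTypes.IntegerType)
      .add("key", DataTypes.BinaryType)
      .add("value", DataTypes.BinaryType)
      .add("rmd_version_id", DataTypes.IntegerType)
      .add("rmd_payload", DataTypes.BinaryType);
}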
-
applyTTLFilter
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyTTLFilter(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply TTL filtering to the Kafka input dataframe. This method filters out records that are older than the configured TTL threshold. The input dataframe must have the RAW_PUBSUB_INPUT_TABLE_SCHEMA (region, partition, offset, message_type, schema_id, key, value, rmd_version_id, rmd_payload).
- Parameters:
dataFrame - Input dataframe with RAW_PUBSUB_INPUT_TABLE_SCHEMA
- Returns:
- Filtered dataframe (with stale records removed if TTL filtering is enabled)
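Conceptually, the filter keeps only rows whose record timestamp is at or after the TTL cutoff. A simplified sketch follows; extractRmdTimestampMs is a hypothetical stand-in for Venice's actual replication-metadata decoding, and the null-handling policy shown is an assumption of this sketch, not of the API.

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class TtlFilterSketch {
  /** Hypothetical stand-in for decoding the per-record timestamp from rmd_payload. */
  static long extractRmdTimestampMs(byte[] rmdPayload, int rmdVersionId, int schemaId) {
    throw new UnsupportedOperationException("illustrative placeholder only");
  }

  /** Keep only rows whose RMD-derived timestamp is at or after the TTL cutoff. */
  static Dataset<Row> filterStaleRows(Dataset<Row> dataFrame, long ttlCutoffMs) {
    return dataFrame.filter((FilterFunction<Row>) row -> {
      byte[] rmdPayload = row.<byte[]>getAs("rmd_payload");
      if (rmdPayload == null) {
        return true;  // this sketch keeps rows that carry no RMD
      }
      long recordTimestampMs = extractRmdTimestampMs(
          rmdPayload, row.<Integer>getAs("rmd_version_id"), row.<Integer>getAs("schema_id"));
      return recordTimestampMs >= ttlCutoffMs;  // drop records older than the cutoff
    });
  }
}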
-
applyChunkAssembly
protected org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> applyChunkAssembly(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> dataFrame)
Apply chunk assembly if chunking is enabled. Groups records by key, sorts by offset DESC, and assembles chunks into complete values/RMDs. If TTL filtering is enabled, the assembler also filters assembled records post-assembly.
- Parameters:
dataFrame - Input with SCHEMA_FOR_CHUNK_ASSEMBLY (7 columns)
- Returns:
- DataFrame with DEFAULT_SCHEMA_WITH_SCHEMA_ID (5 columns - assembled records)
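The sketch below shows only the group-and-order shape of this step, under the assumption that the chunk assembler then walks each key's rows newest-offset-first; the actual stitching of chunk manifests into complete values/RMDs is not shown.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import static org.apache.spark.sql.functions.col;

final class ChunkAssemblyShapeSketch {
  // Conceptual pre-step only: put every row for a given key in the same partition and
  // order the rows newest-offset-first, which is the layout the assembler iterates over.
  static Dataset<Row> groupAndOrderByKey(Dataset<Row> dataFrame) {
    return dataFrame
        .repartition(col("key"))
        .sortWithinPartitions(col("key"), col("offset").desc());
  }
}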
-
setInputConf
protected void setInputConf(org.apache.spark.sql.SparkSession session, org.apache.spark.sql.DataFrameReader dataFrameReader, String key, String value)
-
getTaskTracker
- Specified by:
getTaskTracker in class DataWriterComputeJob
-
getJobProperties
protected VeniceProperties getJobProperties()
-
getPushJobSetting
- Specified by:
getPushJobSetting in class DataWriterComputeJob
-
runComputeJob
public void runComputeJob()
- Specified by:
runComputeJob in class DataWriterComputeJob
-
kill
public void kill()
- Specified by:
kill in interface ComputeJob
- Overrides:
kill in class DataWriterComputeJob
-
createPartitionWriterFactory
protected org.apache.spark.api.java.function.MapPartitionsFunction<org.apache.spark.sql.Row,org.apache.spark.sql.Row> createPartitionWriterFactory(org.apache.spark.broadcast.Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators)
Creates the partition writer factory. Can be overridden for testing purposes.
- Parameters:
broadcastProperties - the broadcast job properties
accumulators - the data writer accumulators
- Returns:
- the partition writer factory
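Since this hook exists for testing, a hypothetical test double might look like the sketch below: it bypasses the real partition writers and passes rows through unchanged so a test can exercise the upstream transformations. The DataWriterAccumulators import path is an assumption.

import java.util.Properties;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

import com.linkedin.venice.spark.datawriter.jobs.DataWriterSparkJob;
import com.linkedin.venice.spark.datawriter.task.DataWriterAccumulators;  // import path assumed

class PassThroughDataWriterSparkJob extends DataWriterSparkJob {
  @Override
  protected MapPartitionsFunction<Row, Row> createPartitionWriterFactory(
      Broadcast<Properties> broadcastProperties, DataWriterAccumulators accumulators) {
    return rows -> rows;  // no-op: emit every input row as-is instead of writing to Venice
  }
}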
-
close
public void close() throws IOException
- Throws:
IOException
-