Class VenicePushJob

  • All Implemented Interfaces:
    java.lang.AutoCloseable

    public class VenicePushJob
    extends java.lang.Object
    implements java.lang.AutoCloseable
    This class sets up the Hadoop job used to push data to Venice. The job reads its input data from HDFS and supports two input formats: Avro and binary JSON (Vson).
    • Field Detail

      • validateSchemaAndBuildDictJobConf

        protected final org.apache.hadoop.mapred.JobConf validateSchemaAndBuildDictJobConf
    • Constructor Detail

      • VenicePushJob

        public VenicePushJob​(java.lang.String jobId,
                             java.util.Properties vanillaProps)
        Parameters:
        jobId - id of the job
        vanillaProps - Property bag for the job
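The constructor above takes a job id and a plain property bag. A minimal sketch of driving the job follows; the property keys shown are illustrative (check the Venice push job configuration reference for the exact keys your version expects), and the Venice calls are left in comments since they require the Venice libraries on the classpath:

```java
import java.util.Properties;

public class PushJobSetup {
    public static void main(String[] args) {
        // Illustrative configuration keys -- not guaranteed to match the
        // exact constants used by your Venice version.
        Properties props = new Properties();
        props.setProperty("venice.store.name", "my-store");
        props.setProperty("input.path", "/user/me/input");

        // With Venice on the classpath, the job would be driven roughly like
        // this; VenicePushJob implements AutoCloseable, so try-with-resources
        // applies:
        //
        // try (VenicePushJob pushJob = new VenicePushJob("my-job-id", props)) {
        //     pushJob.run();
        // }
        System.out.println(props.getProperty("venice.store.name"));
    }
}
```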
    • Method Detail

      • setControllerClient

        protected void setControllerClient​(ControllerClient controllerClient)
      • setJobClientWrapper

        public void setJobClientWrapper​(JobClientWrapper jobClientWrapper)
      • setInputDataInfoProvider

        protected void setInputDataInfoProvider​(InputDataInfoProvider inputDataInfoProvider)
      • setVeniceWriter

        protected void setVeniceWriter​(VeniceWriter<KafkaKey,​byte[],​byte[]> veniceWriter)
      • setSentPushJobDetailsTracker

        public void setSentPushJobDetailsTracker​(SentPushJobDetailsTracker sentPushJobDetailsTracker)
      • getValidateSchemaAndBuildDictionaryOutputDir

        protected static java.lang.String getValidateSchemaAndBuildDictionaryOutputDir​(java.lang.String parentOutputDir,
                                                                                       java.lang.String storeName,
                                                                                       java.lang.String jobExecId)
        Creates the output file for ValidateSchemaAndBuildDictMapper to persist data to be read from the VPJ driver.
        Output directory: {$hadoopTmpDir}/{$storeName}-{$JOB_EXEC_ID}-{$randomUniqueString}
        File name: mapper-output-{$MRJobID}.avro
        Why JOB_EXEC_ID and randomUniqueString: they make the directory name unique, so its permissions can be restricted to the user who started this VPJ run only. This helps with two issues:
        1. Multiple headless accounts may write to a single Venice store. This shouldn't happen in regular cases, but is very likely during migrations (technical or organizational). Unless each job gets its own directory, the accounts will run into directory permission issues.
        2. Multiple push jobs can be started in parallel, but only one will continue beyond ControllerClient.requestTopicForWrites(java.lang.String, long, com.linkedin.venice.meta.Version.PushType, java.lang.String, boolean, boolean, boolean, java.util.Optional<java.lang.String>, java.util.Optional<java.lang.String>, java.util.Optional<java.lang.String>, boolean, long), as that method throws a CONCURRENT_BATCH_PUSH error if another push job is in progress. Since ValidateSchemaAndBuildDictMapper runs before this method, it is exposed to concurrent push jobs and thus to race conditions. Unique directories per execution avoid this.
        Why MRJobID can't be used for randomness: MR's jobID is populated only after FileOutputFormat.checkOutputSpecs(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.mapred.JobConf), which requires FileOutputFormat.setOutputPath(org.apache.hadoop.mapred.JobConf, org.apache.hadoop.fs.Path) to have been called already, so the ID is not available at this point.
        TODO: explore using conf.get("hadoop.tmp.dir") or similar configs to obtain the default tmp directory in different HDFS environments, rather than hardcoding it to VALIDATE_SCHEMA_AND_BUILD_DICTIONARY_MAPPER_OUTPUT_PARENT_DIR_DEFAULT.
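The naming scheme described above can be sketched in plain Java; this is an illustrative re-creation of the documented pattern, not Venice's actual implementation:

```java
import java.util.UUID;

public class DictOutputDirSketch {
    // Sketch of: {$parentOutputDir}/{$storeName}-{$jobExecId}-{$randomUniqueString}
    static String outputDir(String parentOutputDir, String storeName,
                            String jobExecId, String uniqueSuffix) {
        return parentOutputDir + "/" + storeName + "-" + jobExecId + "-" + uniqueSuffix;
    }

    // Sketch of: mapper-output-{$MRJobID}.avro
    static String outputFileName(String mrJobId) {
        return "mapper-output-" + mrJobId + ".avro";
    }

    public static void main(String[] args) {
        // The random suffix keeps concurrent executions (and different
        // headless accounts) in distinct, user-restricted directories.
        String dir = outputDir("/tmp/hadoop", "myStore", "exec-42",
                UUID.randomUUID().toString());
        System.out.println(dir);
        System.out.println(outputFileName("job_202401_0001"));
    }
}
```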
      • getValidateSchemaAndBuildDictionaryOutputFileNameNoExtension

        protected static java.lang.String getValidateSchemaAndBuildDictionaryOutputFileNameNoExtension​(java.lang.String mrJobId)
      • getValidateSchemaAndBuildDictionaryOutputFileName

        protected static java.lang.String getValidateSchemaAndBuildDictionaryOutputFileName​(java.lang.String mrJobId)
      • shouldBuildZstdCompressionDictionary

        protected static boolean shouldBuildZstdCompressionDictionary​(PushJobSetting pushJobSetting,
                                                                      boolean inputFileHasRecords)
        This function decides whether a Zstd compression dictionary needs to be trained, based on the type of push, the configs, whether there are any input records, and whether PushJobSetting.compressionMetricCollectionEnabled is enabled.
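A hedged sketch of that decision follows. The enum and parameter names are illustrative stand-ins for Venice's actual types, and the exact rule may differ in the real implementation:

```java
public class ZstdDictDecisionSketch {
    enum CompressionStrategy { NO_OP, GZIP, ZSTD_WITH_DICT }

    // Illustrative decision: train a dictionary only when there is input to
    // sample from and the push is not incremental (an incremental push reuses
    // the existing version's dictionary), and only if the store compresses
    // with ZSTD_WITH_DICT or compression metrics are being collected.
    static boolean shouldBuildDictionary(CompressionStrategy strategy,
                                         boolean compressionMetricCollectionEnabled,
                                         boolean isIncrementalPush,
                                         boolean inputFileHasRecords) {
        if (isIncrementalPush || !inputFileHasRecords) {
            return false;
        }
        return strategy == CompressionStrategy.ZSTD_WITH_DICT
                || compressionMetricCollectionEnabled;
    }

    public static void main(String[] args) {
        System.out.println(shouldBuildDictionary(
                CompressionStrategy.ZSTD_WITH_DICT, false, false, true)); // true
        System.out.println(shouldBuildDictionary(
                CompressionStrategy.NO_OP, false, false, true)); // false
    }
}
```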
      • evaluateCompressionMetricCollectionEnabled

        protected static boolean evaluateCompressionMetricCollectionEnabled​(PushJobSetting pushJobSetting,
                                                                            boolean inputFileHasRecords)
        This function evaluates the config PushJobSetting.compressionMetricCollectionEnabled against the input data and other configs, as an initial filter to disable it in cases where the information can't be collected or wouldn't be meaningful, e.g. when there is no input data, or for an incremental push.
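The initial filter described above can be sketched as a single predicate; the parameter names are illustrative and the real method consults PushJobSetting directly:

```java
public class CompressionMetricFilterSketch {
    // Illustrative filter: keep metric collection enabled only when it can
    // produce something useful -- the config is on, there are input records
    // to measure, and the push is not incremental.
    static boolean evaluateCompressionMetricCollectionEnabled(boolean configEnabled,
                                                              boolean isIncrementalPush,
                                                              boolean inputFileHasRecords) {
        return configEnabled && inputFileHasRecords && !isIncrementalPush;
    }

    public static void main(String[] args) {
        System.out.println(evaluateCompressionMetricCollectionEnabled(true, false, true));  // true
        System.out.println(evaluateCompressionMetricCollectionEnabled(true, false, false)); // false: no data
        System.out.println(evaluateCompressionMetricCollectionEnabled(true, true, true));   // false: incremental
    }
}
```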
      • getValidateSchemaAndBuildDictMapperOutputReader

        protected ValidateSchemaAndBuildDictMapperOutputReader getValidateSchemaAndBuildDictMapperOutputReader​(java.lang.String outputDir,
                                                                                                               java.lang.String fileName)
                                                                                                        throws java.lang.Exception
        Throws:
        java.lang.Exception
      • initKIFRepushDetails

        protected void initKIFRepushDetails()
      • getInputURI

        protected java.lang.String getInputURI​(VeniceProperties props)
        Gets the input path from the properties and checks whether the input directory contains a sub-directory.
        Parameters:
        props -
        Returns:
        input URI
        Throws:
        java.lang.Exception
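The sub-directory check that getInputURI performs can be illustrated with a local-filesystem analogue; the real method uses Hadoop's FileSystem API against HDFS, and this sketch only mirrors the idea with java.nio:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class InputDirCheckSketch {
    // Local analogue of the HDFS check: does the input directory contain
    // at least one sub-directory?
    static boolean hasSubDirectory(Path inputDir) throws IOException {
        try (Stream<Path> entries = Files.list(inputDir)) {
            return entries.anyMatch(Files::isDirectory);
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("vpj-input");
        System.out.println(hasSubDirectory(tmp)); // false: empty directory
        Files.createDirectory(tmp.resolve("sub"));
        System.out.println(hasSubDirectory(tmp)); // true
    }
}
```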
      • validateRemoteHybridSettings

        protected void validateRemoteHybridSettings()
      • validateRemoteHybridSettings

        protected void validateRemoteHybridSettings​(PushJobSetting setting)
      • setupInputFormatConfToValidateSchemaAndBuildDict

        protected void setupInputFormatConfToValidateSchemaAndBuildDict​(org.apache.hadoop.mapred.JobConf conf,
                                                                        PushJobSetting pushJobSetting,
                                                                        java.lang.String inputDirectory)
      • cancel

        public void cancel()
        A cancel method for gracefully cancelling the running job, to be invoked as a result of user actions.
        Throws:
        java.lang.Exception
      • getKafkaUrl

        public java.lang.String getKafkaUrl()
      • getIncrementalPushVersion

        public java.lang.String getIncrementalPushVersion()
      • getTopicToMonitor

        public java.lang.String getTopicToMonitor()
      • close

        public void close()
        Specified by:
        close in interface java.lang.AutoCloseable
      • main

        public static void main​(java.lang.String[] args)