Class ActiveActiveStoreIngestionTask
java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
com.linkedin.davinci.kafka.consumer.ActiveActiveStoreIngestionTask
- All Implemented Interfaces:
Closeable, AutoCloseable, Runnable
This class contains logic that SNs must perform if a store-version is running in Active/Active mode.
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
StoreIngestionTask.DelegateConsumerRecordResult
-
Field Summary
Fields inherited from class com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
aaWCIngestionStorageLookupThreadPool, GLOBAL_RT_DIV_KEY_PREFIX, globalRtDivKeyBytesCache, globalRtDivStateSerializer, hasChangeCaptureView, hasComplexVenicePartitionerMaterializedView, kafkaClusterIdToUrlMap, storeDeserializerCache, veniceWriter, veniceWriterForRealTime, viewWriters
Fields inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
aggKafkaConsumerService, availableSchemaIds, batchReportIncPushStatusEnabled, bootstrapTimeoutInMs, CHUNK_SCHEMA_ID, chunkAssembler, compressionStrategy, compressor, compressorFactory, consumedBytesSinceLastSync, consumerActionSequenceNumber, consumerActionsQueue, consumerDiv, databaseSyncBytesIntervalForDeferredWriteMode, databaseSyncBytesIntervalForTransactionalMode, dataRecoverySourceVersionNumber, defaultReadyToServeChecker, deserializedSchemaIds, diskUsage, divErrorMetricCallback, emitMetrics, emptyPollSleepMs, errorPartitionId, gracefulShutdownLatch, hostLevelIngestionStats, hostName, hybridStoreConfig, idleCounter, ingestionNotificationDispatcher, ingestionTaskName, isChunked, isCurrentVersion, isDataRecovery, isDaVinciClient, isGlobalRtDivEnabled, isIsolatedIngestion, isRmdChunked, isRunning, isSystemStore, isWriteComputationEnabled, kafkaClusterUrlResolver, kafkaProps, kafkaVersionTopic, KILL_WAIT_TIME_MS, localKafkaClusterId, localKafkaServer, localKafkaServerSingletonSet, manifestSerializer, metaStoreWriter, parallelProcessingThreadPool, partitionConsumptionStateMap, partitionCount, partitionToPendingConsumerActionCountMap, pubSubContext, pubSubTopicRepository, readCycleDelayMs, readOnlyForBatchOnlyStoreEnabled, realTimeTopic, recordLevelMetricEnabled, recordTransformerStats, REDUNDANT_LOGGING_FILTER, resetErrorReplicaEnabled, SCHEMA_POLLING_DELAY_MS, schemaRepository, separateRealTimeTopic, serverConfig, storageEngine, storageMetadataService, storageService, STORE_VERSION_POLLING_DELAY_MS, storeBufferService, storeName, storeRepository, storeVersionName, storeVersionPartitionCount, topicManagerRepository, versionedDIVStats, versionedIngestionStats, versionNumber, versionRole, versionTopic, WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED, workloadType, writeComputeFailureCode, zkHelixAdmin
-
Constructor Summary
Constructors
Constructor | Description
ActiveActiveStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin) |
-
Method Summary
Modifier and Type | Method | Description
protected Map<String,PubSubPosition> | calculateRtConsumptionStartPositions(PartitionConsumptionState pcs, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList) | Calculates real-time start positions for each PubSub broker during a topic switch in Active-Active replication.
void | consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition startPosition, String pubSubAddress) | Ensures the PubSub URL is present in the PubSub cluster URL-to-ID map before subscribing to a topic.
protected LeaderProducerCallback | createProducerCallback(DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) |
protected StoreIngestionTask.DelegateConsumerRecordResult | delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) | The goal of this function is to possibly produce the incoming Kafka message, consumed from the local VT, remote VT, RT or SR topic, to the local VT if needed.
protected IngestionBatchProcessor | getIngestionBatchProcessor |
protected PubSubPosition | getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl) | Unlike the persisted upstream offset map in OffsetRecord, the latest consumed upstream offset map is maintained separately for each Kafka URL.
protected PubSubPosition | getLatestPersistedRtPositionForLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl) | Returns the latest processed upstream real-time offset for the given region.
protected BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> | getProduceToTopicFunction(PartitionConsumptionState partitionConsumptionState, byte[] key, ByteBuffer updatedValueBytes, ByteBuffer updatedRmdBytes, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int valueSchemaId, boolean resultReuseInput) |
protected boolean | isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) |
boolean | isTransientRecordBufferUsed(PartitionConsumptionState partitionConsumptionState) | For Active-Active, this buffer is always used as long as the partition has passed End of Push (EOP).
protected void | leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic) |
protected long | measureRtLagForMultiRegions(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState pcs, boolean shouldLogLag) | For stores in active/active mode, if no fabric is unreachable, return the maximum lag of all fabrics.
protected void | processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) |
protected void | processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition position, PartitionConsumptionState partitionConsumptionState) | Processes a TopicSwitch control message at the given partition position for a specific PartitionConsumptionState.
protected void | putInStorageEngine(int partition, byte[] keyBytes, Put put) | Persist Put record to storage engine.
protected void | removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete) |
protected void | startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState) |
protected void | updateLatestConsumedRtPositions(PartitionConsumptionState pcs, String pubSubBrokerAddress, PubSubPosition pubSubPosition) |
protected void | updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun) | Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
Methods inherited from class com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
addPartitionConsumptionState, calculateAssembledRecordSizeRatio, checkAndHandleUpstreamOffsetRewind, checkAndLogIfLagIsAcceptableForHybridStore, checkLongRunningTaskState, checkWhetherToCloseUnusedVeniceWriter, closeVeniceViewWriters, closeVeniceWriters, constructVeniceWriter, consumerUnSubscribeAllTopics, demoteToStandby, getAndUpdateLeaderCompletedState, getConsumptionSourceKafkaAddress, getDataRecoverySourcePubSub, getGlobalRtDivKeyBytes, getGlobalRtDivKeyName, getMaxNearlineRecordSizeBytes, getMaxRecordSizeBytes, getRealTimeDataSourceKafkaAddress, getRewindStartPositionForRealTimeTopic, getVeniceWriter, getWriteComputeErrorCode, hasViewWriters, isHybridFollower, isLeader, maybeCompressData, maybeSendIngestionHeartbeat, measureHybridHeartbeatLag, measureHybridHeartbeatTimestamp, measureHybridOffsetLag, measureRtLagForSingleRegion, processConsumerAction, processControlMessageForViews, produceToLocalKafka, promoteToLeader, queueUpVersionTopicWritesWithViewWriters, recordAssembledRecordSizeRatio, recordHeartbeatReceived, recordProcessedRecordStats, recordWriterStats, refreshIngestionContextIfChanged, reportIfCatchUpVersionTopicOffset, resubscribe, resubscribeAsFollower, resubscribeAsLeader, setNativeReplicationSourceVersionTopicKafkaURL, shouldCompressData, shouldNewLeaderSwitchToRemoteConsumption, shouldPersistRecord, shouldProcessRecord, shouldProduceToVersionTopic, syncConsumedUpstreamRTOffsetMapIfNeeded, syncTopicSwitchToIngestionMetadataService, updateLeaderTopicOnFollower, updateOffsetMetadataInOffsetRecord, updateOffsetsFromConsumerRecord, validateAndFilterOutDuplicateMessagesFromLeaderTopic, waitForAllMessageToBeProcessedFromTopicPartition, waitForLastLeaderPersistFuture
Methods inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
checkIngestionProgress, cloneDrainerDivProducerStates, close, consumerBatchUnsubscribe, consumerHasAnySubscription, consumerHasSubscription, consumerResetOffset, consumerSubscribe, createKafkaConsumerProperties, disableMetricsEmission, dropStoragePartitionGracefully, dumpPartitionConsumptionStates, dumpStoreVersionState, enableMetricsEmission, extractUpstreamClusterId, extractUpstreamPosition, getCompressionStrategy, getCompressor, getDataIntegrityValidator, getFailedIngestionPartitionCount, getHostLevelIngestionStats, getIngestionTaskName, getKafkaVersionTopic, getOffsetToOnlineLagThresholdPerPartition, getPartitionConsumptionState, getSchemaRepo, getServerConfig, getStorageEngine, getStoragePartitionConfig, getStoragePartitionConfig, getStorageUtilizationManager, getStoreName, getTopicManager, getTopicPartitionEndPosition, getVersionedDIVStats, getVersionIngestionStats, getVersionNumber, getVersionTopic, hasAllPartitionReportedCompleted, hasAnyPartitionConsumptionState, hasAnyPendingSubscription, hasAnySubscription, hasPendingPartitionIngestionAction, hasReplicas, isActiveActiveReplicationEnabled, isChunked, isCurrentVersion, isHybridMode, isIdleOverThreshold, isIngestionTaskActive, isMetricsEmissionEnabled, isPartitionConsumingOrHasPendingIngestionAction, isProducingVersionTopicHealthy, isReadyToServe, isRunning, isSeparatedRealtimeTopicEnabled, isUserSystemStore, kill, logStorageOperationWhileUnsubscribed, measureLagWithCallToPubSub, measureLagWithCallToPubSub, minZeroLag, nextSeqNum, processCommonConsumerAction, processConsumerRecord, processEndOfIncrementalPush, processEndOfPush, processStartOfIncrementalPush, produceToStoreBufferService, produceToStoreBufferServiceOrKafka, produceToStoreBufferServiceOrKafkaInBatch, recordAssembledRecordSize, recordChecksumVerificationFailure, reportError, resetPartitionConsumptionOffset, resolveSourceKafkaServersWithinTopicSwitch, run, setLastConsumerException, setLastStoreIngestionException, setPartitionConsumptionState, shutdownAndWait, subscribePartition, subscribePartition, throwIfNotRunning, throwOrLogStorageFailureDependingIfStillSubscribed, unsubscribeFromTopic, unSubscribePartition, unSubscribePartition, updateAndSyncOffsetFromSnapshot, updateOffsetMetadataAndSync, updateOffsetMetadataAndSyncOffset, updateOffsetMetadataAndSyncOffset, validateEndOfPushReceivedBeforeTopicSwitch, validateMessage, waitVersionStateAvailable
-
Constructor Details
-
ActiveActiveStoreIngestionTask
public ActiveActiveStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
Method Details
-
delegateConsumerRecord
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
Description copied from class: LeaderFollowerStoreIngestionTask
The goal of this function is to possibly produce the incoming Kafka message, consumed from the local VT (version topic), remote VT, RT (real-time) or SR topic, to the local VT if needed. The decision is based on the output of LeaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState) and on the message type. It also performs any necessary additional computation, such as for write-compute/update messages. It returns a StoreIngestionTask.DelegateConsumerRecordResult indicating whether the record was produced to Kafka.
This function should be called as one of the first steps in the processing pipeline for all messages consumed from any Kafka topic. The caller should only process the record further if the result is StoreIngestionTask.DelegateConsumerRecordResult.QUEUED_TO_DRAINER. This function assumes that LeaderFollowerStoreIngestionTask.shouldProcessRecord(DefaultPubSubMessage) has already been called, which happens in StoreIngestionTask.produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int), and that it was decided that this record needs to be processed. To keep the function simple, it does not perform any validation check on the PartitionConsumptionState object. DIV validation is also done here if the message is received from the RT topic. For more info, please see StoreIngestionTask#internalProcessConsumerRecord(PubSubMessage, PartitionConsumptionState, LeaderProducedRecordContext, int, String, long).
This function may modify the original record in the KME (KafkaMessageEnvelope), so it is unsafe to use the payload from the KME directly after this function returns.
- Overrides:
delegateConsumerRecord in class LeaderFollowerStoreIngestionTask
- Returns:
a StoreIngestionTask.DelegateConsumerRecordResult indicating what to do with the record
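A minimal sketch of the caller contract described above, under stated assumptions: only a record that comes back as QUEUED_TO_DRAINER continues down the pipeline. The `Result` enum, its `PRODUCED_TO_VERSION_TOPIC` constant, the boolean flag standing in for `shouldProduceToVersionTopic`, and the string records are all hypothetical placeholders; this is not Venice's implementation, and the real enum's other constants are not listed on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the delegateConsumerRecord caller contract; not Venice code.
final class DelegateContractSketch {
    // Stand-in for StoreIngestionTask.DelegateConsumerRecordResult. Only QUEUED_TO_DRAINER is
    // named in the docs; PRODUCED_TO_VERSION_TOPIC is an invented placeholder.
    enum Result { QUEUED_TO_DRAINER, PRODUCED_TO_VERSION_TOPIC }

    // Hypothetical delegate step: when the record should be produced to the version topic,
    // hand it to the producer; otherwise queue it for the drainer.
    static Result delegate(String record, boolean shouldProduceToVersionTopic, List<String> versionTopic) {
        if (shouldProduceToVersionTopic) {
            versionTopic.add(record);
            return Result.PRODUCED_TO_VERSION_TOPIC;
        }
        return Result.QUEUED_TO_DRAINER;
    }

    // Caller side: only records that come back as QUEUED_TO_DRAINER are processed further.
    static List<String> consumeBatch(List<String> records, boolean shouldProduceToVersionTopic,
                                     List<String> versionTopic) {
        List<String> drainerQueue = new ArrayList<>();
        for (String record : records) {
            if (delegate(record, shouldProduceToVersionTopic, versionTopic) == Result.QUEUED_TO_DRAINER) {
                drainerQueue.add(record);
            }
        }
        return drainerQueue;
    }
}
```

The point of the pattern is that the delegate step is the single place that decides between "produce" and "drain", so the caller never needs to repeat the leader/follower checks.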
-
putInStorageEngine
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put)
Description copied from class: StoreIngestionTask
Persist Put record to storage engine.
- Overrides:
putInStorageEngine in class StoreIngestionTask
-
removeFromStorageEngine
protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
- Overrides:
removeFromStorageEngine in class StoreIngestionTask
-
getRmdSerDe
-
getIngestionBatchProcessor
- Overrides:
getIngestionBatchProcessor in class LeaderFollowerStoreIngestionTask
-
processMessageAndMaybeProduceToKafka
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
- Overrides:
processMessageAndMaybeProduceToKafka in class LeaderFollowerStoreIngestionTask
-
calculateRtConsumptionStartPositions
protected Map<String,PubSubPosition> calculateRtConsumptionStartPositions(PartitionConsumptionState pcs, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList)
Calculates real-time start positions for each PubSub broker during a topic switch in Active-Active replication.
For each broker, determines the appropriate starting position by either using the latest processed position or rewinding to a specific timestamp. Handles broker failures gracefully by adding unreachable brokers to a repair queue, and requires a quorum of reachable brokers to proceed.
- Overrides:
calculateRtConsumptionStartPositions in class LeaderFollowerStoreIngestionTask
- Parameters:
pcs - the partition consumption state containing topic switch information
newSourceTopic - the new real-time topic to switch to
unreachableBrokerList - output list populated with brokers that couldn't be contacted
- Returns:
map of PubSub broker addresses to their calculated start positions
- Throws:
VeniceException - if no topic switch is configured or insufficient brokers are reachable
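The per-broker loop and quorum check described above can be sketched as follows. This is a hypothetical illustration, not the Venice method: positions are modeled as `long` offsets instead of PubSubPosition, the rewind lookup is a caller-supplied function that returns empty for an unreachable broker, a known processed position is assumed to take precedence over rewinding, and "quorum" is assumed to mean a strict majority. `IllegalStateException` stands in for VeniceException.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of per-broker RT start positions with a reachability quorum; not Venice code.
final class RtStartPositionSketch {
    static Map<String, Long> calculateStartPositions(
            List<String> brokerUrls,
            Map<String, Long> latestProcessedByBroker,
            Function<String, Optional<Long>> rewindLookup,
            List<String> unreachableBrokerList) {
        Map<String, Long> startPositions = new HashMap<>();
        for (String broker : brokerUrls) {
            Long latestProcessed = latestProcessedByBroker.get(broker);
            if (latestProcessed != null) {
                // Assumption: a known processed position wins, so consumption resumes where it left off.
                startPositions.put(broker, latestProcessed);
                continue;
            }
            // Otherwise ask the broker for the position matching the rewind timestamp;
            // an empty result models a broker that could not be contacted.
            Optional<Long> rewound = rewindLookup.apply(broker);
            if (rewound.isPresent()) {
                startPositions.put(broker, rewound.get());
            } else {
                unreachableBrokerList.add(broker); // left for later repair
            }
        }
        // Assumption: "quorum" means a strict majority of the configured brokers.
        int quorum = brokerUrls.size() / 2 + 1;
        if (startPositions.size() < quorum) {
            throw new IllegalStateException("Only " + startPositions.size() + " of " + brokerUrls.size()
                + " brokers reachable; need " + quorum);
        }
        return startPositions;
    }
}
```

Collecting unreachable brokers into an output list, rather than failing on the first one, lets the caller proceed with a partial set of start positions as long as the quorum holds.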
-
startConsumingAsLeader
- Overrides:
startConsumingAsLeader in class LeaderFollowerStoreIngestionTask
-
consumerSubscribe
public void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition startPosition, String pubSubAddress)
Ensures the PubSub URL is present in the PubSub cluster URL-to-ID map before subscribing to a topic. This prevents subscription to unknown PubSub URLs, which can cause issues during message consumption.
-
leaderExecuteTopicSwitch
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
- Overrides:
leaderExecuteTopicSwitch in class LeaderFollowerStoreIngestionTask
-
processTopicSwitch
protected void processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition position, PartitionConsumptionState partitionConsumptionState)
Processes a TopicSwitch control message at the given partition position for a specific PartitionConsumptionState.
- Overrides:
processTopicSwitch in class LeaderFollowerStoreIngestionTask
-
updateLatestInMemoryProcessedOffset
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
Description copied from class: StoreIngestionTask
Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
- Overrides:
updateLatestInMemoryProcessedOffset in class LeaderFollowerStoreIngestionTask
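To make the relationship between the in-memory processed offset, the checkpointed offset and dry-run mode concrete, here is a hypothetical sketch. Offsets are plain `long` values, a "rewind" is assumed to mean an offset strictly lower than the latest processed one, and the class and method names are invented for illustration; the real method also takes a LeaderProducedRecordContext and a Kafka URL and works per partition.

```java
// Hypothetical sketch: in-memory processed offset vs. checkpointed offset; not Venice code.
final class ProcessedOffsetSketch {
    private long latestProcessedOffset = -1L; // advanced only after the record is persisted
    private long checkpointedOffset = -1L;    // stand-in for the offset stored in OffsetRecord

    // Returns true if recordOffset would rewind the in-memory offset (assumed: strictly lower).
    // In dry-run mode only this check runs and the in-memory offset is left unchanged.
    // On a rewind, this sketch keeps the higher offset (an assumption, not documented behavior).
    boolean updateLatestProcessedOffset(long recordOffset, boolean dryRun) {
        boolean rewind = recordOffset < latestProcessedOffset;
        if (!dryRun && !rewind) {
            latestProcessedOffset = recordOffset;
        }
        return rewind;
    }

    // Periodic sync: the checkpoint catches up with the in-memory offset.
    void checkpoint() {
        checkpointedOffset = latestProcessedOffset;
    }

    long latestProcessedOffset() {
        return latestProcessedOffset;
    }

    long checkpointedOffset() {
        return checkpointedOffset;
    }
}
```

Because `checkpoint()` runs only periodically, `latestProcessedOffset()` is normally ahead of `checkpointedOffset()`, which mirrors the "most of the time ahead of OffsetRecord" remark above.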
-
isRealTimeBufferReplayStarted
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- Overrides:
isRealTimeBufferReplayStarted in class LeaderFollowerStoreIngestionTask
-
getLatestPersistedRtPositionForLagMeasurement
protected PubSubPosition getLatestPersistedRtPositionForLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
Returns the latest processed upstream real-time offset for the given region. This is used to compute hybrid offset lag on a per-region basis, which is then combined with the lag from other regions to determine ready-to-serve status.
- Overrides:
getLatestPersistedRtPositionForLagMeasurement in class LeaderFollowerStoreIngestionTask
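A hypothetical sketch of the per-region bookkeeping behind this method and getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement: consumed and persisted RT positions live in separate maps keyed by upstream URL, and a region's lag is measured from its persisted position. Offsets are plain `long` values, the end offset is assumed to follow the Kafka convention (the offset of the next record to be written), and -1 is an assumed "nothing seen yet" sentinel; none of these names are Venice APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-region RT bookkeeping keyed by upstream URL; not Venice code.
final class PerRegionRtPositionsSketch {
    // Assumed sentinel for "no RT record from this region consumed or persisted yet".
    static final long NONE = -1L;

    private final Map<String, Long> latestConsumedByUrl = new HashMap<>();
    private final Map<String, Long> latestPersistedByUrl = new HashMap<>();

    // Called when the consumer polls an RT record from the given region.
    void onConsumed(String upstreamUrl, long offset) {
        latestConsumedByUrl.merge(upstreamUrl, offset, Long::max); // never moves backwards
    }

    // Called after the drainer has persisted an RT record from the given region.
    void onPersisted(String upstreamUrl, long offset) {
        latestPersistedByUrl.merge(upstreamUrl, offset, Long::max);
    }

    long latestConsumed(String upstreamUrl) {
        return latestConsumedByUrl.getOrDefault(upstreamUrl, NONE);
    }

    long latestPersisted(String upstreamUrl) {
        return latestPersistedByUrl.getOrDefault(upstreamUrl, NONE);
    }

    // Per-region lag: number of records in [latestPersisted + 1, endOffset) not yet persisted.
    long persistedLag(String upstreamUrl, long endOffset) {
        return Math.max(0L, endOffset - 1 - latestPersisted(upstreamUrl));
    }
}
```

Keeping one entry per upstream URL is what allows the lag of each region to be judged independently before the results are combined across regions.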
-
getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement
protected PubSubPosition getLatestConsumedUpstreamPositionForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
Unlike the persisted upstream offset map in OffsetRecord, the latest consumed upstream offset map is maintained separately for each Kafka URL.
-
updateLatestConsumedRtPositions
protected void updateLatestConsumedRtPositions(PartitionConsumptionState pcs, String pubSubBrokerAddress, PubSubPosition pubSubPosition)
- Overrides:
updateLatestConsumedRtPositions in class LeaderFollowerStoreIngestionTask
-
isTransientRecordBufferUsed
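public boolean isTransientRecordBufferUsed(PartitionConsumptionState partitionConsumptionState)
For Active-Active, this buffer is always used as long as the partition has passed End of Push (EOP).
- Overrides:
isTransientRecordBufferUsed in class StoreIngestionTask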
-
measureRtLagForMultiRegions
protected long measureRtLagForMultiRegions(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState pcs, boolean shouldLogLag)
For stores in active/active mode: if no fabric is unreachable, return the maximum lag of all fabrics. If only one fabric is unreachable, return the maximum lag of the other fabrics. If more than one fabric is unreachable, return Long.MAX_VALUE, which means the partition is not ready-to-serve.
TODO: For active/active incremental push stores or stores with only one Samza job, we should take the weight of the unreachable fabric into account when making this decision. For example, we should not let a partition become ready-to-serve when the only source fabric is unreachable.
- Overrides:
measureRtLagForMultiRegions in class LeaderFollowerStoreIngestionTask
- Parameters:
sourceRealTimeTopicKafkaURLs
pcs
shouldLogLag
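The three-way rule above can be expressed as a small pure function. This is a sketch under assumptions, not the Venice code: per-region lag is passed in as a `Map` from region URL to `OptionalLong`, where an empty value marks an unreachable fabric, and the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of the multi-region RT lag rule described above; not Venice code.
final class MultiRegionLagSketch {
    // lagByRegionUrl: region URL -> measured lag; an empty OptionalLong marks an unreachable fabric.
    static long measureLag(Map<String, OptionalLong> lagByRegionUrl) {
        long maxLag = 0L;
        int unreachableCount = 0;
        for (OptionalLong lag : lagByRegionUrl.values()) {
            if (lag.isPresent()) {
                maxLag = Math.max(maxLag, lag.getAsLong());
            } else {
                unreachableCount++;
            }
        }
        // More than one unreachable fabric: report "infinite" lag so the partition is not ready-to-serve.
        if (unreachableCount > 1) {
            return Long.MAX_VALUE;
        }
        // Zero or one unreachable fabric: the maximum lag among the reachable fabrics.
        return maxLag;
    }
}
```

With a single source fabric that is unreachable, this rule returns 0, which is exactly the case the TODO above flags.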
-
getProduceToTopicFunction
protected BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> getProduceToTopicFunction(PartitionConsumptionState partitionConsumptionState, byte[] key, ByteBuffer updatedValueBytes, ByteBuffer updatedRmdBytes, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int valueSchemaId, boolean resultReuseInput)
-
createProducerCallback
protected LeaderProducerCallback createProducerCallback(DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
- Overrides:
createProducerCallback in class LeaderFollowerStoreIngestionTask
-