Package com.linkedin.venice.pushmonitor
Class PushMonitorDelegator
java.lang.Object
com.linkedin.venice.pushmonitor.PushMonitorDelegator
- All Implemented Interfaces:
- PushMonitor
This class maintains the mapping from each Kafka topic to its PushMonitor instance and delegates calls to the correct instance.
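The delegation described above can be illustrated with a minimal sketch. It is not the actual Venice implementation: `SimplePushMonitor`, `SimpleDelegator`, `register`, and the fallback `defaultMonitor` are hypothetical names introduced only for illustration, and the real PushMonitor interface has many more methods.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical stand-ins; none of these are real Venice types.
public class DelegatorSketch {
  /** Hypothetical one-method slice of a push monitor. */
  interface SimplePushMonitor {
    String getPushStatus(String kafkaTopic);
  }

  /** Keeps a topic-to-monitor map and forwards each call to the matching monitor. */
  static class SimpleDelegator implements SimplePushMonitor {
    private final Map<String, SimplePushMonitor> topicToMonitor = new ConcurrentHashMap<>();
    // Assumption: topics without a registered monitor fall back to a default one.
    private final SimplePushMonitor defaultMonitor;

    SimpleDelegator(SimplePushMonitor defaultMonitor) {
      this.defaultMonitor = defaultMonitor;
    }

    void register(String kafkaTopic, SimplePushMonitor monitor) {
      topicToMonitor.put(kafkaTopic, monitor);
    }

    @Override
    public String getPushStatus(String kafkaTopic) {
      // Look up the monitor for this topic, then delegate the call to it.
      return topicToMonitor.getOrDefault(kafkaTopic, defaultMonitor).getPushStatus(kafkaTopic);
    }
  }

  public static void main(String[] args) {
    SimpleDelegator delegator = new SimpleDelegator(topic -> "UNKNOWN");
    delegator.register("store_v1", topic -> "STARTED");
    System.out.println(delegator.getPushStatus("store_v1")); // prints STARTED
    System.out.println(delegator.getPushStatus("store_v2")); // prints UNKNOWN
  }
}
```

Because the delegator implements the same interface as the monitors it wraps, callers do not need to know which concrete monitor handles a given topic.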
Constructor Summary
- PushMonitorDelegator(String clusterName, RoutingDataRepository routingDataRepository, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher leaderFollowerTopicReplicator, ClusterLockManager clusterLockManager, String aggregateRealTimeSourceKafkaUrl, List<String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats, VeniceWriterFactory veniceWriterFactory, AbstractPushMonitor.CurrentVersionChangeNotifier currentVersionChangeNotifier)
- 
Method Summary
- void cleanupStoreStatus(String storeName): Clean up all push statuses related to a store, including all error pushes.
- ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo): Returns the incremental push's status read from the (ZooKeeper-backed) OfflinePushStatus.
- ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader): Returns the incremental push's status read from the push status store.
- getOfflinePushOrThrow(String topic): Return a push's status (the push status contains the historical statuses and the current status) and throw an exception if it doesn't exist.
- getOfflinePushStatusForStore(String storeName)
- getOngoingIncrementalPushVersions(String kafkaTopic): Check whether there are any ongoing incremental pushes for the given version topic.
- Set<String> getOngoingIncrementalPushVersions(String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
- getPushStatusAndDetails(String topic)
- List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
- getTopicsOfOngoingOfflinePushes(): Find all ongoing pushes, then return the topics associated with those pushes.
- getUncompletedPartitions(String topic)
- boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
- void loadAllPushes(): Load previous push statuses when the push monitor restarts.
- void markOfflinePushAsError(String topic, String statusDetails): Mark a push as error.
- void recordPushPreparationDuration(String topic, long offlinePushWaitTimeInSecond): Stats-related operation.
- OfflinePushStatus refreshAndUpdatePushStatus(String kafkaTopic, ExecutionStatus newStatus, Optional<String> newStatusDetails): Refresh the push status, in order to avoid a race condition where a small job could already be completed.
- void startMonitorOfflinePush(String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
- void stopAllMonitoring(): Stop monitoring all offline pushes.
- void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete): Stop monitoring a push.
- 
Constructor Details
PushMonitorDelegator
public PushMonitorDelegator(String clusterName, RoutingDataRepository routingDataRepository, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher leaderFollowerTopicReplicator, ClusterLockManager clusterLockManager, String aggregateRealTimeSourceKafkaUrl, List<String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats, VeniceWriterFactory veniceWriterFactory, AbstractPushMonitor.CurrentVersionChangeNotifier currentVersionChangeNotifier)
 
- 
- 
Method Details
loadAllPushes
public void loadAllPushes()
Description copied from interface: PushMonitor
Load previous push statuses when the push monitor restarts.
- Specified by:
- loadAllPushes in interface PushMonitor
 
- 
startMonitorOfflinePush
public void startMonitorOfflinePush(String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
Start monitoring a new push.
- Specified by:
- startMonitorOfflinePush in interface PushMonitor
- Parameters:
- strategy - the criteria by which a push is considered completed; see PushStatusDecider for more details.
 
- 
stopMonitorOfflinePush
public void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete)
Description copied from interface: PushMonitor
Stop monitoring a push. This function should be called when: 1. a version is retired; 2. the leader controller transitions to standby, in which case the controller shouldn't delete any push status.
- Specified by:
- stopMonitorOfflinePush in interface PushMonitor
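The two call sites described above might look like the following sketch. `StopOnlyMonitor`, `onVersionRetired`, and `onLeaderToStandby` are hypothetical names. Passing `deletePushStatus = true` when a version is retired and `isForcedDelete = false` in both cases are assumptions made for illustration, since the description only states that push statuses must not be deleted on a leader-to-standby transition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; only the stopMonitorOfflinePush signature comes from the documentation.
public class StopMonitorSketch {
  /** Hypothetical one-method slice of the PushMonitor interface. */
  interface StopOnlyMonitor {
    void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete);
  }

  /** Case 1: a version is retired. Assumption: its push status is deleted too. */
  static void onVersionRetired(StopOnlyMonitor monitor, String kafkaTopic) {
    monitor.stopMonitorOfflinePush(kafkaTopic, true, false);
  }

  /** Case 2: leader transitions to standby, so push statuses are kept. */
  static void onLeaderToStandby(StopOnlyMonitor monitor, List<String> monitoredTopics) {
    for (String topic : monitoredTopics) {
      monitor.stopMonitorOfflinePush(topic, false, false);
    }
  }

  public static void main(String[] args) {
    // A recording fake that logs each call instead of touching any real state.
    List<String> calls = new ArrayList<>();
    StopOnlyMonitor recorder = (topic, delete, forced) -> calls.add(topic + " deletePushStatus=" + delete);
    onVersionRetired(recorder, "store_v1");
    onLeaderToStandby(recorder, Arrays.asList("store_v2", "store_v3"));
    calls.forEach(System.out::println);
  }
}
```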
 
- 
stopAllMonitoring
public void stopAllMonitoring()
Description copied from interface: PushMonitor
Stop monitoring all offline pushes.
- Specified by:
- stopAllMonitoring in interface PushMonitor
 
- 
cleanupStoreStatus
Description copied from interface: PushMonitor
Clean up all push statuses related to a store, including all error pushes. This is called when a store gets deleted.
- Specified by:
- cleanupStoreStatus in interface PushMonitor
 
- 
getOfflinePushOrThrow
Description copied from interface: PushMonitor
Return a push's status (the push status contains the historical statuses and the current status) and throw an exception if it doesn't exist.
- Specified by:
- getOfflinePushOrThrow in interface PushMonitor
 
- 
getPushStatusAndDetails
- Specified by:
- getPushStatusAndDetails in interface PushMonitor
- Returns:
- the current status; if the push is in error, some debugging info is also included.
 
- 
getUncompletedPartitions
- Specified by:
- getUncompletedPartitions in interface PushMonitor
 
- 
getIncrementalPushStatusAndDetails
public ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo)
Description copied from interface: PushMonitor
Returns the incremental push's status read from the (ZooKeeper-backed) OfflinePushStatus.
- Specified by:
- getIncrementalPushStatusAndDetails in interface PushMonitor
 
- 
getIncrementalPushStatusFromPushStatusStore
public ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader)
Description copied from interface: PushMonitor
Returns the incremental push's status read from the push status store.
- Specified by:
- getIncrementalPushStatusFromPushStatusStore in interface PushMonitor
 
- 
getOngoingIncrementalPushVersions
Description copied from interface: PushMonitor
Check whether there are any ongoing incremental pushes for the given version topic.
- Specified by:
- getOngoingIncrementalPushVersions in interface PushMonitor
- Parameters:
- kafkaTopic - the version topic to check for ongoing incremental pushes
- Returns:
- the ongoing incremental push versions if there are any; otherwise an empty set.
 
- 
getOngoingIncrementalPushVersions
public Set<String> getOngoingIncrementalPushVersions(String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
- Specified by:
- getOngoingIncrementalPushVersions in interface PushMonitor
 
- 
getTopicsOfOngoingOfflinePushes
Description copied from interface: PushMonitor
Find all ongoing pushes, then return the topics associated with those pushes.
- Specified by:
- getTopicsOfOngoingOfflinePushes in interface PushMonitor
 
- 
getOfflinePushStatusForStore
- Specified by:
- getOfflinePushStatusForStore in interface PushMonitor
 
- 
markOfflinePushAsError
Description copied from interface: PushMonitor
Mark a push as error. This is usually called when a push is killed.
- Specified by:
- markOfflinePushAsError in interface PushMonitor
 
- 
refreshAndUpdatePushStatus
public OfflinePushStatus refreshAndUpdatePushStatus(String kafkaTopic, ExecutionStatus newStatus, Optional<String> newStatusDetails)
Description copied from interface: PushMonitor
Here, we refresh the push status in order to avoid a race condition where a small job could already be completed. Previously, we would clobber the COMPLETED status with STARTED, which would stall the job forever. Now, since we get the refreshed status, we can validate whether a transition to the new status is valid before making the change. If it wouldn't be valid (because the job already completed or already failed, for example), then we leave the status as is, rather than adding in the new details.
- Specified by:
- refreshAndUpdatePushStatus in interface PushMonitor
- Returns:
- the new OfflinePushStatus if it was updated, or null if the update was skipped.
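The refresh-then-validate idea described above can be sketched as follows. This is a simplified illustration under stated assumptions, not Venice's code: the three-value `Status` enum, the `StatusStore` class, and the rule that COMPLETED and ERROR are terminal are all hypothetical stand-ins for the real ExecutionStatus and its transition validation.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of "refresh, validate the transition, then update".
public class RefreshAndUpdateSketch {
  /** Simplified stand-in for the real ExecutionStatus enum. */
  enum Status { STARTED, COMPLETED, ERROR }

  // Assumption: once a push is COMPLETED or ERROR, no further transition is valid.
  private static final Set<Status> TERMINAL = EnumSet.of(Status.COMPLETED, Status.ERROR);

  /** Hypothetical stand-in for the persisted push status storage. */
  static class StatusStore {
    private final Map<String, Status> persisted = new HashMap<>();

    synchronized void put(String topic, Status status) {
      persisted.put(topic, status);
    }

    synchronized Status get(String topic) {
      return persisted.get(topic);
    }

    /** Returns the new status if it was applied, or null if the update was skipped. */
    synchronized Status refreshAndUpdate(String topic, Status newStatus) {
      Status latest = persisted.get(topic); // refresh: re-read the latest persisted status
      if (latest == null || TERMINAL.contains(latest)) {
        return null; // skip: do not clobber a finished (or unknown) push
      }
      persisted.put(topic, newStatus);
      return newStatus;
    }
  }

  public static void main(String[] args) {
    StatusStore store = new StatusStore();
    // A small job finished before the caller tried to mark it STARTED.
    store.put("small_job_v1", Status.COMPLETED);
    System.out.println(store.refreshAndUpdate("small_job_v1", Status.STARTED)); // prints null
    System.out.println(store.get("small_job_v1")); // prints COMPLETED
  }
}
```

Returning null when the update is skipped mirrors the documented return contract and lets the caller distinguish an applied update from a rejected one.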
 
- 
recordPushPreparationDuration
Description copied from interface: PushMonitor
Stats-related operation. TODO: we may want to move it out of the interface.
- Specified by:
- recordPushPreparationDuration in interface PushMonitor
 
- 
getReadyToServeInstances
public List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
- Specified by:
- getReadyToServeInstances in interface PushMonitor
 
- 
isOfflinePushMonitorDaVinciPushStatusEnabled
public boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
- Specified by:
- isOfflinePushMonitorDaVinciPushStatusEnabled in interface PushMonitor
 
 
-