Package com.linkedin.venice.pushmonitor
Class AbstractPushMonitor
java.lang.Object
com.linkedin.venice.pushmonitor.AbstractPushMonitor
- All Implemented Interfaces:
RoutingDataRepository.RoutingDataChangedListener, PartitionStatusListener, PushMonitor
- Direct Known Subclasses:
PartitionStatusBasedPushMonitor
public abstract class AbstractPushMonitor
extends Object
implements PushMonitor, PartitionStatusListener, RoutingDataRepository.RoutingDataChangedListener
AbstractPushMonitor is a high-level abstraction that manages OfflinePushStatus.
Depending on the implementation, it collects info from different paths and updates push
status accordingly.
At present, push status has one initial state, ExecutionStatus.STARTED, and two end states,
ExecutionStatus.COMPLETED and ExecutionStatus.ERROR.
State mutation is unidirectional: once the status reaches either end state, we stop mutating it.
Check OfflinePushStatus for more details.
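The lifecycle described above can be sketched as a small guard. This is only an illustration under stated assumptions: the Status enum is a hypothetical stand-in for the three ExecutionStatus values named above, and transition is not a Venice method.

```java
import java.util.EnumSet;
import java.util.Set;

final class PushStateSketch {
  /** Hypothetical stand-in for the three ExecutionStatus values named above. */
  enum Status { STARTED, COMPLETED, ERROR }

  /** The two end states; once one is reached, the status is never mutated again. */
  private static final Set<Status> END_STATES = EnumSet.of(Status.COMPLETED, Status.ERROR);

  static boolean isEndState(Status status) {
    return END_STATES.contains(status);
  }

  /**
   * Returns the status that results from requesting a move to {@code requested}.
   * A request made against an end state is ignored and the current status is kept.
   */
  static Status transition(Status current, Status requested) {
    return isEndState(current) ? current : requested;
  }

  public static void main(String[] args) {
    Status status = Status.STARTED;
    status = transition(status, Status.COMPLETED); // STARTED -> COMPLETED is allowed
    status = transition(status, Status.STARTED);   // ignored: COMPLETED is an end state
    System.out.println(status);                    // prints COMPLETED
  }
}
```

Keeping the end-state check in one place means every caller gets the same "stop mutating" behavior without repeating the condition.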
Nested Class Summary
Nested Classes
Modifier and Type: static interface
Class: AbstractPushMonitor.CurrentVersionChangeNotifier
-
Field Summary
Fields
static final int MAX_PUSH_TO_KEEP
-
Constructor Summary
Constructors
AbstractPushMonitor(String clusterName, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, RoutingDataRepository routingDataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher realTimeTopicSwitcher, ClusterLockManager clusterLockManager, String aggregateRealTimeSourceKafkaUrl, List<String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats, VeniceWriterFactory veniceWriterFactory, AbstractPushMonitor.CurrentVersionChangeNotifier currentVersionChangeNotifier)
-
Method Summary
protected abstract ExecutionStatusWithDetails checkPushStatus(OfflinePushStatus pushStatus, PartitionAssignment partitionAssignment, DisableReplicaCallback callback)
protected void checkWhetherToStartEOPProcedures(OfflinePushStatus offlinePushStatus)
void cleanupStoreStatus(String storeName)
    Clean up all push statuses related to a store including all error pushes.
protected DisableReplicaCallback getDisableReplicaCallback(String kafkaTopic)
protected long getDurationInSec(OfflinePushStatus pushStatus)
ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo)
    Returns incremental push's status read from (ZooKeeper backed) OfflinePushStatus.
ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader)
    Returns incremental push's status read from push status store.
protected OfflinePushStatus getOfflinePush(String topic)
protected OfflinePushAccessor getOfflinePushAccessor()
getOfflinePushOrThrow(String topic)
    Return a push's status (push status contains the history statuses and current status) and throw an exception if it doesn't exist.
getOfflinePushStatusForStore(String storeName)
getOngoingIncrementalPushVersions(String kafkaTopic)
    Check if there are any ongoing incremental pushes for the given version topic.
Set<String> getOngoingIncrementalPushVersions(String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
    Get ongoing incremental push versions from the push status store.
getPushStatus(String topic)
getPushStatusAndDetails(String topic)
protected ReadWriteStoreRepository getReadWriteStoreRepository()
abstract List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
getRealTimeTopicSwitcher()
protected RoutingDataRepository getRoutingDataRepository()
getTopicsOfOngoingOfflinePushes()
    Find all ongoing pushes then return the topics associated to those pushes.
getUncompletedPartitions(String topic)
protected void handleCompletedPush(String topic)
protected void handleErrorPush(String topic, ExecutionStatusWithDetails executionStatusWithDetails)
protected void handleTerminalOfflinePushUpdate(OfflinePushStatus pushStatus, ExecutionStatusWithDetails statusWithDetails)
    This method will unsubscribe external view changes and is intended to be called when the statuses are terminal.
boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
void loadAllPushes()
    Load previous push statuses when the push monitor restarts.
void markOfflinePushAsError(String topic, String statusDetails)
    Mark a push as error.
void onCustomizedViewAdded(PartitionAssignment partitionAssignment)
void onCustomizedViewChange(PartitionAssignment partitionAssignment)
void onExternalViewChange(PartitionAssignment partitionAssignment)
    Handle routing data changed event.
protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus)
void onPartitionStatusChange(String topic, ReadOnlyPartitionStatus partitionStatus)
void onRoutingDataDeleted(String kafkaTopic)
void recordPushPreparationDuration(String topic, long offlinePushWaitTimeInSecond)
    Stats-related operation.
OfflinePushStatus refreshAndUpdatePushStatus(String kafkaTopic, ExecutionStatus newStatus, Optional<String> newStatusDetails)
    Here, we refresh the push status, in order to avoid a race condition where a small job could already be completed.
protected void retireOldErrorPushes(String storeName)
void setRealTimeTopicSwitcher(RealTimeTopicSwitcher realTimeTopicSwitcher)
    For testing only; in order to override the topicReplicator with mocked Replicator.
void startMonitorOfflinePush(String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
void stopAllMonitoring()
    Stop monitoring all offline pushes.
void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete)
    Stop monitoring a push.
protected void updateOfflinePush(String topic)
protected OfflinePushStatus updatePushStatus(OfflinePushStatus expectedCurrPushStatus, ExecutionStatus newExecutionStatus, Optional<String> newExecutionStatusDetails)
    Direct calls to updatePushStatus should be made carefully.
-
Field Details
-
MAX_PUSH_TO_KEEP
public static final int MAX_PUSH_TO_KEEP
-
-
Constructor Details
-
AbstractPushMonitor
public AbstractPushMonitor(String clusterName, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, RoutingDataRepository routingDataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher realTimeTopicSwitcher, ClusterLockManager clusterLockManager, String aggregateRealTimeSourceKafkaUrl, List<String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats, VeniceWriterFactory veniceWriterFactory, AbstractPushMonitor.CurrentVersionChangeNotifier currentVersionChangeNotifier)
-
-
Method Details
-
loadAllPushes
public void loadAllPushes()
Description copied from interface: PushMonitor
Load previous push statuses when the push monitor restarts.
- Specified by:
loadAllPushes in interface PushMonitor
-
startMonitorOfflinePush
public void startMonitorOfflinePush(String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
Start monitoring a new push.
- Specified by:
startMonitorOfflinePush in interface PushMonitor
- Parameters:
strategy - on which criteria a push is considered to be completed. Check PushStatusDecider for more details.
-
stopMonitorOfflinePush
public void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete)
Description copied from interface: PushMonitor
Stop monitoring a push. This function should be called when:
1. A version is retired;
2. The leader controller transitions to standby; in this case, the controller shouldn't delete any push status.
- Specified by:
stopMonitorOfflinePush in interface PushMonitor
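The two call sites described above differ only in whether the stored push status survives. A minimal sketch of that distinction follows, assuming a hypothetical in-memory registry; the class, its fields, and its method names are illustrative and are not part of Venice. The isForcedDelete flag is left out because its semantics are not described here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class StopMonitorSketch {
  // Topics currently being monitored in memory (hypothetical).
  private final Set<String> monitoredTopics = new HashSet<>();
  // Stand-in for durably stored push statuses, keyed by topic (hypothetical).
  private final Map<String, String> persistedStatuses = new HashMap<>();

  void startMonitoring(String topic) {
    monitoredTopics.add(topic);
    persistedStatuses.put(topic, "STARTED");
  }

  /** Always stops in-memory monitoring; removes the stored status only when asked to. */
  void stopMonitoring(String topic, boolean deletePushStatus) {
    monitoredTopics.remove(topic);
    if (deletePushStatus) {
      persistedStatuses.remove(topic);
    }
  }

  boolean isMonitored(String topic) {
    return monitoredTopics.contains(topic);
  }

  boolean hasPersistedStatus(String topic) {
    return persistedStatuses.containsKey(topic);
  }

  public static void main(String[] args) {
    StopMonitorSketch monitor = new StopMonitorSketch();
    monitor.startMonitoring("store_v1");
    monitor.startMonitoring("store_v2");
    monitor.stopMonitoring("store_v1", true);  // case 1: version retired, status removed
    monitor.stopMonitoring("store_v2", false); // case 2: leader to standby, status kept
    System.out.println(monitor.hasPersistedStatus("store_v1")); // prints false
    System.out.println(monitor.hasPersistedStatus("store_v2")); // prints true
  }
}
```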
-
stopAllMonitoring
public void stopAllMonitoring()
Description copied from interface: PushMonitor
Stop monitoring all offline pushes.
- Specified by:
stopAllMonitoring in interface PushMonitor
-
cleanupStoreStatus
Description copied from interface: PushMonitor
Clean up all push statuses related to a store including all error pushes. This is called when a store gets deleted.
- Specified by:
cleanupStoreStatus in interface PushMonitor
-
getOfflinePushOrThrow
Description copied from interface: PushMonitor
Return a push's status (push status contains the history statuses and current status) and throw an exception if it doesn't exist.
- Specified by:
getOfflinePushOrThrow in interface PushMonitor
-
getOfflinePush
-
updateOfflinePush
-
getPushStatus
-
getIncrementalPushStatusAndDetails
public ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo)
Description copied from interface: PushMonitor
Returns incremental push's status read from (ZooKeeper backed) OfflinePushStatus.
- Specified by:
getIncrementalPushStatusAndDetails in interface PushMonitor
-
getIncrementalPushStatusFromPushStatusStore
public ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(String kafkaTopic, String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader)
Description copied from interface: PushMonitor
Returns incremental push's status read from push status store.
- Specified by:
getIncrementalPushStatusFromPushStatusStore in interface PushMonitor
-
getOngoingIncrementalPushVersions
Description copied from interface: PushMonitor
Check if there are any ongoing incremental pushes for the given version topic.
- Specified by:
getOngoingIncrementalPushVersions in interface PushMonitor
- Parameters:
kafkaTopic - to check for ongoing incremental push
- Returns:
ongoing incremental push versions if there are any, otherwise an empty set is returned.
-
getOngoingIncrementalPushVersions
public Set<String> getOngoingIncrementalPushVersions(String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
Get ongoing incremental push versions from the push status store.
- Specified by:
getOngoingIncrementalPushVersions in interface PushMonitor
- Parameters:
kafkaTopic - Kafka topic belonging to a store version for which we are fetching ongoing incremental push versions
pushStatusStoreReader - push status system store reader
- Returns:
set of (supposedly) ongoing incremental pushes
-
getPushStatusAndDetails
- Specified by:
getPushStatusAndDetails in interface PushMonitor
- Returns:
the current status. If it's in error, some debugging info is also presented.
-
getTopicsOfOngoingOfflinePushes
Description copied from interface: PushMonitor
Find all ongoing pushes then return the topics associated to those pushes.
- Specified by:
getTopicsOfOngoingOfflinePushes in interface PushMonitor
-
getOfflinePushStatusForStore
- Specified by:
getOfflinePushStatusForStore in interface PushMonitor
-
getUncompletedPartitions
- Specified by:
getUncompletedPartitions in interface PushMonitor
-
markOfflinePushAsError
Description copied from interface: PushMonitor
Mark a push as error. This is usually called when a push is killed.
- Specified by:
markOfflinePushAsError in interface PushMonitor
-
retireOldErrorPushes
-
checkPushStatus
protected abstract ExecutionStatusWithDetails checkPushStatus(OfflinePushStatus pushStatus, PartitionAssignment partitionAssignment, DisableReplicaCallback callback)
-
getReadyToServeInstances
public abstract List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
- Specified by:
getReadyToServeInstances in interface PushMonitor
-
refreshAndUpdatePushStatus
public OfflinePushStatus refreshAndUpdatePushStatus(String kafkaTopic, ExecutionStatus newStatus, Optional<String> newStatusDetails)
Description copied from interface: PushMonitor
Here, we refresh the push status in order to avoid a race condition where a small job could already be completed. Previously, we would clobber the COMPLETED status with STARTED, which would stall the job forever. Now, since we get the refreshed status, we can validate whether a transition to the new status is valid before making the change. If it wouldn't be valid (because the job already completed or already failed, for example), then we leave the status as is, rather than adding in the new details.
- Specified by:
refreshAndUpdatePushStatus in interface PushMonitor
- Returns:
the new OfflinePushStatus if it was updated, or null if the update was skipped.
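The refresh-then-validate behavior described above can be sketched as follows. This is an illustration under stated assumptions, not the Venice implementation: the Status enum, the Snapshot class, and the in-memory map standing in for the stored push statuses are all hypothetical, and throwing for an unknown topic is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class RefreshAndUpdateSketch {
  /** Hypothetical stand-in for the ExecutionStatus values discussed above. */
  enum Status { STARTED, COMPLETED, ERROR }

  /** Hypothetical immutable snapshot of one push's status and optional details. */
  static final class Snapshot {
    final Status status;
    final Optional<String> details;

    Snapshot(Status status, Optional<String> details) {
      this.status = status;
      this.details = details;
    }
  }

  // Stand-in for the stored statuses that are re-read on every update (hypothetical).
  private final Map<String, Snapshot> statusByTopic = new HashMap<>();

  synchronized void put(String topic, Status status) {
    statusByTopic.put(topic, new Snapshot(status, Optional.empty()));
  }

  synchronized Status currentStatus(String topic) {
    Snapshot snapshot = statusByTopic.get(topic);
    return snapshot == null ? null : snapshot.status;
  }

  /** Returns the new snapshot if it was applied, or null if the update was skipped. */
  synchronized Snapshot refreshAndUpdate(String topic, Status newStatus, Optional<String> newDetails) {
    // Refresh: read the latest stored status instead of trusting a stale copy.
    Snapshot refreshed = statusByTopic.get(topic);
    if (refreshed == null) {
      throw new IllegalArgumentException("Unknown topic: " + topic); // assumption of this sketch
    }
    // Validate: a push that already completed or failed must not be overwritten.
    if (refreshed.status == Status.COMPLETED || refreshed.status == Status.ERROR) {
      return null;
    }
    Snapshot updated = new Snapshot(newStatus, newDetails);
    statusByTopic.put(topic, updated);
    return updated;
  }
}
```

With this shape, a late STARTED update for a small job that already reached COMPLETED returns null and leaves the stored status untouched.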
-
updatePushStatus
protected OfflinePushStatus updatePushStatus(OfflinePushStatus expectedCurrPushStatus, ExecutionStatus newExecutionStatus, Optional<String> newExecutionStatusDetails)
Direct calls to updatePushStatus should be made carefully. For example, an update to ExecutionStatus.ERROR or another terminal status should be made through handleOfflinePushUpdate. That method will then invoke handleErrorPush and perform relevant operations to handle the ERROR status update properly.
- Returns:
the new OfflinePushStatus if it was updated, or null if the update was skipped.
-
getDurationInSec
-
getOfflinePushAccessor
-
getReadWriteStoreRepository
-
getRoutingDataRepository
-
onPartitionStatusChange
- Specified by:
onPartitionStatusChange in interface PartitionStatusListener
- Specified by:
onPartitionStatusChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onPartitionStatusChange
-
getDisableReplicaCallback
-
onExternalViewChange
Description copied from interface: RoutingDataRepository.RoutingDataChangedListener
Handle routing data changed event.
- Specified by:
onExternalViewChange in interface RoutingDataRepository.RoutingDataChangedListener
- Parameters:
partitionAssignment - Newest partition assignment information, including the resource name and all instances assigned to this resource. If the number of partitions is 0, it means the Kafka topic is deleted.
-
onCustomizedViewChange
- Specified by:
onCustomizedViewChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onCustomizedViewAdded
- Specified by:
onCustomizedViewAdded in interface RoutingDataRepository.RoutingDataChangedListener
-
onRoutingDataDeleted
- Specified by:
onRoutingDataDeleted in interface RoutingDataRepository.RoutingDataChangedListener
-
checkWhetherToStartEOPProcedures
-
handleTerminalOfflinePushUpdate
protected void handleTerminalOfflinePushUpdate(OfflinePushStatus pushStatus, ExecutionStatusWithDetails statusWithDetails)
This method will unsubscribe external view changes and is intended to be called when the statuses are terminal.
-
handleCompletedPush
-
handleErrorPush
-
recordPushPreparationDuration
Description copied from interface: PushMonitor
Stats-related operation. TODO: we may want to move it out of the interface.
- Specified by:
recordPushPreparationDuration in interface PushMonitor
-
setRealTimeTopicSwitcher
For testing only; used to override the topicReplicator with a mocked Replicator.
-
getRealTimeTopicSwitcher
-
isOfflinePushMonitorDaVinciPushStatusEnabled
public boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
- Specified by:
isOfflinePushMonitorDaVinciPushStatusEnabled in interface PushMonitor
-