Package com.linkedin.venice.pushmonitor
Class AbstractPushMonitor
- java.lang.Object
  - com.linkedin.venice.pushmonitor.AbstractPushMonitor
- All Implemented Interfaces:
RoutingDataRepository.RoutingDataChangedListener, PartitionStatusListener, PushMonitor
- Direct Known Subclasses:
PartitionStatusBasedPushMonitor
public abstract class AbstractPushMonitor extends java.lang.Object implements PushMonitor, PartitionStatusListener, RoutingDataRepository.RoutingDataChangedListener
AbstractPushMonitor is a high-level abstraction that manages OfflinePushStatus. Depending on the implementation, it collects information from different paths and updates the push status accordingly. At present, a push status has one initial state, ExecutionStatus.STARTED, and two end states, ExecutionStatus.COMPLETED and ExecutionStatus.ERROR. State mutation is unidirectional: once a status reaches either end state, it is no longer mutated. See OfflinePushStatus for more details.
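The unidirectional state rule above can be sketched as a small transition guard. This is a minimal, hypothetical illustration. `PushState` and `PushStateMachine.canTransition` are not Venice APIs, and the enum models only the three states named above.

```java
// Hypothetical model of the three push states described above; not Venice's ExecutionStatus.
enum PushState {
  STARTED, COMPLETED, ERROR;

  // COMPLETED and ERROR are the end states.
  boolean isTerminal() {
    return this == COMPLETED || this == ERROR;
  }
}

final class PushStateMachine {
  private PushStateMachine() {}

  // Mutation is unidirectional: once an end state is reached, no further change is allowed.
  static boolean canTransition(PushState current, PushState next) {
    if (current.isTerminal()) {
      return false;
    }
    // From the initial state, the only real mutation is a move to an end state.
    return current == PushState.STARTED && next.isTerminal();
  }
}
```

For example, `canTransition(PushState.COMPLETED, PushState.STARTED)` returns false. A monitor following this rule would therefore never overwrite a finished push with STARTED.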
Field Summary
- static int MAX_PUSH_TO_KEEP
-
Constructor Summary
- AbstractPushMonitor(java.lang.String clusterName, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, RoutingDataRepository routingDataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher realTimeTopicSwitcher, ClusterLockManager clusterLockManager, java.lang.String aggregateRealTimeSourceKafkaUrl, java.util.List<java.lang.String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats)
-
Method Summary
- protected abstract ExecutionStatusWithDetails checkPushStatus(OfflinePushStatus pushStatus, PartitionAssignment partitionAssignment, DisableReplicaCallback callback)
- protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlinePushStatus)
- void cleanupStoreStatus(java.lang.String storeName)
  Clean up all push statuses related to a store, including all error pushes.
- protected DisableReplicaCallback getDisableReplicaCallback(java.lang.String kafkaTopic)
- protected long getDurationInSec(OfflinePushStatus pushStatus)
- ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(java.lang.String kafkaTopic, java.lang.String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo)
  Returns the incremental push's status read from the (ZooKeeper-backed) OfflinePushStatus.
- ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(java.lang.String kafkaTopic, java.lang.String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader)
  Returns the incremental push's status read from the push status store.
- protected OfflinePushStatus getOfflinePush(java.lang.String topic)
- protected OfflinePushAccessor getOfflinePushAccessor()
- OfflinePushStatus getOfflinePushOrThrow(java.lang.String topic)
  Return a push's status (the push status contains the historical statuses and the current status); throw an exception if it doesn't exist.
- java.util.List<OfflinePushStatus> getOfflinePushStatusForStore(java.lang.String storeName)
- java.util.Set<java.lang.String> getOngoingIncrementalPushVersions(java.lang.String kafkaTopic)
  Check whether there are any ongoing incremental pushes for the given version topic.
- java.util.Set<java.lang.String> getOngoingIncrementalPushVersions(java.lang.String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
  Get ongoing incremental push versions from the push status store.
- ExecutionStatus getPushStatus(java.lang.String topic)
- ExecutionStatusWithDetails getPushStatusAndDetails(java.lang.String topic)
- protected ReadWriteStoreRepository getReadWriteStoreRepository()
- abstract java.util.List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
- RealTimeTopicSwitcher getRealTimeTopicSwitcher()
- protected RoutingDataRepository getRoutingDataRepository()
- java.util.List<java.lang.String> getTopicsOfOngoingOfflinePushes()
  Find all ongoing pushes, then return the topics associated with those pushes.
- java.util.List<UncompletedPartition> getUncompletedPartitions(java.lang.String topic)
- protected void handleCompletedPush(java.lang.String topic)
- protected void handleErrorPush(java.lang.String topic, ExecutionStatusWithDetails executionStatusWithDetails)
- protected void handleTerminalOfflinePushUpdate(OfflinePushStatus pushStatus, ExecutionStatusWithDetails statusWithDetails)
  Unsubscribe from external view changes; intended to be called when the status is terminal.
- boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
- void loadAllPushes()
  Load previous push statuses when the push monitor restarts.
- void markOfflinePushAsError(java.lang.String topic, java.lang.String statusDetails)
  Mark a push as ERROR.
- void onCustomizedViewAdded(PartitionAssignment partitionAssignment)
- void onCustomizedViewChange(PartitionAssignment partitionAssignment)
- void onExternalViewChange(PartitionAssignment partitionAssignment)
  Handle a routing data changed event.
- protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus)
- void onPartitionStatusChange(java.lang.String topic, ReadOnlyPartitionStatus partitionStatus)
- void onRoutingDataDeleted(java.lang.String kafkaTopic)
- void recordPushPreparationDuration(java.lang.String topic, long offlinePushWaitTimeInSecond)
  Stats-related operation.
- OfflinePushStatus refreshAndUpdatePushStatus(java.lang.String kafkaTopic, ExecutionStatus newStatus, java.util.Optional<java.lang.String> newStatusDetails)
  Refresh the push status before updating it, in order to avoid a race condition where a small job could already be completed.
- protected void retireOldErrorPushes(java.lang.String storeName)
- void setRealTimeTopicSwitcher(RealTimeTopicSwitcher realTimeTopicSwitcher)
  For testing only: used to override the topicReplicator with a mocked Replicator.
- void startMonitorOfflinePush(java.lang.String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
- void stopAllMonitoring()
  Stop monitoring all offline pushes.
- void stopMonitorOfflinePush(java.lang.String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete)
  Stop monitoring a push.
- protected void updateOfflinePush(java.lang.String topic)
- protected OfflinePushStatus updatePushStatus(OfflinePushStatus expectedCurrPushStatus, ExecutionStatus newExecutionStatus, java.util.Optional<java.lang.String> newExecutionStatusDetails)
  Direct calls to updatePushStatus should be made carefully.
Field Detail
-
MAX_PUSH_TO_KEEP
public static final int MAX_PUSH_TO_KEEP
- See Also:
- Constant Field Values
Constructor Detail
-
AbstractPushMonitor
public AbstractPushMonitor(java.lang.String clusterName, OfflinePushAccessor offlinePushAccessor, StoreCleaner storeCleaner, ReadWriteStoreRepository metadataRepository, RoutingDataRepository routingDataRepository, AggPushHealthStats aggPushHealthStats, RealTimeTopicSwitcher realTimeTopicSwitcher, ClusterLockManager clusterLockManager, java.lang.String aggregateRealTimeSourceKafkaUrl, java.util.List<java.lang.String> activeActiveRealTimeSourceKafkaURLs, HelixAdminClient helixAdminClient, VeniceControllerClusterConfig controllerConfig, PushStatusStoreReader pushStatusStoreReader, DisabledPartitionStats disabledPartitionStats)
Method Detail
-
loadAllPushes
public void loadAllPushes()
Description copied from interface: PushMonitor
Load previous push statuses when the push monitor restarts.
- Specified by:
loadAllPushes in interface PushMonitor
-
startMonitorOfflinePush
public void startMonitorOfflinePush(java.lang.String kafkaTopic, int numberOfPartition, int replicaFactor, OfflinePushStrategy strategy)
Start monitoring a new push.
- Specified by:
startMonitorOfflinePush in interface PushMonitor
- Parameters:
strategy - the criteria by which a push is considered completed. See PushStatusDecider for more details.
-
stopMonitorOfflinePush
public void stopMonitorOfflinePush(java.lang.String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete)
Description copied from interface: PushMonitor
Stop monitoring a push. This function should be called when:
1. a version is retired;
2. the leader controller transitions to standby; in this case, the controller shouldn't delete any push status.
- Specified by:
stopMonitorOfflinePush in interface PushMonitor
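The two call sites described above differ mainly in the deletePushStatus flag. Below is a minimal sketch of both callers against a hypothetical, trimmed-down interface (`StoppablePushMonitor`, `MonitorCallers`, and the `RecordingMonitor` test double are illustrative names, not Venice classes). isForcedDelete is passed as false in both cases, because its exact semantics are not described on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the two PushMonitor methods used below.
interface StoppablePushMonitor {
  void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete);

  List<String> getTopicsOfOngoingOfflinePushes();
}

final class MonitorCallers {
  private MonitorCallers() {}

  // Case 1: a version is retired, so its push status may be deleted.
  // isForcedDelete = false is an assumption; its semantics are not documented here.
  static void retireVersion(StoppablePushMonitor monitor, String versionTopic) {
    monitor.stopMonitorOfflinePush(versionTopic, true, false);
  }

  // Case 2: the leader controller becomes standby; stop monitoring but keep every push status.
  static void onLeaderToStandby(StoppablePushMonitor monitor) {
    // Iterate over a copy so the monitor can update its own bookkeeping during the loop.
    for (String topic : new ArrayList<>(monitor.getTopicsOfOngoingOfflinePushes())) {
      monitor.stopMonitorOfflinePush(topic, false, false);
    }
  }
}

// Hypothetical test double that records each stop call as "topic delete=<flag>".
final class RecordingMonitor implements StoppablePushMonitor {
  final List<String> ongoing;
  final List<String> calls = new ArrayList<>();

  RecordingMonitor(List<String> ongoing) {
    this.ongoing = new ArrayList<>(ongoing);
  }

  @Override
  public void stopMonitorOfflinePush(String kafkaTopic, boolean deletePushStatus, boolean isForcedDelete) {
    calls.add(kafkaTopic + " delete=" + deletePushStatus);
    ongoing.remove(kafkaTopic);
  }

  @Override
  public List<String> getTopicsOfOngoingOfflinePushes() {
    return ongoing;
  }
}
```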
-
stopAllMonitoring
public void stopAllMonitoring()
Description copied from interface: PushMonitor
Stop monitoring all offline pushes.
- Specified by:
stopAllMonitoring in interface PushMonitor
-
cleanupStoreStatus
public void cleanupStoreStatus(java.lang.String storeName)
Description copied from interface: PushMonitor
Clean up all push statuses related to a store, including all error pushes. This is called when a store is deleted.
- Specified by:
cleanupStoreStatus in interface PushMonitor
-
getOfflinePushOrThrow
public OfflinePushStatus getOfflinePushOrThrow(java.lang.String topic)
Description copied from interface: PushMonitor
Return a push's status (the push status contains the historical statuses and the current status); throw an exception if it doesn't exist.
- Specified by:
getOfflinePushOrThrow in interface PushMonitor
-
getOfflinePush
protected OfflinePushStatus getOfflinePush(java.lang.String topic)
-
updateOfflinePush
protected void updateOfflinePush(java.lang.String topic)
-
getPushStatus
public ExecutionStatus getPushStatus(java.lang.String topic)
-
getIncrementalPushStatusAndDetails
public ExecutionStatusWithDetails getIncrementalPushStatusAndDetails(java.lang.String kafkaTopic, java.lang.String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo)
Description copied from interface: PushMonitor
Returns the incremental push's status read from the (ZooKeeper-backed) OfflinePushStatus.
- Specified by:
getIncrementalPushStatusAndDetails in interface PushMonitor
-
getIncrementalPushStatusFromPushStatusStore
public ExecutionStatusWithDetails getIncrementalPushStatusFromPushStatusStore(java.lang.String kafkaTopic, java.lang.String incrementalPushVersion, HelixCustomizedViewOfflinePushRepository customizedViewRepo, PushStatusStoreReader pushStatusStoreReader)
Description copied from interface: PushMonitor
Returns the incremental push's status read from the push status store.
- Specified by:
getIncrementalPushStatusFromPushStatusStore in interface PushMonitor
-
getOngoingIncrementalPushVersions
public java.util.Set<java.lang.String> getOngoingIncrementalPushVersions(java.lang.String kafkaTopic)
Description copied from interface: PushMonitor
Check whether there are any ongoing incremental pushes for the given version topic.
- Specified by:
getOngoingIncrementalPushVersions in interface PushMonitor
- Parameters:
kafkaTopic - the version topic to check for ongoing incremental pushes
- Returns:
the ongoing incremental push versions, if there are any; otherwise, an empty set.
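The return contract above (an empty set rather than null) lets callers test `isEmpty()` directly. Below is a minimal sketch of that contract. It assumes a hypothetical in-memory tracker in which each incremental push version carries a "finished" flag. `IncrementalPushTracker` is illustrative and is not how Venice derives ongoing versions.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical per-version-topic tracker; not the Venice implementation.
final class IncrementalPushTracker {
  // Assumption: version topic -> (incremental push version -> whether it reached an end state).
  private final Map<String, Map<String, Boolean>> finishedByVersionTopic = new HashMap<>();

  void record(String kafkaTopic, String incPushVersion, boolean finished) {
    finishedByVersionTopic.computeIfAbsent(kafkaTopic, k -> new HashMap<>()).put(incPushVersion, finished);
  }

  // Mirrors the documented contract: never null; an empty set when nothing is ongoing.
  Set<String> getOngoingIncrementalPushVersions(String kafkaTopic) {
    Map<String, Boolean> versions = finishedByVersionTopic.get(kafkaTopic);
    if (versions == null) {
      return Collections.emptySet();
    }
    Set<String> ongoing = new TreeSet<>();
    for (Map.Entry<String, Boolean> entry : versions.entrySet()) {
      if (!entry.getValue()) {
        ongoing.add(entry.getKey());
      }
    }
    return ongoing;
  }
}
```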
-
getOngoingIncrementalPushVersions
public java.util.Set<java.lang.String> getOngoingIncrementalPushVersions(java.lang.String kafkaTopic, PushStatusStoreReader pushStatusStoreReader)
Get ongoing incremental push versions from the push status store.
- Specified by:
getOngoingIncrementalPushVersions in interface PushMonitor
- Parameters:
kafkaTopic - Kafka topic belonging to a store version for which we are fetching ongoing incremental push versions
pushStatusStoreReader - push status system store reader
- Returns:
the set of (supposedly) ongoing incremental pushes
-
getPushStatusAndDetails
public ExecutionStatusWithDetails getPushStatusAndDetails(java.lang.String topic)
- Specified by:
getPushStatusAndDetails in interface PushMonitor
- Returns:
the current status. If it's in error, some debugging info is also presented.
-
getTopicsOfOngoingOfflinePushes
public java.util.List<java.lang.String> getTopicsOfOngoingOfflinePushes()
Description copied from interface: PushMonitor
Find all ongoing pushes, then return the topics associated with those pushes.
- Specified by:
getTopicsOfOngoingOfflinePushes in interface PushMonitor
-
getOfflinePushStatusForStore
public java.util.List<OfflinePushStatus> getOfflinePushStatusForStore(java.lang.String storeName)
- Specified by:
getOfflinePushStatusForStore in interface PushMonitor
-
getUncompletedPartitions
public java.util.List<UncompletedPartition> getUncompletedPartitions(java.lang.String topic)
- Specified by:
getUncompletedPartitions in interface PushMonitor
-
markOfflinePushAsError
public void markOfflinePushAsError(java.lang.String topic, java.lang.String statusDetails)
Description copied from interface: PushMonitor
Mark a push as ERROR. This is usually called when a push is killed.
- Specified by:
markOfflinePushAsError in interface PushMonitor
-
retireOldErrorPushes
protected void retireOldErrorPushes(java.lang.String storeName)
-
checkPushStatus
protected abstract ExecutionStatusWithDetails checkPushStatus(OfflinePushStatus pushStatus, PartitionAssignment partitionAssignment, DisableReplicaCallback callback)
-
getReadyToServeInstances
public abstract java.util.List<Instance> getReadyToServeInstances(PartitionAssignment partitionAssignment, int partitionId)
- Specified by:
getReadyToServeInstances in interface PushMonitor
-
refreshAndUpdatePushStatus
public OfflinePushStatus refreshAndUpdatePushStatus(java.lang.String kafkaTopic, ExecutionStatus newStatus, java.util.Optional<java.lang.String> newStatusDetails)
Description copied from interface: PushMonitor
Here, we refresh the push status in order to avoid a race condition where a small job could already be completed. Previously, we would clobber the COMPLETED status with STARTED, which would stall the job forever. Now, because we get the refreshed status, we can validate whether a transition to newStatus is valid before making the change. If it wouldn't be valid (for example, because the job already completed or already failed), we leave the status as is rather than adding the new details.
- Specified by:
refreshAndUpdatePushStatus in interface PushMonitor
- Returns:
the new OfflinePushStatus if it was updated, or null if the update was skipped.
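The refresh-validate-update pattern described above can be sketched in a few lines. This is a hypothetical, single-process illustration. `Status`, `PushRecord`, and `RefreshingUpdater` are not Venice types. The sketch treats any update from a non-terminal state as valid, and it uses `synchronized` only to make the read-check-write atomic within this object.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simplified states; COMPLETED and ERROR are terminal.
enum Status {
  STARTED, COMPLETED, ERROR;

  boolean isTerminal() {
    return this != STARTED;
  }
}

// Hypothetical immutable status snapshot (status plus optional details).
final class PushRecord {
  final Status status;
  final Optional<String> details;

  PushRecord(Status status, Optional<String> details) {
    this.status = status;
    this.details = details;
  }
}

final class RefreshingUpdater {
  // Stand-in for the persisted push status that a real accessor would read back.
  private final Map<String, PushRecord> store = new ConcurrentHashMap<>();

  void put(String topic, PushRecord record) {
    store.put(topic, record);
  }

  // Re-read the latest status, validate the transition, and only then write.
  // Returns the new record, or null if the update was skipped.
  synchronized PushRecord refreshAndUpdatePushStatus(String topic, Status newStatus, Optional<String> newDetails) {
    PushRecord refreshed = store.get(topic);
    if (refreshed == null || refreshed.status.isTerminal()) {
      // Already completed or failed (or unknown): leave the status as is.
      return null;
    }
    PushRecord updated = new PushRecord(newStatus, newDetails);
    store.put(topic, updated);
    return updated;
  }
}
```

In this sketch, a late STARTED update for a job that has already reached COMPLETED is skipped and returns null, so the COMPLETED status is not clobbered.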
-
updatePushStatus
protected OfflinePushStatus updatePushStatus(OfflinePushStatus expectedCurrPushStatus, ExecutionStatus newExecutionStatus, java.util.Optional<java.lang.String> newExecutionStatusDetails)
Direct calls to updatePushStatus should be made carefully. For example, an update to ExecutionStatus.ERROR or another terminal status should be made through handleOfflinePushUpdate. That method will then invoke handleErrorPush and perform the relevant operations to handle the ERROR status update properly.
- Returns:
the new OfflinePushStatus if it was updated, or null if the update was skipped.
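The caution above amounts to a routing rule: terminal statuses take a path that also runs follow-up handling, while non-terminal ones may be written directly. Below is a minimal sketch of that rule, assuming hypothetical names (`UpdateStatus`, `StatusUpdateRouter`, `applyStatus`). The sketch logs actions instead of persisting anything, and it does not reproduce the actual Venice methods.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical states; COMPLETED and ERROR are terminal.
enum UpdateStatus {
  STARTED, COMPLETED, ERROR;

  boolean isTerminal() {
    return this == COMPLETED || this == ERROR;
  }
}

final class StatusUpdateRouter {
  // Records what happened, in order, so the routing is easy to inspect.
  final List<String> log = new ArrayList<>();

  // Entry point: terminal statuses go through the terminal path so follow-up handling always runs.
  void applyStatus(String topic, UpdateStatus status, String details) {
    if (status.isTerminal()) {
      handleTerminalUpdate(topic, status, details);
    } else {
      rawUpdate(topic, status);
    }
  }

  // Analogous to a direct status write: persists the status and nothing else.
  private void rawUpdate(String topic, UpdateStatus status) {
    log.add("update " + topic + " " + status);
  }

  // Persist first, then run the status-specific follow-up (error or completion handling).
  private void handleTerminalUpdate(String topic, UpdateStatus status, String details) {
    rawUpdate(topic, status);
    if (status == UpdateStatus.ERROR) {
      log.add("handleError " + topic + ": " + details);
    } else {
      log.add("handleCompleted " + topic);
    }
  }
}
```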
-
getDurationInSec
protected long getDurationInSec(OfflinePushStatus pushStatus)
-
getOfflinePushAccessor
protected OfflinePushAccessor getOfflinePushAccessor()
-
getReadWriteStoreRepository
protected ReadWriteStoreRepository getReadWriteStoreRepository()
-
getRoutingDataRepository
protected RoutingDataRepository getRoutingDataRepository()
-
onPartitionStatusChange
public void onPartitionStatusChange(java.lang.String topic, ReadOnlyPartitionStatus partitionStatus)
- Specified by:
onPartitionStatusChange in interface PartitionStatusListener
- Specified by:
onPartitionStatusChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onPartitionStatusChange
protected void onPartitionStatusChange(OfflinePushStatus offlinePushStatus)
-
getDisableReplicaCallback
protected DisableReplicaCallback getDisableReplicaCallback(java.lang.String kafkaTopic)
-
onExternalViewChange
public void onExternalViewChange(PartitionAssignment partitionAssignment)
Description copied from interface: RoutingDataRepository.RoutingDataChangedListener
Handle a routing data changed event.
- Specified by:
onExternalViewChange in interface RoutingDataRepository.RoutingDataChangedListener
- Parameters:
partitionAssignment - the newest partition assignment information, including the resource name and all instances assigned to this resource. If the number of partitions is 0, the Kafka topic has been deleted.
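The zero-partition convention above means that a listener must check the partition count before treating an event as an ordinary update. Below is a minimal sketch of that check. `Assignment` is a hypothetical stand-in for PartitionAssignment that keeps only a topic name and a partition count, and `ExternalViewListener` records events instead of acting on them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal view of a partition assignment: resource (topic) name and partition count.
final class Assignment {
  final String topic;
  final int partitionCount;

  Assignment(String topic, int partitionCount) {
    this.topic = topic;
    this.partitionCount = partitionCount;
  }
}

final class ExternalViewListener {
  // Records how each event was interpreted, in order.
  final List<String> events = new ArrayList<>();

  void onExternalViewChange(Assignment assignment) {
    if (assignment.partitionCount == 0) {
      // Per the contract above, zero partitions means the Kafka topic was deleted.
      events.add("deleted " + assignment.topic);
      return;
    }
    // Otherwise this is a regular routing update, so the push status would be re-checked.
    events.add("recheck " + assignment.topic + " (" + assignment.partitionCount + " partitions)");
  }
}
```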
-
onCustomizedViewChange
public void onCustomizedViewChange(PartitionAssignment partitionAssignment)
- Specified by:
onCustomizedViewChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onCustomizedViewAdded
public void onCustomizedViewAdded(PartitionAssignment partitionAssignment)
- Specified by:
onCustomizedViewAdded in interface RoutingDataRepository.RoutingDataChangedListener
-
onRoutingDataDeleted
public void onRoutingDataDeleted(java.lang.String kafkaTopic)
- Specified by:
onRoutingDataDeleted in interface RoutingDataRepository.RoutingDataChangedListener
-
checkWhetherToStartBufferReplayForHybrid
protected void checkWhetherToStartBufferReplayForHybrid(OfflinePushStatus offlinePushStatus)
-
handleTerminalOfflinePushUpdate
protected void handleTerminalOfflinePushUpdate(OfflinePushStatus pushStatus, ExecutionStatusWithDetails statusWithDetails)
This method unsubscribes from external view changes and is intended to be called when the status is terminal.
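Because a terminal status never changes again, further routing-data events for that topic carry no useful information. Below is a minimal sketch of the unsubscribe-on-terminal idea with hypothetical names (`EndState`, `TerminalUpdateHandler`). The completed/error dispatch mirrors the handleCompletedPush/handleErrorPush split only by analogy, as an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical end states; the parameter type itself guarantees the status is terminal.
enum EndState { COMPLETED, ERROR }

final class TerminalUpdateHandler {
  private final Set<String> subscribedTopics = new HashSet<>();
  // Records actions in order so the behavior is easy to inspect.
  final List<String> actions = new ArrayList<>();

  void subscribe(String topic) {
    subscribedTopics.add(topic);
  }

  // An end state never changes, so stop listening to this topic's view changes, then dispatch.
  void handleTerminalOfflinePushUpdate(String topic, EndState state) {
    subscribedTopics.remove(topic);
    actions.add(state == EndState.COMPLETED ? "completed " + topic : "error " + topic);
  }

  // View changes for unsubscribed topics are ignored.
  void onViewChange(String topic) {
    if (subscribedTopics.contains(topic)) {
      actions.add("recheck " + topic);
    }
  }
}
```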
-
handleCompletedPush
protected void handleCompletedPush(java.lang.String topic)
-
handleErrorPush
protected void handleErrorPush(java.lang.String topic, ExecutionStatusWithDetails executionStatusWithDetails)
-
recordPushPreparationDuration
public void recordPushPreparationDuration(java.lang.String topic, long offlinePushWaitTimeInSecond)
Description copied from interface: PushMonitor
Stats-related operation. TODO: we may want to move it out of the interface.
- Specified by:
recordPushPreparationDuration in interface PushMonitor
-
setRealTimeTopicSwitcher
public void setRealTimeTopicSwitcher(RealTimeTopicSwitcher realTimeTopicSwitcher)
For testing only: used to override the topicReplicator with a mocked Replicator.
-
getRealTimeTopicSwitcher
public RealTimeTopicSwitcher getRealTimeTopicSwitcher()
-
isOfflinePushMonitorDaVinciPushStatusEnabled
public boolean isOfflinePushMonitorDaVinciPushStatusEnabled()
- Specified by:
isOfflinePushMonitorDaVinciPushStatusEnabled in interface PushMonitor