Package com.linkedin.venice.controller
Interface Admin
- All Superinterfaces:
AutoCloseable, Closeable
- All Known Implementing Classes:
VeniceHelixAdmin, VeniceParentHelixAdmin
-
Nested Class Summary
-
Method Summary
void abortMigration(String srcClusterName, String destClusterName, String storeName)
DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr)
  This method skips most precondition checks and is intended for internal use only.
DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr)
void addInstanceToAllowlist(String clusterName, String helixNodeId)
RmdSchemaEntry addReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr)
SchemaEntry addSupersetSchema(String clusterName, String storeName, String valueSchemaStr, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId)
default SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId)
  This method skips most precondition checks and is intended for internal use only.
SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType expectedCompatibilityType)
SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType)
void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion)
  This method behaves differently in VeniceHelixAdmin and VeniceParentHelixAdmin.
int calculateNumberOfPartitions(String clusterName, String storeName)
  Calculate how many partitions are needed for the given store.
void checkResourceCleanupBeforeStoreCreation(String clusterName, String storeName)
  Check whether any resources for the store are still left in the cluster before store creation; if there are any, this function should throw an exception.
cleanupInstanceCustomizedStates(String clusterName)
  Scan through instance-level customized states and remove any lingering ZNodes that are no longer relevant.
default void clearInstanceMonitor(String clusterName)
void close()
compareStore(String clusterName, String storeName, String fabricA, String fabricB)
  Compare store metadata and version states between two fabrics.
void completeMigration(String srcClusterName, String destClusterName, String storeName)
void configureActiveActiveReplication(String cluster, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter)
  Enable or disable active-active replication for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster.
copyOverStoreSchemasAndConfigs(String clusterName, String srcFabric, String destFabric, String storeName)
void createStoragePersona(String clusterName, String name, long quotaNumber, Set<String> storesToEnforce, Set<String> owners)
default void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema)
default void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore)
void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore, Optional<String> accessPermissions)
void deleteAclForStore(String clusterName, String storeName)
  Delete the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
deleteAllVersionsInStore(String clusterName, String storeName)
  Delete all Venice versions in the given store (including Venice resources, Kafka topics, offline pushes, and all related resources).
void deleteOldVersionInStore(String clusterName, String storeName, int versionNum)
  Delete the given version from the store.
void deleteStoragePersona(String clusterName, String name)
void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion)
  Delete the entire store, including both metadata and real user data.
void deleteValueSchemas(String clusterName, String storeName, Set<Integer> inuseValueSchemaIds)
  Delete a store's value schemas, except those whose IDs are passed in the argument inuseValueSchemaIds.
discoverCluster(String storeName)
  Find the cluster that the given store belongs to.
findAllBootstrappingVersions(String clusterName)
  Find the store versions that have at least one bootstrap replica.
getAclForStore(String clusterName, String storeName)
  Fetch the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
getAdminCommandExecutionTracker(String clusterName)
  Get the tracker used to track the execution of the admin command for the given cluster.
getAdminTopicMetadata(String clusterName, Optional<String> storeName)
getAggregatedHealthStatus(String cluster, List<String> instances, List<String> toBeStoppedInstances, boolean isSSLEnabled)
getAggregateRealTimeTopicSource(String clusterName)
  Return the source Kafka bootstrap server URL for aggregate real-time topic updates.
getAllowlist(String clusterName)
getAllStores(String clusterName)
getAllStoreStatuses(String clusterName)
  Get the statuses of all stores.
int getBackupVersion(String clusterName, String storeName)
long …
  Returns the default backup version retention time.
getBackupVersionsForMultiColos(String clusterName, String storeName)
getBatchJobHeartbeatValue(BatchJobHeartbeatKey batchJobHeartbeatKey)
getChildControllerD2ServiceName(String clusterName)
  Get the child datacenter controller d2 service name.
getChildDataCenterControllerD2Map(String clusterName)
  Get the mapping from child datacenter to child controller d2 ZK host.
getChildDataCenterControllerUrlMap(String clusterName)
  Get the mapping from child datacenter to child controller URL.
getClustersLeaderOf
  Get a list of clusters this controller is a leader of.
getClusterStaleStores(String clusterName)
getClusterStoragePersonas(String clusterName)
getClusterStores(String clusterName)
  Return all stores in a cluster.
int getCurrentVersion(String clusterName, String storeName)
getCurrentVersionsForMultiColos(String clusterName, String storeName)
default int getDatacenterCount(String clusterName)
  Returns the number of datacenters; 1 in single-cluster mode.
int getDerivedSchemaId(String clusterName, String storeName, String schemaStr)
getDerivedSchemas(String clusterName, String storeName)
getEmergencySourceRegion(String clusterName)
  Return the emergency source region configuration.
int getFutureVersion(String clusterName, String storeName)
getFutureVersionsForMultiColos(String clusterName, String storeName)
long getHeartbeatFromSystemStore(String clusterName, String storeName)
  Read the latest heartbeat timestamp from the system store.
getHelixVeniceClusterResources(String cluster)
getIncrementalPushVersion(String clusterName, String storeName, String pushJobId)
getInUseValueSchemaIds(String clusterName, String storeName)
getKafkaBootstrapServers(boolean isSSL)
  Return the SSL or non-SSL bootstrap servers based on the given flag.
getKeySchema(String clusterName, String storeName)
int getLargestUsedVersionFromStoreGraveyard(String clusterName, String storeName)
getLastSucceedExecutionId(String clusterName)
  Get the ID of the last successful execution in this controller.
getLeaderController(String clusterName)
  Get the instance of the leader controller.
getMetaStoreValue(StoreMetaKey storeMetaKey, String storeName)
MetaStoreWriter getMetaStoreWriter()
  Return a MetaStoreWriter, which can be shared across different Venice clusters.
int getMinNumberOfUnusedKafkaTopicsToPreserve()
getNativeReplicationKafkaBootstrapServerAddress(String sourceFabric)
getNativeReplicationSourceFabric(String clusterName, Store store, Optional<String> sourceGridFabric, Optional<String> emergencySourceRegion, String targetedRegions)
getOffLinePushStatus(String clusterName, String kafkaTopic)
  Query the status of the offline push by the given Kafka topic.
getOffLinePushStatus(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions)
ParentControllerRegionState getParentControllerRegionState()
  Return the state of the region of the parent controller.
getPersonaAssociatedWithStore(String clusterName, String storeName)
getPubSubSSLProperties(String pubSubBrokerAddress)
…
  Return a shared read-only schema repository for ZK shared stores.
…
  Return a shared read-only store repository for ZK shared stores.
getReferenceVersionForStreamingWrites(String clusterName, String storeName, String pushJobId)
String getRegionName()
  Return the region name of this Admin.
getRegionPushDetails(String clusterName, String storeName, boolean isPartitionDetailEnabled)
getReplicas(String clusterName, String kafkaTopic)
getReplicasOfStorageNode(String clusterName, String instanceId)
int getReplicationFactor(String clusterName, String storeName)
Optional<org.apache.avro.Schema> getReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaID, int rmdVersionID)
getReplicationMetadataSchemas(String clusterName, String storeName)
getRepushInfo(String clusterNae, String storeName, Optional<String> fabricName)
getRoutersClusterConfig(String clusterName)
  Get the cluster-level config for all routers.
getServerD2Service(String clusterName)
  Find the server d2 service associated with a given cluster name.
double getStorageEngineOverheadRatio(String clusterName)
getStorageNodes(String clusterName)
getStorageNodesStatus(String clusterName, boolean enableReplica)
getStorageNodesStatus(String clusterName, String instanceId)
  Query and return the current status of the given storage node.
getStoragePersona(String clusterName, String name)
HelixReadOnlyStoreConfigRepository getStoreConfigRepo()
  Return a shared store config repository.
getTopicManager(String pubSubServerAddress)
getValueSchema(String clusterName, String storeName, int id)
int getValueSchemaId(String clusterName, String storeName, String valueSchemaStr)
getValueSchemas(String clusterName, String storeName)
boolean hasStore
default boolean hasWritePermissionToBatchJobHeartbeatStore(X509Certificate requesterCert, String batchJobHeartbeatStoreName)
default Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor)
  The implementation of this method must take no action and return the same Version object if the same parameters are provided on a subsequent invocation.
default Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, int repushSourceVersion)
Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion)
void initiateDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion)
  Initiate data recovery for a store version given a source fabric.
void initStorageCluster(String clusterName)
boolean isActiveActiveReplicationEnabledInAllRegion(String clusterName, String storeName, boolean checkCurrentVersion)
  Returns true if A/A replication is enabled in all child controllers and the parent controller.
default boolean isAdminTopicConsumptionEnabled(String clusterName)
  Return whether the admin consumption task is enabled for the passed cluster.
boolean isClusterValid(String clusterName)
NodeRemovableResult isInstanceRemovable(String clusterName, String helixNodeId, List<String> lockedNodes)
  Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, determine whether the given instance can be removed from the given cluster.
boolean isLeaderControllerFor(String clusterName)
  Check whether this controller itself is the leader controller for a given cluster.
boolean isLeaderControllerOfControllerCluster()
  This function can be used to perform cluster-wide operations that need to be performed by only a single process in the whole cluster.
boolean isParent()
  Check whether the controller works as a parent controller.
boolean isResourceStillAlive(String resourceName)
  Check whether the specified resource is fully removed or not.
boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String storeName)
boolean isSSLEnabledForPush(String clusterName, String storeName)
  Return whether SSL is enabled for pushes to the given store.
boolean isSslToKafka()
boolean isStorageNodeNewerOrEqualTo(String clusterName, String instanceId, StorageNodeStatus oldServerStatus)
  Compare the current storage node status with the given storage node status to check whether the current one is "Newer" or "Equal" to the given one.
boolean isStoreMigrationAllowed(String srcClusterName)
isStoreVersionReadyForDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor)
  Check whether the store version's previous states and resources are cleaned up and ready to start data recovery.
boolean isTopicTruncated(String topicName)
boolean isTopicTruncatedBasedOnRetention(long retention)
boolean isTopicTruncatedBasedOnRetention(String topicName, long retention)
void killOfflinePush(String clusterName, String kafkaTopic, boolean isForcedKill)
  Kill an offline push if it ran into errors or the corresponding version is being retired.
listStorePushInfo(String clusterName, String storeName, boolean isPartitionDetailEnabled)
void migrateStore(String srcClusterName, String destClusterName, String storeName)
nodeReplicaReadiness(String cluster, String helixNodeId)
  helixNodeId: nodeId of the Helix participant.
peekNextVersion(String clusterName, String storeName)
void prepareDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor)
  Prepare for data recovery in the destination fabric.
DerivedSchemaEntry removeDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId)
  Remove an existing derived schema.
void removeInstanceFromAllowList(String clusterName, String helixNodeId)
void removeStorageNode(String clusterName, String instanceId)
void removeStoreFromGraveyard(String clusterName, String storeName)
void rollbackToBackupVersion(String clusterName, String storeName, String regionFilter)
void rollForwardToFutureVersion(String clusterName, String storeName, String regionFilter)
void sendHeartbeatToSystemStore(String clusterName, String storeName, long heartbeatTimestamp)
  Send a heartbeat timestamp to the targeted system store.
void sendPushJobDetails(PushJobStatusRecordKey key, PushJobDetails value)
void setAdminConsumerService(String clusterName, AdminConsumerService service)
void setStoreCurrentVersion(String clusterName, String storeName, int versionNumber)
void setStoreLargestUsedVersion(String clusterName, String storeName, int versionNumber)
void setStoreOwner(String clusterName, String storeName, String owner)
void setStorePartitionCount(String clusterName, String storeName, int partitionCount)
void setStorePushStrategyForMigration(String voldemortStoreName, String strategy)
void setStoreReadability(String clusterName, String storeName, boolean desiredReadability)
void setStoreReadWriteability(String clusterName, String storeName, boolean isAccessible)
void setStoreWriteability(String clusterName, String storeName, boolean desiredWriteability)
void skipAdminMessage(String clusterName, long offset, boolean skipDIV)
  The admin consumption task tries to deal with failures to process an admin message by retrying.
default void startInstanceMonitor(String clusterName)
void stop
  Stop the Helix controller for a single cluster.
void stopVeniceController()
  Stop the entire controller, not only the Helix controller for a single cluster.
boolean truncateKafkaTopic(String topicName)
boolean truncateKafkaTopic(String topicName, long retentionTimeInMs)
  Truncate a Kafka topic by setting its retention time to the input value.
void updateAclForStore(String clusterName, String storeName, String accessPermisions)
  Provision a new set of ACLs for a Venice store and its associated Kafka topic.
void updateAdminTopicMetadata(String clusterName, long executionId, Optional<String> storeName, Optional<Long> offset, Optional<Long> upstreamOffset)
void updateClusterConfig(String clusterName, UpdateClusterConfigQueryParams params)
void updateClusterDiscovery(String storeName, String oldCluster, String newCluster, String initiatingCluster)
  Update the cluster discovery of a given store by writing to the StoreConfig ZNode.
void updateRoutersClusterConfig(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapaictyProtectionEnabled, Optional<Integer> expectedRouterCount)
  Update the cluster-level config for all routers.
void updateStoragePersona(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams)
void updateStore(String clusterName, String storeName, UpdateStoreQueryParams params)
void validateAndMaybeRetrySystemStoreAutoCreation(String clusterName, String storeName, VeniceSystemStoreType systemStoreType)
versionsForStore(String clusterName, String storeName)
boolean whetherEnableBatchPushFromAdmin(String storeName)
void wipeCluster(String clusterName, String fabric, Optional<String> storeName, Optional<Integer> versionNum)
void writeEndOfPush(String clusterName, String storeName, int versionNumber, boolean alsoWriteStartOfPush)
-
Method Details
-
initStorageCluster
-
isClusterValid
-
createStore
-
createStore
-
createStore
-
isStoreMigrationAllowed
-
migrateStore
-
completeMigration
-
abortMigration
-
deleteStore
void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) Delete the entire store, including both metadata and real user data. Before deleting a store, disable it manually to ensure that no read or write requests hit this store.
-
addVersionAndStartIngestion
void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion) This method behaves differently in VeniceHelixAdmin and VeniceParentHelixAdmin.
-
hasWritePermissionToBatchJobHeartbeatStore
default boolean hasWritePermissionToBatchJobHeartbeatStore(X509Certificate requesterCert, String batchJobHeartbeatStoreName) throws AclException
- Throws:
AclException
-
incrementVersionIdempotent
default Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor) The implementation of this method must take no action and return the same Version object if the same parameters are provided on a subsequent invocation. The expected use is multiple distributed components of a single push (with a single pushJobId) that each need to query Venice for the Version (and Kafka topic) to write into. The first task triggers a new Version; all subsequent tasks identify themselves with the same pushJobId and should be provided with the same Version object.
-
incrementVersionIdempotent
default Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, int repushSourceVersion)
-
incrementVersionIdempotent
Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion)
-
getIncrementalPushVersion
-
getReferenceVersionForStreamingWrites
-
getCurrentVersion
-
getCurrentVersionsForMultiColos
-
getFutureVersionsForMultiColos
-
getBackupVersionsForMultiColos
-
getBackupVersion
-
getFutureVersion
-
getRepushInfo
-
peekNextVersion
-
deleteAllVersionsInStore
Delete all Venice versions in the given store (including Venice resources, Kafka topics, offline pushes, and all related resources).
- Throws:
VeniceException
- If the given store is not disabled, an exception is thrown to reject the deletion request.
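The disable-before-delete rule described for deleteStore and deleteAllVersionsInStore can be sketched with a small in-memory model. Everything here is hypothetical: StoreRecord, DeletionGuardSketch, and the use of IllegalStateException in place of VeniceException are stand-ins chosen for illustration, not Venice classes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified store record; not Venice's Store class. */
final class StoreRecord {
  boolean readable = true;
  boolean writeable = true;
  final List<Integer> versions = new ArrayList<>();
}

/** Sketch of the "disable before delete" precondition. */
final class DeletionGuardSketch {
  private final Map<String, StoreRecord> stores = new HashMap<>();

  StoreRecord createStore(String storeName) {
    StoreRecord record = new StoreRecord();
    stores.put(storeName, record);
    return record;
  }

  /** Mirrors the intent of setStoreReadWriteability(clusterName, storeName, isAccessible). */
  void setStoreReadWriteability(String storeName, boolean isAccessible) {
    StoreRecord record = requireStore(storeName);
    record.readable = isAccessible;
    record.writeable = isAccessible;
  }

  /** Rejects the request unless the store is fully disabled, then drops every version. */
  List<Integer> deleteAllVersionsInStore(String storeName) {
    StoreRecord record = requireStore(storeName);
    if (record.readable || record.writeable) {
      // Stand-in for the VeniceException described above.
      throw new IllegalStateException("Store " + storeName + " must be disabled before deleting its versions");
    }
    List<Integer> deleted = new ArrayList<>(record.versions);
    record.versions.clear();
    return deleted;
  }

  private StoreRecord requireStore(String storeName) {
    StoreRecord record = stores.get(storeName);
    if (record == null) {
      throw new IllegalArgumentException("Unknown store: " + storeName);
    }
    return record;
  }
}
```

A caller following this contract would first call setStoreReadWriteability(clusterName, storeName, false), or the separate readability and writeability setters, and only then issue the deletion.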
-
deleteOldVersionInStore
Delete the given version from the store. If the given version is the current version, an exception will be thrown.
-
versionsForStore
-
getAllStores
-
getAllStoreStatuses
Get the statuses of all stores. The store status is decided by the current version. For example, if one partition has only 2 ONLINE replicas in the current version, we say this store is under-replicated. Refer to StoreStatus for the definition of each status.
- Returns:
- a map whose key is the store name and whose value is the store's status.
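A minimal sketch of how a store status could be derived from the current version's per-partition ONLINE replica counts, in the spirit of the example above (2 ONLINE replicas under an assumed replication factor of 3 means under-replicated). The status names and the rule that a partition with zero ONLINE replicas makes the store unavailable are assumptions; the authoritative definitions live in StoreStatus.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical status values; StoreStatus holds Venice's actual definitions. */
enum SketchStoreStatus { FULLY_REPLICATED, UNDER_REPLICATED, UNAVAILABLE }

final class StoreStatusSketch {
  /** Derives a status from the ONLINE replica count of each partition in the current version. */
  static SketchStoreStatus statusOf(List<Integer> onlineReplicasPerPartition, int replicationFactor) {
    SketchStoreStatus status = SketchStoreStatus.FULLY_REPLICATED;
    for (int online : onlineReplicasPerPartition) {
      if (online == 0) {
        return SketchStoreStatus.UNAVAILABLE; // assumption: a partition with no ONLINE replica dominates
      }
      if (online < replicationFactor) {
        status = SketchStoreStatus.UNDER_REPLICATED; // e.g. 2 ONLINE replicas when 3 are expected
      }
    }
    return status;
  }

  /** Builds the store-name -> status map described for getAllStoreStatuses. */
  static Map<String, SketchStoreStatus> allStoreStatuses(Map<String, List<Integer>> currentVersionReplicas, int replicationFactor) {
    Map<String, SketchStoreStatus> result = new LinkedHashMap<>();
    currentVersionReplicas.forEach((store, replicas) -> result.put(store, statusOf(replicas, replicationFactor)));
    return result;
  }
}
```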
-
getStore
-
hasStore
-
getKeySchema
-
getValueSchemas
-
getDerivedSchemas
-
getValueSchemaId
-
getDerivedSchemaId
-
getValueSchema
-
addValueSchema
SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType)
-
addValueSchema
SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType expectedCompatibilityType)
-
addValueSchema
default SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId) This method skips most precondition checks and is intended for internal use only. Code from outside should call addValueSchema(String, String, String, DirectionalSchemaCompatibilityType) instead.
-
addSupersetSchema
SchemaEntry addSupersetSchema(String clusterName, String storeName, String valueSchemaStr, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId)
-
addDerivedSchema
DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr)
-
getInUseValueSchemaIds
-
deleteValueSchemas
Deletes a store's value schemas, except those whose IDs are passed in the argument inuseValueSchemaIds.
-
getMetaStoreValue
-
addDerivedSchema
DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr) This method skips most precondition checks and is intended for internal use only.
-
getReplicationMetadataSchemas
-
getReplicationMetadataSchema
-
addReplicationMetadataSchema
RmdSchemaEntry addReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr)
-
validateAndMaybeRetrySystemStoreAutoCreation
void validateAndMaybeRetrySystemStoreAutoCreation(String clusterName, String storeName, VeniceSystemStoreType systemStoreType)
-
removeDerivedSchema
DerivedSchemaEntry removeDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId) Remove an existing derived schema.
- Returns:
- the derived schema that was deleted, or null if the schema doesn't exist
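The return contract (the deleted entry, or null when nothing matched) maps naturally onto Java's Map.remove. The sketch below uses a hypothetical nested map keyed by value schema ID and then derived schema ID; DerivedSchemaRegistrySketch is illustrative only and says nothing about how Venice actually persists derived schemas.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory registry illustrating the removeDerivedSchema return contract. */
final class DerivedSchemaRegistrySketch {
  // valueSchemaId -> (derivedSchemaId -> derived schema string)
  private final Map<Integer, Map<Integer, String>> schemas = new HashMap<>();

  void add(int valueSchemaId, int derivedSchemaId, String schemaStr) {
    schemas.computeIfAbsent(valueSchemaId, k -> new HashMap<>()).put(derivedSchemaId, schemaStr);
  }

  /** Returns the removed schema, or null when the (valueSchemaId, derivedSchemaId) pair does not exist. */
  String remove(int valueSchemaId, int derivedSchemaId) {
    Map<Integer, String> derived = schemas.get(valueSchemaId);
    if (derived == null) {
      return null;
    }
    String removed = derived.remove(derivedSchemaId);
    if (derived.isEmpty()) {
      schemas.remove(valueSchemaId); // keep the outer map free of empty entries
    }
    return removed;
  }
}
```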
-
setStoreCurrentVersion
-
rollForwardToFutureVersion
-
rollbackToBackupVersion
-
setStoreLargestUsedVersion
-
setStoreOwner
-
setStorePartitionCount
-
setStoreReadability
-
setStoreWriteability
-
setStoreReadWriteability
-
updateStore
-
updateClusterConfig
-
getStorageEngineOverheadRatio
-
getStorageNodes
-
getStorageNodesStatus
-
removeStorageNode
-
stop
Stop the Helix controller for a single cluster.
-
stopVeniceController
void stopVeniceController() Stop the entire controller, not only the Helix controller for a single cluster.
-
getOffLinePushStatus
Query the status of the offline push by the given Kafka topic. TODO: We use the Kafka topic to track the status now, but in the future we should use the job ID instead of the Kafka topic. Right now each Kafka topic has only one offline job, but in the future one Kafka topic could be assigned multiple jobs, such as data migration jobs.
- Returns:
- the status of the current offline push for the passed Kafka topic
-
getOffLinePushStatus
-
getKafkaBootstrapServers
Return the SSL or non-SSL bootstrap servers based on the given flag.
- Returns:
- the Kafka bootstrap server URLs; if there are multiple, they are comma-separated.
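A small sketch of the flag-based selection and comma-separated return value, assuming the controller holds one list of plaintext brokers and one list of SSL brokers. BootstrapServersSketch and its constructor are hypothetical; how Venice actually sources these addresses is not described here.

```java
import java.util.List;

/** Hypothetical holder for the two broker lists a controller might be configured with. */
final class BootstrapServersSketch {
  private final List<String> plaintextServers;
  private final List<String> sslServers;

  BootstrapServersSketch(List<String> plaintextServers, List<String> sslServers) {
    this.plaintextServers = List.copyOf(plaintextServers);
    this.sslServers = List.copyOf(sslServers);
  }

  /** Picks the SSL or non-SSL list and returns it as a comma-separated string. */
  String getKafkaBootstrapServers(boolean isSSL) {
    return String.join(",", isSSL ? sslServers : plaintextServers);
  }
}
```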
-
getRegionName
String getRegionName() Return the region name of this Admin.
- Returns:
- the region name of this controller
-
getNativeReplicationKafkaBootstrapServerAddress
-
getNativeReplicationSourceFabric
-
isSSLEnabledForPush
Return whether SSL is enabled for pushes to the given store.
-
isSslToKafka
boolean isSslToKafka()
-
getTopicManager
TopicManager getTopicManager()
-
getTopicManager
-
getAggregatedHealthStatus
-
isRTTopicDeletionPermittedByAllControllers
-
isLeaderControllerFor
Check whether this controller itself is the leader controller for a given cluster. Note that the controller can be either a parent controller or a child controller, since a cluster must have both a leader child controller and a leader parent controller. The point is not to confuse the leader-standby concept with the parent-child controller architecture.
-
calculateNumberOfPartitions
Calculate how many partitions are needed for the given store.
-
getReplicationFactor
-
getDatacenterCount
Returns the number of datacenters: 1 in single-cluster mode, and possibly more if this is a parent controller.
-
getReplicas
-
getReplicasOfStorageNode
-
isInstanceRemovable
NodeRemovableResult isInstanceRemovable(String clusterName, String helixNodeId, List<String> lockedNodes) Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, determine whether the given instance can be removed from the given cluster. For example, if the only online replica alive in this cluster is hosted on the given instance, this instance should not be removed from the cluster; otherwise, Venice will lose data. For the detailed criteria, refer to InstanceStatusDecider.
- Parameters:
clusterName
- The cluster where the hosts belong.
helixNodeId
- nodeId of the Helix participant, in HOST_PORT format.
lockedNodes
- A list of Helix nodeIds whose resources are assumed to be unusable (stopped).
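The sketch below illustrates the core idea behind the removability check with a deliberately simplified rule: an instance is removable only if every partition it hosts keeps at least one ONLINE replica on some other host that is not in lockedNodes. The partition-to-hosts map and RemovabilitySketch are hypothetical, and it returns a plain boolean rather than NodeRemovableResult; the real criteria in InstanceStatusDecider may be stricter.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class RemovabilitySketch {
  /**
   * Simplified criterion (an assumption): the instance is removable only if every partition it
   * hosts keeps at least one ONLINE replica on a host that is neither the instance itself nor
   * one of the locked nodes.
   */
  static boolean isInstanceRemovable(Map<String, List<String>> onlineHostsByPartition, String helixNodeId, List<String> lockedNodes) {
    Set<String> unusable = new HashSet<>(lockedNodes);
    unusable.add(helixNodeId);
    for (Map.Entry<String, List<String>> entry : onlineHostsByPartition.entrySet()) {
      List<String> hosts = entry.getValue();
      if (!hosts.contains(helixNodeId)) {
        continue; // removing the instance does not affect this partition
      }
      boolean survivorExists = hosts.stream().anyMatch(host -> !unusable.contains(host));
      if (!survivorExists) {
        return false; // this partition would be left with no usable ONLINE replica
      }
    }
    return true;
  }
}
```

Treating lockedNodes exactly like the instance being removed is what lets the check answer "is it safe to stop this host while those other hosts are also down".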
-
getLeaderController
Get the instance of the leader controller. If there is no leader controller for the given cluster, throw a VeniceException.
-
addInstanceToAllowlist
-
removeInstanceFromAllowList
-
getAllowlist
-
killOfflinePush
Kill an offline push if it ran into errors or the corresponding version is being retired.
- Parameters:
clusterName
kafkaTopic
isForcedKill
- should be set to true when killing the push job for retiring the corresponding version.
-
getStorageNodesStatus
Query and return the current status of the given storage node. The "storage node status" is composed of the statuses of all replicas in that storage node. Each status is an integer value of the Helix state:
- DROPPED=1
- ERROR=2
- OFFLINE=3
- BOOTSTRAP=4
- ONLINE=5
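The integer encoding above is what makes an ordering comparison such as isStorageNodeNewerOrEqualTo possible. This sketch represents a node status as a hypothetical map from replica name to state value and applies the per-replica greater-or-equal rule; StorageNodeStatusSketch is not Venice's StorageNodeStatus, and treating replicas that are absent from the old snapshot as unconstrained is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Helix replica states with the integer values listed above. */
enum HelixStateValue {
  DROPPED(1), ERROR(2), OFFLINE(3), BOOTSTRAP(4), ONLINE(5);

  final int value;

  HelixStateValue(int value) {
    this.value = value;
  }
}

final class StorageNodeStatusSketch {
  /** Hypothetical representation: replica name -> integer state value. */
  static Map<String, Integer> snapshot(Map<String, HelixStateValue> replicaStates) {
    Map<String, Integer> status = new LinkedHashMap<>();
    replicaStates.forEach((replica, state) -> status.put(replica, state.value));
    return status;
  }

  /**
   * True if every replica in the current snapshot has a value greater than or equal to its value
   * in the old snapshot. Replicas missing from the old snapshot impose no constraint (an assumption).
   */
  static boolean isNewerOrEqual(Map<String, Integer> current, Map<String, Integer> old) {
    for (Map.Entry<String, Integer> entry : current.entrySet()) {
      int oldValue = old.getOrDefault(entry.getKey(), 0);
      if (entry.getValue() < oldValue) {
        return false;
      }
    }
    return true;
  }
}
```

For instance, a replica that moved from BOOTSTRAP (4) to ONLINE (5) makes the node "Newer", while one that fell back to OFFLINE (3) does not.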
-
isStorageNodeNewerOrEqualTo
boolean isStorageNodeNewerOrEqualTo(String clusterName, String instanceId, StorageNodeStatus oldServerStatus) Compare the current storage node status with the given storage node status to check whether the current one is "Newer" or "Equal" to the given one. The comparison goes through each replica in this storage node: if every replica's status value is greater than or equal to its status value in the given storage node status, the current storage node status is considered "Newer" or "Equal" to the given one.
-
setAdminConsumerService
-
skipAdminMessage
The admin consumption task tries to deal with failures to process an admin message by retrying. If a message cannot be processed for some reason, we need to forcibly skip it in order to unblock the task from consuming subsequent messages.
- Parameters:
clusterName
offset
skipDIV
- tries to skip only the DIV check for the blocking message.
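A single-threaded model of the retry-then-skip behavior: a failing message keeps the task on the same offset until an operator marks that offset as skippable. AdminConsumptionSketch, its Predicate-based processor, and the offset bookkeeping are hypothetical, and the skipDIV variant (skipping only the DIV check) is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of an admin consumption task that retries a failing message until told to skip it. */
final class AdminConsumptionSketch {
  private final List<String> messages;        // admin messages, indexed by offset
  private final Predicate<String> processor;  // returns false when processing fails
  private final List<String> applied = new ArrayList<>();
  private long nextOffset = 0;
  private long offsetToSkip = -1;

  AdminConsumptionSketch(List<String> messages, Predicate<String> processor) {
    this.messages = messages;
    this.processor = processor;
  }

  /** Analogous in spirit to skipAdminMessage(clusterName, offset, false). */
  void skipAdminMessage(long offset) {
    offsetToSkip = offset;
  }

  /** One consumption cycle: true if progress was made, false if blocked or caught up. */
  boolean pollOnce() {
    if (nextOffset >= messages.size()) {
      return false;
    }
    if (nextOffset == offsetToSkip) {
      nextOffset++; // forcibly skip the blocking message
      return true;
    }
    String message = messages.get((int) nextOffset);
    if (!processor.test(message)) {
      return false; // stay on the same offset; the next poll retries it
    }
    applied.add(message);
    nextOffset++;
    return true;
  }

  List<String> applied() {
    return applied;
  }

  long nextOffset() {
    return nextOffset;
  }
}
```

Because a skipped message is never applied, skipping is a last resort: any state change it carried has to be reconciled some other way.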
-
getLastSucceedExecutionId
Get the ID of the last successful execution in this controller.
-
getAdminCommandExecutionTracker
Get the tracker used to track the execution of the admin command for the given cluster.
-
getRoutersClusterConfig
Get the cluster-level config for all routers.
-
updateRoutersClusterConfig
void updateRoutersClusterConfig(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapaictyProtectionEnabled, Optional<Integer> expectedRouterCount) Update the cluster-level config for all routers.
-
getAllStorePushStrategyForMigration
-
setStorePushStrategyForMigration
-
discoverCluster
Find the cluster that the given store belongs to. Return the pair of the cluster name and the d2 service associated with that cluster.
- Throws:
VeniceException
- if no cluster is found.
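A sketch of the lookup and its failure mode, assuming the store-to-cluster mapping and the cluster-to-d2-service mapping are both available as plain maps. ClusterDiscoverySketch is hypothetical, IllegalStateException stands in for VeniceException, and the pair is modeled as a Map.Entry rather than whatever pair type Venice uses.

```java
import java.util.AbstractMap;
import java.util.Map;

final class ClusterDiscoverySketch {
  private final Map<String, String> clusterByStore;      // stand-in for the StoreConfig data
  private final Map<String, String> d2ServiceByCluster;  // hypothetical cluster -> d2 service mapping

  ClusterDiscoverySketch(Map<String, String> clusterByStore, Map<String, String> d2ServiceByCluster) {
    this.clusterByStore = clusterByStore;
    this.d2ServiceByCluster = d2ServiceByCluster;
  }

  /** Returns (clusterName, d2Service); throws if the store is not assigned to any cluster. */
  Map.Entry<String, String> discoverCluster(String storeName) {
    String cluster = clusterByStore.get(storeName);
    if (cluster == null) {
      // Stand-in for the VeniceException described above.
      throw new IllegalStateException("No cluster found for store " + storeName);
    }
    return new AbstractMap.SimpleImmutableEntry<>(cluster, d2ServiceByCluster.get(cluster));
  }
}
```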
-
getServerD2Service
Find the server d2 service associated with a given cluster name.
-
findAllBootstrappingVersions
Find the store versions that have at least one bootstrap replica.
-
getVeniceWriterFactory
VeniceWriterFactory getVeniceWriterFactory()
-
getPubSubConsumerAdapterFactory
PubSubConsumerAdapterFactory getPubSubConsumerAdapterFactory()
-
getPubSubSSLProperties
-
close
void close()
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
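Because Admin extends AutoCloseable and Closeable, an implementation can be scoped with try-with-resources so that close() runs even if the body throws. AdminStub below is a hypothetical stand-in that only models the Closeable contract; it is not a Venice class.

```java
import java.io.Closeable;

/** Hypothetical stand-in for an Admin implementation; only the Closeable contract is modeled. */
final class AdminStub implements Closeable {
  private boolean closed = false;

  boolean isParent() {
    if (closed) {
      throw new IllegalStateException("admin already closed");
    }
    return false;
  }

  @Override
  public void close() {
    closed = true; // a real implementation would release whatever resources it holds here
  }

  boolean isClosed() {
    return closed;
  }
}

final class TryWithResourcesSketch {
  /** The admin is closed automatically when the block exits, even if the body throws. */
  static AdminStub useAndClose() {
    AdminStub admin = new AdminStub();
    try (AdminStub scoped = admin) {
      scoped.isParent();
    }
    return admin;
  }
}
```

Narrowing close() to throw no checked exception, as the stub does, is allowed by Closeable and keeps callers free of IOException handling.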
-
isLeaderControllerOfControllerCluster
boolean isLeaderControllerOfControllerCluster() This function can be used to perform cluster-wide operations that need to be performed by only a single process in the whole cluster. There could be a race condition during leader controller failover, so long-running operations should have some way of guarding against it.
-
isTopicTruncated
-
isTopicTruncatedBasedOnRetention
boolean isTopicTruncatedBasedOnRetention(long retention)
-
isTopicTruncatedBasedOnRetention
-
getMinNumberOfUnusedKafkaTopicsToPreserve
int getMinNumberOfUnusedKafkaTopicsToPreserve()
-
truncateKafkaTopic
- Returns:
- false if the topic has already been truncated before; true if this is the first time truncating this topic.
-
truncateKafkaTopic
Truncate a Kafka topic by setting its retention time to the input value.
- Parameters:
topicName
- the name of the topic to truncate.
retentionTimeInMs
- the retention time in milliseconds to set for the topic.
- Returns:
- true if the topic was truncated successfully; false otherwise.
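The two truncateKafkaTopic overloads and isTopicTruncatedBasedOnRetention fit together roughly as sketched below, assuming a topic counts as truncated once its retention drops to or below some threshold. The threshold, the retention applied by the single-argument overload, and TopicTruncationSketch itself are hypothetical values and names chosen for illustration; none of them is taken from Venice.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of topic retention; not Venice's TopicManager. */
final class TopicTruncationSketch {
  /** Hypothetical cutoff: a retention at or below this value counts as "truncated". */
  static final long TRUNCATED_RETENTION_THRESHOLD_MS = 60_000L;
  /** Hypothetical retention applied by the single-argument overload. */
  static final long TRUNCATION_RETENTION_MS = 1_000L;

  private final Map<String, Long> retentionByTopic = new HashMap<>();

  void createTopic(String topicName, long retentionMs) {
    retentionByTopic.put(topicName, retentionMs);
  }

  static boolean isTopicTruncatedBasedOnRetention(long retention) {
    return retention <= TRUNCATED_RETENTION_THRESHOLD_MS;
  }

  boolean isTopicTruncated(String topicName) {
    Long retention = retentionByTopic.get(topicName);
    return retention != null && isTopicTruncatedBasedOnRetention(retention);
  }

  /** Returns false if the topic was already truncated (or, in this sketch, is unknown); true on first truncation. */
  boolean truncateKafkaTopic(String topicName) {
    if (isTopicTruncated(topicName)) {
      return false;
    }
    return truncateKafkaTopic(topicName, TRUNCATION_RETENTION_MS);
  }

  /** Truncates by overwriting the retention; returns false if the topic does not exist. */
  boolean truncateKafkaTopic(String topicName, long retentionTimeInMs) {
    if (!retentionByTopic.containsKey(topicName)) {
      return false;
    }
    retentionByTopic.put(topicName, retentionTimeInMs);
    return true;
  }
}
```

Expressing "truncated" as a retention threshold rather than a separate flag means the state survives restarts as long as the topic's retention config does, which is one plausible reason for this design.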
-
isResourceStillAlive
Check whether the specified resource is fully removed or not.
-
updateClusterDiscovery
void updateClusterDiscovery(String storeName, String oldCluster, String newCluster, String initiatingCluster) Update the cluster discovery of a given store by writing to the StoreConfig ZNode.
- Parameters:
storeName
- the name of the store.
oldCluster
- the old cluster for the store.
newCluster
- the new cluster for the store.
initiatingCluster
- the cluster that is making the update. This is needed because, in the case of store migration, the update is sometimes made not by the leader of the current cluster but by the leader of the source cluster.
-
sendPushJobDetails
-
getPushJobDetails
-
getBatchJobHeartbeatValue
-
writeEndOfPush
-
whetherEnableBatchPushFromAdmin
-
updateAclForStore
Provision a new set of ACLs for a Venice store and its associated Kafka topic. -
getAclForStore
Fetch the current set of ACLs provisioned for a Venice store and its associated Kafka topic.- Returns:
- The string representation of the accessPermissions. Returns an empty string if the store is not present.
-
deleteAclForStore
Delete the current set of ACLs provisioned for a Venice store and its associated Kafka topic. -
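The three ACL methods above form a simple update/get/delete lifecycle. One detail worth noting is that a lookup for an unknown store yields an empty string rather than null. A minimal sketch follows. `StoreAclRegistry` is hypothetical, the permission string format is an arbitrary placeholder, and no real ACL backend is modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory ACL registry keyed by store name. */
final class StoreAclRegistry {
  // Store name -> string representation of its access permissions (format assumed).
  private final Map<String, String> accessPermissions = new HashMap<>();

  /** Provision (or replace) the ACL for a store and, conceptually, its Kafka topic. */
  void updateAcl(String storeName, String permissions) {
    accessPermissions.put(storeName, permissions);
  }

  /** Returns the ACL string, or an empty string if the store is not present. */
  String getAcl(String storeName) {
    return accessPermissions.getOrDefault(storeName, "");
  }

  void deleteAcl(String storeName) {
    accessPermissions.remove(storeName);
  }
}
```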
isParent
boolean isParent()Check whether the controller works as a parent controller.- Returns:
- true if it works as a parent controller. Otherwise, return false.
-
getParentControllerRegionState
ParentControllerRegionState getParentControllerRegionState()Return the state of the region of the parent controller.- Returns:
ParentControllerRegionState.ACTIVE
, which means that the parent controller in the region is serving requests; otherwise, ParentControllerRegionState.PASSIVE.
-
getChildDataCenterControllerUrlMap
Get the child datacenter to child controller URL mapping.- Returns:
- A map of child datacenter -> child controller URL
-
getChildDataCenterControllerD2Map
Get the child datacenter to child controller d2 zk host mapping.- Returns:
- A map of child datacenter -> child controller d2 zk host
-
getChildControllerD2ServiceName
Get the child datacenter controller d2 service name.- Returns:
- d2 service name
-
getStoreConfigRepo
HelixReadOnlyStoreConfigRepository getStoreConfigRepo()Return a shared store config repository. -
getMetaStoreWriter
MetaStoreWriter getMetaStoreWriter()Return a MetaStoreWriter, which can be shared across different Venice clusters. -
getMetaStoreReader
MetaStoreReader getMetaStoreReader() -
getClustersLeaderOf
Get a list of clusters this controller is a leader of.- Returns:
- a list of clusters this controller is a leader of.
-
configureActiveActiveReplication
void configureActiveActiveReplication(String cluster, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter) Enable or disable active-active replication for certain types of stores (batch only, hybrid only, incremental push, hybrid or incremental push, or all) in a cluster. If storeName is not empty, only the specified store might be updated. -
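The store-type filter and the optional `storeName` together decide which stores are affected. The sketch below is hedged. `StoreCategory`, `StoreInfo` and `ActiveActiveSelector` are hypothetical names, and the rule for each category is an assumed reading of the list above, not the real `VeniceUserStoreType` semantics. It also shows why a named store "might" be updated: the store still has to match the category.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical categories mirroring the filter options listed above. */
enum StoreCategory {
  BATCH_ONLY, HYBRID_ONLY, INCREMENTAL_PUSH, HYBRID_OR_INCREMENTAL_PUSH, ALL;

  // Assumed definitions; the real VeniceUserStoreType rules may differ.
  boolean matches(StoreInfo s) {
    switch (this) {
      case BATCH_ONLY: return !s.hybrid() && !s.incrementalPush();
      case HYBRID_ONLY: return s.hybrid() && !s.incrementalPush();
      case INCREMENTAL_PUSH: return s.incrementalPush();
      case HYBRID_OR_INCREMENTAL_PUSH: return s.hybrid() || s.incrementalPush();
      default: return true; // ALL
    }
  }
}

record StoreInfo(String name, boolean hybrid, boolean incrementalPush) {}

final class ActiveActiveSelector {
  /** Returns the names of stores whose A/A setting would be changed. */
  static List<String> select(List<StoreInfo> stores, StoreCategory category, Optional<String> storeName) {
    List<String> selected = new ArrayList<>();
    for (StoreInfo s : stores) {
      // A non-empty storeName narrows the update to that one store, if it matches the category.
      if (storeName.isPresent() && !storeName.get().equals(s.name())) {
        continue;
      }
      if (category.matches(s)) {
        selected.add(s.name());
      }
    }
    return selected;
  }
}
```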
checkResourceCleanupBeforeStoreCreation
Check whether any resources related to the store are left over in the cluster before the store is created. If there are any, this function should throw an exception. -
getEmergencySourceRegion
Return the emergency source region configuration. -
getAggregateRealTimeTopicSource
Return the source Kafka bootstrap server URL for aggregate real-time topic updates. -
isActiveActiveReplicationEnabledInAllRegion
boolean isActiveActiveReplicationEnabledInAllRegion(String clusterName, String storeName, boolean checkCurrentVersion) Returns true if A/A replication is enabled in all child controllers and the parent controller; otherwise, returns false. This is implemented only in the parent controller. -
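The check above is an "all of" aggregation across the parent and every child region. Here is a minimal sketch, assuming the per-region flags have already been collected into a map. `ActiveActiveCheck` is a hypothetical name, and returning false when no child regions are known is an assumption.

```java
import java.util.Map;

/** Hypothetical aggregation of per-region A/A flags. */
final class ActiveActiveCheck {
  /** True only if the parent and every child region report A/A as enabled. */
  static boolean enabledInAllRegions(boolean parentEnabled, Map<String, Boolean> childEnabledByRegion) {
    // Assumption: with no known child regions, "enabled everywhere" is not established.
    if (!parentEnabled || childEnabledByRegion.isEmpty()) {
      return false;
    }
    return childEnabledByRegion.values().stream().allMatch(Boolean::booleanValue);
  }
}
```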
getBackupVersionDefaultRetentionMs
long getBackupVersionDefaultRetentionMs()Returns default backup version retention time. -
getDefaultMaxRecordSizeBytes
int getDefaultMaxRecordSizeBytes()- Returns:
- The default value of
VeniceWriter.maxRecordSizeBytes
, which is provided to VPJ and the consumer as a controller config so that the setting can be controlled dynamically per cluster.
-
wipeCluster
-
copyOverStoreSchemasAndConfigs
-
compareStore
StoreComparisonInfo compareStore(String clusterName, String storeName, String fabricA, String fabricB) throws IOException Compare store metadata and version states between two fabrics.- Throws:
IOException
-
nodeReplicaReadiness
Pair<NodeReplicasReadinessState,List<Replica>> nodeReplicaReadiness(String cluster, String helixNodeId) helixNodeId is the node ID of the Helix participant, in HOST_PORT format. Returns true if all current-version replicas of the input node are ready to serve; otherwise, returns false together with all unready replicas. -
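The readiness result pairs a verdict with the replicas that caused a negative verdict. Below is a hedged sketch of that shape. `ReplicaStatus`, `NodeReadiness` and `NodeReadinessCheck` are hypothetical stand-ins for Venice's `Replica`, `NodeReplicasReadinessState` and `Pair` types, and a single boolean per replica replaces whatever readiness signal the real implementation consults.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical per-replica status for one current-version partition on a node. */
record ReplicaStatus(String partitionId, boolean readyToServe) {}

/** Hypothetical result: overall verdict plus the replicas that are not ready. */
record NodeReadiness(boolean ready, List<ReplicaStatus> unreadyReplicas) {}

final class NodeReadinessCheck {
  /** Ready iff every current-version replica on the node can serve; otherwise lists the unready ones. */
  static NodeReadiness check(List<ReplicaStatus> currentVersionReplicas) {
    List<ReplicaStatus> unready = currentVersionReplicas.stream()
        .filter(r -> !r.readyToServe())
        .collect(Collectors.toList());
    return new NodeReadiness(unready.isEmpty(), unready);
  }
}
```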
initiateDataRecovery
void initiateDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion) Initiate data recovery for a store version given a source fabric.- Parameters:
clusterName
- of the store.
storeName
- of the store.
version
- of the store.
sourceFabric
- to be used as the source for data recovery.
copyAllVersionConfigs
- whether to copy all version configs from the source fabric, or to copy only the essential version configs and generate the rest from the destination fabric's store configs.
sourceFabricVersion
- source fabric's Version configs used to configure the recovering version in the destination fabric.
-
prepareDataRecovery
void prepareDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Prepare for data recovery in the destination fabric. The store version of interest might have lingering states and resources in the destination fabric from previous failed attempts. Perform some basic checks to make sure the store version in the destination fabric is capable of performing data recovery, and clean up any lingering states and resources. -
isStoreVersionReadyForDataRecovery
Pair<Boolean,String> isStoreVersionReadyForDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Check if the store version's previous states and resources are cleaned up and ready to start data recovery.- Returns:
- whether the store version is ready to start data recovery, and the reason if it is not ready.
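`prepareDataRecovery` and `isStoreVersionReadyForDataRecovery` work as a pair: one cleans up leftovers from failed attempts, and the other reports readiness together with a reason. The sketch below models that interplay in memory under assumptions. `DataRecoveryState` and `RecoveryReadiness` are hypothetical, "lingering resources" are reduced to a set of names, and the record plays the role of the documented `Pair<Boolean,String>`.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for Pair<Boolean,String>: (ready, reason if not ready). */
record RecoveryReadiness(boolean ready, String reason) {}

/** Hypothetical model of leftover state for one store version in the destination fabric. */
final class DataRecoveryState {
  private final Set<String> lingeringResources = new LinkedHashSet<>();

  void addLingering(String resource) {
    lingeringResources.add(resource);
  }

  /** Loosely mirrors prepareDataRecovery: clean up anything left by failed attempts. */
  void prepare() {
    lingeringResources.clear();
  }

  /** Loosely mirrors isStoreVersionReadyForDataRecovery. */
  RecoveryReadiness isReady() {
    if (lingeringResources.isEmpty()) {
      return new RecoveryReadiness(true, "");
    }
    return new RecoveryReadiness(false, "Lingering resources from a previous attempt: " + lingeringResources);
  }
}
```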
-
isAdminTopicConsumptionEnabled
Return whether the admin consumption task is enabled for the passed cluster. -
getClusterStores
Return all stores in a cluster. -
getClusterStaleStores
-
getLargestUsedVersionFromStoreGraveyard
- Returns:
- the largest used version number for the given store from store graveyard.
-
listStorePushInfo
Map<String,RegionPushDetails> listStorePushInfo(String clusterName, String storeName, boolean isPartitionDetailEnabled) -
getRegionPushDetails
RegionPushDetails getRegionPushDetails(String clusterName, String storeName, boolean isPartitionDetailEnabled) -
getAdminTopicMetadata
-
updateAdminTopicMetadata
-
createStoragePersona
-
getStoragePersona
-
deleteStoragePersona
-
updateStoragePersona
void updateStoragePersona(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams) -
getPersonaAssociatedWithStore
-
getClusterStoragePersonas
-
cleanupInstanceCustomizedStates
Scan through instance-level customized states and remove any lingering ZNodes that are no longer relevant. This operation shouldn't be needed under normal circumstances; it is intended to clean up ZNodes that failed to be deleted due to bugs and errors.- Parameters:
clusterName
- to perform the cleanup.- Returns:
- list of deleted ZNode paths.
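The cleanup is a scan-and-delete pass that reports what it removed. A hedged in-memory sketch follows. `CustomizedStateCleaner` is hypothetical, a sorted map of ZNode path to resource name replaces ZooKeeper, and "no longer relevant" is assumed to mean that the resource is absent from a caller-supplied set of live resources.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical in-memory model of instance-level customized-state ZNodes. */
final class CustomizedStateCleaner {
  // ZNode path -> resource (e.g. store version) the ZNode describes; sorted by path.
  private final Map<String, String> zNodes = new TreeMap<>();

  void addZNode(String path, String resource) {
    zNodes.put(path, resource);
  }

  /** Deletes ZNodes whose resource is not live and returns the deleted paths in path order. */
  List<String> cleanup(Set<String> liveResources) {
    List<String> deleted = new ArrayList<>();
    Iterator<Map.Entry<String, String>> it = zNodes.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String> e = it.next();
      if (!liveResources.contains(e.getValue())) {
        it.remove(); // safe removal while iterating
        deleted.add(e.getKey());
      }
    }
    return deleted;
  }
}
```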
-
getStoreGraveyard
StoreGraveyard getStoreGraveyard() -
removeStoreFromGraveyard
-
startInstanceMonitor
-
clearInstanceMonitor
-
getPushStatusStoreReader
PushStatusStoreReader getPushStatusStoreReader() -
getPushStatusStoreWriter
PushStatusStoreWriter getPushStatusStoreWriter() -
sendHeartbeatToSystemStore
Send a heartbeat timestamp to targeted system store. -
getHeartbeatFromSystemStore
Read the latest heartbeat timestamp from the system store. If reading from the system store fails, this method should return -1. -
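The -1 sentinel means callers never see an exception from a failed heartbeat read. Below is a minimal sketch of that contract. `HeartbeatReader` is hypothetical, and a `Callable<Long>` stands in for the actual system-store read. Treating a null result as a failure is an added assumption.

```java
import java.util.concurrent.Callable;

/** Hypothetical wrapper enforcing the "-1 on failure" contract. */
final class HeartbeatReader {
  /** Returns the heartbeat timestamp, or -1 if the read fails or yields nothing. */
  static long readHeartbeat(Callable<Long> systemStoreRead) {
    try {
      Long ts = systemStoreRead.call();
      return ts == null ? -1L : ts; // assumption: a missing value counts as a failed read
    } catch (Exception e) {
      return -1L;
    }
  }
}
```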
getHelixVeniceClusterResources
- Returns:
- the aggregate resources required by controller to manage a Venice cluster.
-
getPubSubTopicRepository
PubSubTopicRepository getPubSubTopicRepository()
-