Class VeniceHelixAdmin
- All Implemented Interfaces: Admin, StoreCleaner, Closeable, AutoCloseable
In controller-as-a-service mode there are two levels of clusters and controllers. Each Venice controller holds a level-1 Helix controller that stays connected to Helix; a dedicated cluster (the controllers' cluster) exists only for these level-1 controllers. The second level consists of the Venice clusters themselves, such as a prod cluster or a dev cluster. Each Venice cluster is a Helix resource in the controllers' cluster, and Helix chooses one level-1 controller to become the leader of that Venice cluster. In the distributed controller's state transition handler, a level-2 controller is then initialized to manage that Venice cluster only. If a level-1 controller is chosen as the leader of multiple Venice clusters, multiple level-2 controllers are created, each based on its cluster-specific config.
The Admin is shared by the controllers of multiple clusters running in one physical Venice controller instance.
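The two-level arrangement described above can be pictured with a small, self-contained model. This is only a sketch: the types `Level1Controller` and `ClusterController` and the callbacks `onBecomeLeader` and `onLoseLeadership` are hypothetical names chosen for illustration and are not part of Venice or Helix. It assumes that a level-2 controller is created when leadership of a Venice cluster is gained and dropped when it is lost.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, simplified model of the two controller levels; not Venice code. */
public class TwoLevelControllerSketch {
  /** Stand-in for a level-2 controller that manages exactly one Venice cluster. */
  static final class ClusterController {
    final String clusterName;

    ClusterController(String clusterName) {
      this.clusterName = clusterName;
    }
  }

  /** Stand-in for a level-1 controller that participates in the controllers' cluster. */
  static final class Level1Controller {
    private final Map<String, ClusterController> level2ByCluster = new HashMap<>();

    /** Assumed callback: this instance became leader of the given Venice cluster. */
    void onBecomeLeader(String clusterName) {
      level2ByCluster.computeIfAbsent(clusterName, ClusterController::new);
    }

    /** Assumed callback: leadership of the given Venice cluster moved elsewhere. */
    void onLoseLeadership(String clusterName) {
      level2ByCluster.remove(clusterName);
    }

    /** Names of the Venice clusters this instance currently leads, sorted. */
    Set<String> ledClusters() {
      return new TreeSet<>(level2ByCluster.keySet());
    }
  }

  public static void main(String[] args) {
    Level1Controller controller = new Level1Controller();
    controller.onBecomeLeader("prod");
    controller.onBecomeLeader("dev");
    controller.onLoseLeadership("dev");
    System.out.println(controller.ledClusters()); // prints [prod]
  }
}
```

One level-1 instance can hold several level-2 controllers at once, which is why a single Admin object ends up shared across clusters.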
-
Nested Class Summary
Nested classes/interfaces inherited from interface com.linkedin.venice.controller.Admin
Admin.OfflinePushStatusInfo
-
Field Summary
Modifier and Type, Field
protected static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
protected static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS
protected final PubSubTopicRepository pubSubTopicRepository
-
Constructor Summary
Constructor
VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sslEnabled, com.linkedin.d2.balancer.D2Client d2Client, Optional<SSLConfig> sslConfig, Optional<DynamicAccessController> accessController, Optional<ICProvider> icProvider, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, List<ClusterLeaderInitializationRoutine> additionalInitRoutines)
VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, com.linkedin.d2.balancer.D2Client d2Client, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory)
-
Method Summary
Modifier and TypeMethodDescriptionvoid
abortMigration
(String srcClusterName, String destClusterName, String storeName) Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.addDerivedSchema
(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties.addDerivedSchema
(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties and return a new DerivedSchemaEntry object containing the schema and its id.
void
addInstanceToAllowlist
(String clusterName, String helixNodeId) Add the given helix nodeId into the allowlist in ZK.addReplicationMetadataSchema
(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr) Create a new ReplicationMetadataSchemaEntry object with the given properties and add it to the schema repository if it is not a duplicate.
boolean
addSpecificVersion
(String clusterName, String storeName, Version version) TODO refactor addVersion to these broken down methods instead of doing everything in one giant method.addSupersetSchema
(String clusterName, String storeName, String valueSchema, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId) Add a new superset schema for the given store with all specified properties.addValueSchema
(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType compatibilityType) Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.addValueSchema
(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) void
addVersionAndStartIngestion
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion) This method behaves differently in VeniceHelixAdmin and VeniceParentHelixAdmin.
void
addVersionAndStartIngestion
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked from the admin channel.addVersionAndTopicOnly
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred) addVersionAndTopicOnly
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion.addVersionOnly
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) Only add version to the store without creating the topic or start ingestion.int
calculateNumberOfPartitions
(String clusterName, String storeName) Calculate the number of partitions for the given store.
protected void
checkPreConditionForCreateStore
(String clusterName, String storeName, String keySchema, String valueSchema, boolean allowSystemStore, boolean skipLingeringResourceCheck) Check whether Controller should block the incoming store creation.void
checkResourceCleanupBeforeStoreCreation
(String clusterName, String storeName) Check whether any resources are left over in the cluster before store creation; if there are, this function should throw an exception.
cleanupInstanceCustomizedStates
(String clusterName) Scan through instance level customized states and remove any lingering ZNodes that are no longer relevant.void
clearIngestionKillMessageAndVerify
(String clusterName, String versionTopicName) Clear KILL messages from a participant system store.void
clearInstanceMonitor
(String clusterName) void
close()
Cause VeniceHelixAdmin and its associated services to stop executing.compareStore
(String clusterName, String storeName, String fabricA, String fabricB) Compare store metadata and version states between two fabrics.void
completeMigration
(String srcClusterName, String destClusterName, String storeName) void
configureActiveActiveReplication
(String clusterName, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter) Enable/disable active active replications for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster.boolean
containsHelixResource
(String clusterName, String kafkaTopic) The purpose of this function is to check whether the given resource exists in the Helix cluster.
copyOverStoreSchemasAndConfigs
(String clusterName, String srcFabric, String destFabric, String storeName) void
createHelixResourceAndStartMonitoring
(String clusterName, String storeName, Version version) Create Helix resources for a given storage node cluster and start monitoring a new push.
void
createSpecificVersionTopic
(String clusterName, String storeName, Version version) Create the corresponding version topic based on the provided Version.
void
createStoragePersona
(String clusterName, String name, long quotaNumber, Set<String> storesToEnforce, Set<String> owners) void
createStore
(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore, Optional<String> accessPermissions) Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.void
deleteAclForStore
(String clusterName, String storeName) Delete the current set of ACL provisioned for a venice store and its associated kafka topic.deleteAllVersionsInStore
(String clusterName, String storeName) Delete all Venice versions in the given store (including Venice resources, Kafka topics, offline pushes and all related resources).
void
deleteHelixResource
(String clusterName, String kafkaTopic) The purpose of this function is to delete the given resource from the Helix cluster.
void
deleteOldVersionInStore
(String clusterName, String storeName, int versionNum) Delete the given version from the store.void
deleteOneStoreVersion
(String clusterName, String storeName, int versionNumber) Delete a version from the cluster, removing all related resources.
void
deleteParticipantStoreKillMessage
(String clusterName, String kafkaTopic) Compose a ParticipantMessageKey message and execute a delete operation on the key in the cluster's participant store.
void
deleteStoragePersona
(String clusterName, String name) void
deleteStore
(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) This method deletes store data, metadata, versions and RT topics. The one exception is for stores with the isMigrating flag set.
void
deleteValueSchemas
(String clusterName, String storeName, Set<Integer> unusedValueSchemaIds) Deletes a store's value schemas with ids `except` the ids passed in the argument inuseValueSchemaIds
discoverCluster
(String storeName) Find the cluster which the given store belongs to.void
enableDisabledPartition
(String clusterName, String kafkaTopic, boolean enableAll) findAllBootstrappingVersions
(String clusterName) Find the store versions which have at least one bootstrap replica.getAclForStore
(String clusterName, String storeName) Fetch the current set of ACL provisioned for a venice store and its associated kafka topic.getAdminCommandExecutionTracker
(String clusterName) Get the tracker used to track the execution of the admin command for the given cluster.getAdminTopicMetadata
(String clusterName, Optional<String> storeName) getAggregatedHealthStatus
(String cluster, List<String> instances, List<String> toBeStoppedInstances, boolean isSSLEnabled) getAggregateRealTimeTopicSource
(String clusterName) Return the source Kafka bootstrap server URL for aggregate real-time topic updates.
getAllowlist
(String clusterName) Unsupported operation in the child controller.getAllStores
(String clusterName) getAllStoreStatuses
(String clusterName) Get the statuses of all stores.int
getBackupVersion
(String clusterName, String storeName) long
Returns default backup version retention time.int
getBackupVersionNumber
(List<Version> versions, int currentVersion) Get backup version number, the largest online version number that is less than the current version numbergetBackupVersionsForMultiColos
(String clusterName, String storeName) getBatchJobHeartbeatValue
(BatchJobHeartbeatKey batchJobHeartbeatKey) getChildControllerD2ServiceName
(String clusterName) Get child datacenter controller d2 service namegetChildDataCenterControllerD2Map
(String clusterName) Get child datacenter to child controller d2 zk host mappinggetChildDataCenterControllerUrlMap
(String clusterName) Get child datacenter to child controller url mapping.Get a list of clusters this controller is a leader of.getClusterStaleStores
(String clusterName) getClusterStoragePersonas
(String clusterName) getClusterStores
(String clusterName) Return all stores in a cluster.getControllerClientMap
(String clusterName) int
getCurrentVersion
(String clusterName, String storeName) getCurrentVersionsForMultiColos
(String clusterName, String storeName) int
getDerivedSchemaId
(String clusterName, String storeName, String schemaStr) getDerivedSchemas
(String clusterName, String storeName) getDisabledPartitionStats
(String clusterName) getEmergencySourceRegion
(String clusterName) Return the emergency source region configuration.int
getFutureVersion
(String clusterName, String storeName) getFutureVersionsForMultiColos
(String clusterName, String storeName) long
getHeartbeatFromSystemStore
(String clusterName, String systemStoreName) Read the latest heartbeat timestamp from system store.protected org.apache.helix.HelixAdmin
getHelixVeniceClusterResources
(String cluster) getIncrementalPushVersion
(String clusterName, String storeName, String pushJobId) getInUseValueSchemaIds
(String clusterName, String storeName) getKafkaBootstrapServers
(boolean isSSL) Return the ssl or non-ssl bootstrap servers based on the given flag.getKeySchema
(String clusterName, String storeName) int
getLargestUsedVersionFromStoreGraveyard
(String clusterName, String storeName) getLastSucceededExecutionId
(String clusterName, String storeName) Get last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the clustergetLastSucceedExecutionId
(String clusterName) Get the id of the last succeed execution in this controller.getLeaderController
(String clusterName) Get instance of leader controller.getLiveInstanceMonitor
(String clusterName) getMetaStoreValue
(StoreMetaKey metaKey, String storeName) Return a MetaStoreWriter, which can be shared across different Venice clusters.
int
getNativeReplicationKafkaBootstrapServerAddress
(String sourceFabric) getNativeReplicationSourceFabric
(String clusterName, Store store, Optional<String> sourceGridFabric, Optional<String> emergencySourceRegion, String targetedRegions) Source fabric selection priority: 1.getOffLinePushStatus
(String clusterName, String kafkaTopic) Query the status of the offline push by given kafka topic.getOffLinePushStatus
(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions) int
getOnlineFutureVersion
(String clusterName, String storeName) protected static ExecutionStatus
getOverallPushStatus
(ExecutionStatus veniceStatus, ExecutionStatus daVinciStatus) Return the state of the region of the parent controller.getPersonaAssociatedWithStore
(String clusterName, String storeName) getPubSubSSLProperties
(String pubSubBrokerAddress) Return a shared read only schema repository for zk shared stores.Return a shared read only store repository for zk shared stores.getReferenceVersionForStreamingWrites
(String clusterName, String storeName, String pushJobId) Return the region name of this AdmingetRegionPushDetails
(String clusterName, String storeName, boolean isPartitionDetailAdded) getReplicas
(String clusterName, String kafkaTopic) getReplicasOfStorageNode
(String cluster, String instanceId) int
getReplicationFactor
(String clusterName, String storeName) Optional<org.apache.avro.Schema>
getReplicationMetadataSchema
(String clusterName, String storeName, int valueSchemaID, int rmdVersionID) getReplicationMetadataSchemas
(String clusterName, String storeName) getRepushInfo
(String clusterName, String storeName, Optional<String> fabricName) getRoutersClusterConfig
(String clusterName) Get the cluster level config for all routers.getServerD2Service
(String clusterName) Find the server d2 service associated with a given cluster name.getStartedVersion
(Store store) The intended semantic is to use this method to find the version that something is currently pushing to.double
getStorageEngineOverheadRatio
(String clusterName) getStorageNodes
(String clusterName) getStorageNodesStatus
(String clusterName, boolean enableReplica) getStorageNodesStatus
(String clusterName, String instanceId) Query and return the current status of the given storage node.getStoragePersona
(String clusterName, String name) Return a shared store config repository.getTopicManager
(String pubSubServerAddress) getValueSchema
(String clusterName, String storeName, int id) int
getValueSchemaId
(String clusterName, String storeName, String valueSchemaStr) getValueSchemas
(String clusterName, String storeName) org.apache.helix.zookeeper.impl.client.ZkClient
boolean
Test if the input store exists in a cluster.incrementVersionIdempotent
(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) Note: this currently uses the pushID to guarantee idempotence; unexpected behavior may result if multiple batch jobs push to the same store at the same time.
void
initiateDataRecovery
(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion) Initiate data recovery for a store version given a source fabric.void
initStorageCluster
(String clusterName) Create and configure the Venice storage cluster with required properties in Helix and wait for the resource's (partial) partition to appear in the external view.
boolean
isActiveActiveReplicationEnabledInAllRegion
(String clusterName, String storeName, boolean checkCurrentVersion) Returns true if A/A replication is enabled in all child controllers and the parent controller.
boolean
isAdminTopicConsumptionEnabled
(String clusterName) Return whether the admin consumption task is enabled for the passed cluster.boolean
isClusterValid
(String clusterName) Test if a cluster is valid (in Helix cluster list).boolean
isClusterWipeAllowed
(String clusterName) isInstanceRemovable
(String clusterName, String helixNodeId, List<String> lockedNodes) Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, can the given instance be removed from the given cluster?
boolean
isLeaderControllerFor
(String clusterName) Check if this controller itself is the leader controller for a given cluster or not.boolean
This function is used to detect whether current node is the leader controller of controller cluster.boolean
isParent()
Check whether the controller works as a parent controllerboolean
isResourceStillAlive
(String resourceName) Test if a given helix resource is still alive (existent in ZK).boolean
isRTTopicDeletionPermittedByAllControllers
(String clusterName, String storeName) boolean
isSSLEnabledForPush
(String clusterName, String storeName) Return whether ssl is enabled for the given store for push.boolean
Test if ssl is enabled to Kafka.boolean
isStorageNodeNewerOrEqualTo
(String clusterName, String instanceId, StorageNodeStatus oldStatus) Compare the current storage node status and the given storage node status to check if the current one is "Newer" or "Equal" to the given one.
boolean
isStoreMigrationAllowed
(String clusterName) Test if the store migration is allowed for a cluster.isStoreVersionReadyForDataRecovery
(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Check if the store version's previous states and resources are cleaned up and ready to start data recovery.boolean
isTopicTruncated
(String kafkaTopicName) Check if a kafka topic is absent or truncated.boolean
isTopicTruncatedBasedOnRetention
(long retention) Test if retention is less than the configured DEPRECATED_TOPIC_MAX_RETENTION_MS value.boolean
isTopicTruncatedBasedOnRetention
(String kafkaTopicName, long retentionTime) Topic should also be considered to get cleaned up if: retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value.void
killOfflinePush
(String clusterName, String kafkaTopic, boolean isForcedKill) Kill an offline push if it ran into errors or the corresponding version is being retired.listStorePushInfo
(String clusterName, String storeName, boolean isPartitionDetailEnabled) protected static HybridStoreConfig
mergeNewSettingsIntoOldHybridStoreConfig
(Store oldStore, Optional<Long> hybridRewindSeconds, Optional<Long> hybridOffsetLagThreshold, Optional<Long> hybridTimeLagThreshold, Optional<DataReplicationPolicy> hybridDataReplicationPolicy, Optional<BufferReplayPolicy> bufferReplayPolicy, Optional<String> realTimeTopicName) Used by both the VeniceHelixAdmin and the VeniceParentHelixAdmin
void
migrateStore
(String srcClusterName, String destClusterName, String storeName) Main implementation for migrating a store from its source cluster to a new destination cluster.nodeReplicaReadiness
(String cluster, String helixNodeId) helixNodeId nodeId of helix participant.peekNextVersion
(String clusterName, String storeName) void
prepareDataRecovery
(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Prepare for data recovery in the destination fabric.removeDerivedSchema
(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId) Remove an existing derived schemavoid
removeInstanceFromAllowList
(String clusterName, String helixNodeId) Remove the given helix nodeId from the allowlist in ZK.void
removeStorageNode
(String clusterName, String instanceId) Remove one storage node from the given cluster.void
removeStoreFromGraveyard
(String clusterName, String storeName) void
replicateAddVersionAndStartIngestion
(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) This method is invoked in parent controllers to replicate new version signals for migrating store.void
replicateUpdateStore
(String clusterName, String storeName, UpdateStoreQueryParams params) This method is invoked in parent controllers for store migration.void
retireOldStoreVersions
(String clusterName, String storeName, boolean deleteBackupOnStartPush, int currentVersionBeforePush) For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources).retrievePushStatus
(String clusterName, StoreInfo store) void
rollbackToBackupVersion
(String clusterName, String storeName, String regionFilter) Set backup version as current version in a child region.void
rollForwardToFutureVersion
(String clusterName, String storeName, String regionFilter) void
sendHeartbeatToSystemStore
(String clusterName, String storeName, long heartbeatTimeStamp) Send a heartbeat timestamp to targeted system store.void
sendKillMessageToParticipantStore
(String clusterName, String kafkaTopic) void
sendPushJobDetails
(PushJobStatusRecordKey key, PushJobDetails value) Lazy initialize a Venice writer for an internal real time topic store of push job details records.void
setAdminConsumerService
(String clusterName, AdminConsumerService service) void
void
setStoreConfigForMigration
(String storeName, String srcClusterName, String destClusterName) Update "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.void
setStoreCurrentVersion
(String clusterName, String storeName, int versionNumber) Update the current version of a specified store.void
setStoreLargestUsedVersion
(String clusterName, String storeName, int versionNumber) Update the largest used version number of a specified store.void
setStoreOwner
(String clusterName, String storeName, String owner) Update the owner of a specified store.void
setStorePartitionCount
(String clusterName, String storeName, int partitionCount) Since partition check/calculation only happens when adding a new store version, setStorePartitionCount(String, String, int) only changes the number of partitions for subsequent pushes.
void
setStorePushStrategyForMigration
(String voldemortStoreName, String strategy) Unsupported operation in the child controller.void
setStoreReadability
(String clusterName, String storeName, boolean desiredReadability) Update the readability of a specified store.void
setStoreReadWriteability
(String clusterName, String storeName, boolean isAccessible) Update both readability and writability of a specified store.void
setStoreWriteability
(String clusterName, String storeName, boolean desiredWriteability) Update the writability of a specified store.void
skipAdminMessage
(String clusterName, long offset, boolean skipDIV) The admin consumption task tries to deal with failures to process an admin message by retrying.void
startInstanceMonitor
(String clusterName) void
Stop the helix controller for a single cluster.void
stopMonitorOfflinePush
(String clusterName, String topic, boolean deletePushStatus, boolean isForcedDelete) void
Stop the entire controller but not only the helix controller for a single cluster.void
storeMetadataUpdate
(String clusterName, String storeName, VeniceHelixAdmin.StoreMetadataOperation operation) Update the store metadata by applying provided operation.void
topicCleanupWhenPushComplete
(String clusterName, String storeName, int versionNumber) In this function, the controller sets up the proper compaction strategy when the push job is fully completed, and here are the reasons to set it up after the job completes: 1.
boolean
truncateKafkaTopic
(String kafkaTopicName) We don't actually truncate any Kafka topic here; we just update the retention time.boolean
truncateKafkaTopic
(String topicName, long retentionTime) Truncate a Kafka topic by setting its retention time to the input value.void
updateAclForStore
(String clusterName, String storeName, String accessPermissions) Provision a new set of ACL for a venice store and its associated kafka topic.void
updateAdminTopicMetadata
(String clusterName, long executionId, Optional<String> storeName, Optional<Long> offset, Optional<Long> upstreamOffset) Update cluster-level execution id, offset and upstream offset.void
updateClusterConfig
(String clusterName, UpdateClusterConfigQueryParams params) Update the LiveClusterConfig at runtime for a specified cluster.void
updateClusterDiscovery
(String storeName, String oldCluster, String newCluster, String initiatingCluster) Update the cluster discovery of a given store by writing to the StoreConfig ZNode.void
updateRoutersClusterConfig
(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapacityProtectionEnabled, Optional<Integer> expectedRouterCount) Update the cluster level config for all routers.
void
updateStoragePersona
(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams) void
updateStore
(String clusterName, String storeName, UpdateStoreQueryParams params) TODO: some logic is in the parent controller's VeniceParentHelixAdmin#updateStore and some is in the child controller here.
void
validateAndMaybeRetrySystemStoreAutoCreation
(String clusterName, String storeName, VeniceSystemStoreType systemStoreType) Check the creation results of a user store's system store.versionsForStore
(String clusterName, String storeName) boolean
whetherEnableBatchPushFromAdmin
(String storeName) Test if a store is allowed for a batch push.void
wipeCluster
(String clusterName, String fabric, Optional<String> storeName, Optional<Integer> versionNum) Delete stores from the cluster including both store data and metadata.void
writeEndOfPush
(String clusterName, String storeName, int versionNumber, boolean alsoWriteStartOfPush) Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.linkedin.venice.controller.Admin
addValueSchema, createStore, createStore, getDatacenterCount, hasWritePermissionToBatchJobHeartbeatStore, incrementVersionIdempotent, incrementVersionIdempotent
-
Field Details
-
INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
protected static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
-
INTERNAL_STORE_RTT_RETRY_BACKOFF_MS
protected static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS -
pubSubTopicRepository
-
-
Constructor Details
-
VeniceHelixAdmin
public VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, com.linkedin.d2.balancer.D2Client d2Client, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory) -
VeniceHelixAdmin
public VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sslEnabled, @Nonnull com.linkedin.d2.balancer.D2Client d2Client, Optional<SSLConfig> sslConfig, Optional<DynamicAccessController> accessController, Optional<ICProvider> icProvider, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, List<ClusterLeaderInitializationRoutine> additionalInitRoutines)
-
-
Method Details
-
startInstanceMonitor
- Specified by: startInstanceMonitor in interface Admin
-
getLiveInstanceMonitor
-
clearInstanceMonitor
- Specified by: clearInstanceMonitor in interface Admin
-
getZkClient
public org.apache.helix.zookeeper.impl.client.ZkClient getZkClient() -
getExecutionIdAccessor
-
getAdapterSerializer
-
initStorageCluster
Create and configure the Venice storage cluster with required properties in Helix and wait for the resource's (partial) partition to appear in the external view.
- Specified by: initStorageCluster in interface Admin
- Parameters: clusterName - Venice cluster name.
-
isResourceStillAlive
Test if a given helix resource is still alive (existent in ZK).
- Specified by: isResourceStillAlive in interface Admin
- Parameters: resourceName - Helix resource name.
- Returns: true if resource exists; false otherwise.
-
isClusterValid
Test if a cluster is valid (in Helix cluster list).
- Specified by: isClusterValid in interface Admin
- Parameters: clusterName - Venice cluster name.
- Returns: true if input cluster is in Helix cluster list; false otherwise.
-
getHelixAdmin
protected org.apache.helix.HelixAdmin getHelixAdmin() -
createStore
public void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore, Optional<String> accessPermissions)
Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.
- Specified by: createStore in interface Admin
- Parameters:
clusterName - Venice cluster where the store is located.
storeName - name of the store.
owner - owner of the store.
keySchema - key schema of the store.
valueSchema - value schema of the store.
isSystemStore - whether the store is a system store.
accessPermissions - JSON string representing the access permissions.
-
deleteStore
public void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion)
This method deletes store data, metadata, versions and RT topics. The one exception is for stores with the isMigrating flag set: in that case, the corresponding Kafka topics and storeConfig are not deleted, so that they remain available for the cloned store.
- Specified by: deleteStore in interface Admin
-
sendPushJobDetails
Lazily initialize a Venice writer for the internal real-time topic store of push job details records, then use this writer to put a push job details record pair (key and value).- Specified by:
sendPushJobDetails
in interfaceAdmin
- Parameters:
key
- key with which the specified value is to be associated.value
- value to be associated with the specified key.
-
getPushJobDetails
- Specified by:
getPushJobDetails
in interfaceAdmin
- Returns:
- the value to which the specified key is mapped from the Venice internal real time topic store.
-
getBatchJobHeartbeatValue
public BatchJobHeartbeatValue getBatchJobHeartbeatValue(@Nonnull BatchJobHeartbeatKey batchJobHeartbeatKey) - Specified by:
getBatchJobHeartbeatValue
in interfaceAdmin
- Returns:
- the value to which the specified key is mapped from the Venice internal
BATCH_JOB_HEARTBEAT_STORE
topic store.
-
writeEndOfPush
public void writeEndOfPush(String clusterName, String storeName, int versionNumber, boolean alsoWriteStartOfPush) Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.- Specified by:
writeEndOfPush
in interfaceAdmin
- Parameters:
clusterName
- name of the Venice cluster.storeName
- name of the store.versionNumber
- store version number.alsoWriteStartOfPush
- if Venice writer sends a START_OF_PUSH control message first.
-
whetherEnableBatchPushFromAdmin
Test if a store is allowed for a batch push.- Specified by:
whetherEnableBatchPushFromAdmin
in interfaceAdmin
- Parameters:
storeName
- name of a store.- Returns:
true
if the store is a participant system store or if Venice is running in single-region mode.
-
isStoreMigrationAllowed
Test if store migration is allowed for a cluster. It reads the value "allow.store.migration" from the "/clusterName/ClusterConfig"
znode.- Specified by:
isStoreMigrationAllowed
in interfaceAdmin
- Parameters:
clusterName
- name of Venice cluster.- Returns:
true
if store migration is allowed for the input cluster;false
otherwise.
-
migrateStore
Main implementation for migrating a store from its source cluster to a new destination cluster. A new store (with same properties, e.g. name, owner, key schema, value schema) is created at the destination cluster and its StoreInfo is also cloned. For a store with enabled meta system store or enabled davinci push status, those system stores are also migrated. Different store versions are evaluated for the migration. For those versions to be migrated, it triggers the ADD_VERSION and starts ingestion at the destination cluster.- Specified by:
migrateStore
in interfaceAdmin
- Parameters:
srcClusterName
- name of the source cluster.destClusterName
- name of the destination cluster.storeName
- name of the target store.
-
clearIngestionKillMessageAndVerify
Clear KILL messages from a participant system store. -
getControllerClientMap
-
completeMigration
- Specified by:
completeMigration
in interfaceAdmin
- See Also:
-
abortMigration
Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.- Specified by:
abortMigration
in interfaceAdmin
- Parameters:
srcClusterName
- name of the source cluster.destClusterName
- name of the destination cluster.storeName
- name of the store in migration.
-
updateClusterDiscovery
public void updateClusterDiscovery(String storeName, String oldCluster, String newCluster, String initiatingCluster) Description copied from interface:Admin
Update the cluster discovery of a given store by writing to the StoreConfig ZNode.- Specified by:
updateClusterDiscovery
in interfaceAdmin
- Parameters:
storeName
- of the store.oldCluster
- for the store.newCluster
- for the store.initiatingCluster
- that is making the update. This is needed because in the case of store migration sometimes the update is not made by the leader of the current cluster but instead the leader of the source cluster.- See Also:
-
checkPreConditionForCreateStore
protected void checkPreConditionForCreateStore(String clusterName, String storeName, String keySchema, String valueSchema, boolean allowSystemStore, boolean skipLingeringResourceCheck) Check whether the controller should block the incoming store creation. This method includes a check for lingering resources, since the requested store may have been deleted only recently. The check is enabled only in the parent controller and skipped in the child controller, for the following reasons:
1. The parent controller enforces a strict order: the system store must be created before the host Venice store.
2. The child controller has no such strict order, because admin messages for different store names can be executed in parallel.
When this race condition occurs, it causes a deadlock:
a. The version creation of the system store creates an RT topic in the parent cluster.
b. KMM mirrors the RT topic to the child cluster.
c. The version creation admin message of the system store is blocked in the child controller because the host Venice store doesn't exist.
d. The store creation admin message of the host Venice store is blocked in the child controller by the lingering resource check (the RT topic of its system store already exists, created by KMM).
TODO: Evaluate if this code needs to change now that KMM has been deprecated. Once Venice gets rid of KMM, the topic will no longer be created automatically by KMM, and this race condition will be resolved. For now, the child controller skips the lingering resource check when handling a store creation admin message.
-
addVersionAndStartIngestion
public void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion) Description copied from interface:Admin
This method behaves differently in VeniceHelixAdmin and VeniceParentHelixAdmin.- Specified by:
addVersionAndStartIngestion
in interfaceAdmin
-
addVersionAndStartIngestion
public void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add version message. -
replicateAddVersionAndStartIngestion
public void replicateAddVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) This method is invoked in parent controllers to replicate new version signals for migrating store. -
addVersionAndTopicOnly
public Pair<Boolean,Version> addVersionAndTopicOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred) -
addVersionAndTopicOnly
public Pair<Boolean,Version> addVersionAndTopicOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion. -
addVersionOnly
public Version addVersionOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) Only add the version to the store, without creating the topic or starting ingestion. Used to sync version metadata in the parent fabric during store migration. -
addSpecificVersion
TODO: refactor addVersion into these smaller methods instead of doing everything in one giant method. Add a version to the given store using the provided Version
-
createSpecificVersionTopic
Create the corresponding version topic based on the provided Version
-
createHelixResourceAndStartMonitoring
public void createHelixResourceAndStartMonitoring(String clusterName, String storeName, Version version) Create Helix resources for a given storage node cluster and start monitoring a new push. -
incrementVersionIdempotent
public Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) Note: this currently uses the pushID to guarantee idempotence; unexpected behavior may result if multiple batch jobs push to the same store at the same time.- Specified by:
incrementVersionIdempotent
in interfaceAdmin
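The idempotence note above can be illustrated with a minimal sketch. It assumes a simplified in-memory model in which each push job ID maps to at most one version number; the class `IdempotentVersionSketch` and its fields are hypothetical and are not part of the Venice API, and the real method does considerably more (topic creation, Helix resources, and so on).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model: not the actual VeniceHelixAdmin implementation.
final class IdempotentVersionSketch {
  // Remembers which version number was handed out for each push job ID.
  private final Map<String, Integer> versionByPushJobId = new HashMap<>();
  private int largestUsedVersion = 0;

  /** Returns the version for the push job, creating a new one only on the first call. */
  synchronized int incrementVersionIdempotent(String pushJobId) {
    Integer existing = versionByPushJobId.get(pushJobId);
    if (existing != null) {
      return existing; // repeated call with the same push ID: no new version
    }
    largestUsedVersion++;
    versionByPushJobId.put(pushJobId, largestUsedVersion);
    return largestUsedVersion;
  }
}
```

In this model a retried request with the same push ID gets the version it was given the first time, while a different push ID always gets a new version.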
-
getStartedVersion
The intended semantic is to use this method to find the version that something is currently pushing to. It looks at all versions greater than the current version and identifies the version with a status of STARTED. If there is no STARTED version, it creates a new one for the push to use. This means we cannot use this method to support multiple concurrent pushes.- Parameters:
store
-- Returns:
- the started version if there is only one, throws an exception if there is an error version with a greater number than the current version. Otherwise returns Optional.empty()
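The lookup described above could be sketched roughly as follows. The `Status` and `Version` types are hypothetical stand-ins for the Venice classes, and throwing when more than one STARTED version is found is an assumption of this sketch, since the description only covers the single-version case.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for the Venice Version and version-status types.
final class StartedVersionSketch {
  enum Status { STARTED, ONLINE, ERROR }

  static final class Version {
    final int number;
    final Status status;

    Version(int number, Status status) {
      this.number = number;
      this.status = status;
    }
  }

  /** Finds the STARTED version among versions newer than the current one. */
  static Optional<Version> findStartedVersion(List<Version> versions, int currentVersion) {
    Optional<Version> started = Optional.empty();
    for (Version v : versions) {
      if (v.number <= currentVersion) {
        continue; // only versions greater than the current version are considered
      }
      if (v.status == Status.ERROR) {
        throw new IllegalStateException("Error version " + v.number + " is newer than the current version");
      }
      if (v.status == Status.STARTED) {
        if (started.isPresent()) {
          // Assumption of this sketch: concurrent pushes are rejected outright.
          throw new IllegalStateException("More than one STARTED version found");
        }
        started = Optional.of(v);
      }
    }
    return started;
  }
}
```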
-
getReplicationMetadataSchema
public Optional<org.apache.avro.Schema> getReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaID, int rmdVersionID) - Specified by:
getReplicationMetadataSchema
in interfaceAdmin
- Returns:
- replication metadata schema for a store in a cluster with specified schema ID and RMD protocol version ID.
-
getReferenceVersionForStreamingWrites
public Version getReferenceVersionForStreamingWrites(String clusterName, String storeName, String pushJobId) - Specified by:
getReferenceVersionForStreamingWrites
in interfaceAdmin
-
getIncrementalPushVersion
- Specified by:
getIncrementalPushVersion
in interfaceAdmin
-
getCurrentVersion
- Specified by:
getCurrentVersion
in interfaceAdmin
- Returns:
- The current version number of an input store in the specified Venice cluster or Store.NON_EXISTING_VERSION if none exists.
-
getFutureVersion
- Specified by:
getFutureVersion
in interfaceAdmin
- Returns:
- The online (completed, but not yet swapped) or future version with ongoing ingestion; if none exists, returns Store.NON_EXISTING_VERSION.
-
getBackupVersion
- Specified by:
getBackupVersion
in interfaceAdmin
-
getOnlineFutureVersion
-
getCurrentVersionsForMultiColos
- Specified by:
getCurrentVersionsForMultiColos
in interfaceAdmin
-
getRepushInfo
- Specified by:
getRepushInfo
in interfaceAdmin
- Returns:
- a new RepushInfo object with specified store info.
-
getFutureVersionsForMultiColos
- Specified by:
getFutureVersionsForMultiColos
in interfaceAdmin
- See Also:
-
getBackupVersionsForMultiColos
- Specified by:
getBackupVersionsForMultiColos
in interfaceAdmin
-
peekNextVersion
- Specified by:
peekNextVersion
in interfaceAdmin
- Returns:
- the next version without adding the new version to the store.
-
deleteAllVersionsInStore
Description copied from interface:Admin
Delete all Venice versions in the given store (including Venice resources, Kafka topics, offline pushes and all related resources).- Specified by:
deleteAllVersionsInStore
in interfaceAdmin
- See Also:
-
deleteOldVersionInStore
Description copied from interface:Admin
Delete the given version from the store. If the given version is the current version, an exception will be thrown.- Specified by:
deleteOldVersionInStore
in interfaceAdmin
- See Also:
-
deleteOneStoreVersion
Delete a version from the cluster, removing all related resources.- Specified by:
deleteOneStoreVersion
in interfaceStoreCleaner
-
isRTTopicDeletionPermittedByAllControllers
- Specified by:
isRTTopicDeletionPermittedByAllControllers
in interfaceAdmin
-
retireOldStoreVersions
public void retireOldStoreVersions(String clusterName, String storeName, boolean deleteBackupOnStartPush, int currentVersionBeforePush) For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources). It also truncates Kafka topics and Helix resources.- Specified by:
retireOldStoreVersions
in interfaceStoreCleaner
- Parameters:
clusterName
- name of a cluster.storeName
- name of the store to retire.deleteBackupOnStartPush
- indicate if it is called in a start-of-push workflow.currentVersionBeforePush
- current version before a new push.
-
topicCleanupWhenPushComplete
In this function, the controller sets up the proper compaction strategy once the push job is fully completed. The reasons for setting it up after the job completes are:
1. For batch push jobs to batch-only stores, there is no impact. There could still be duplicate entries because of speculative execution in the map-reduce job, but there is no plan to clean them up now.
2. For batch push jobs to hybrid/incremental stores, if compaction were enabled at the beginning of the job, Kafka compaction could kick in during the push job, and storage nodes could detect DIV errors such as missing messages or checksum mismatches, because speculative execution could produce duplicate entries. We don't want to fail the push in this scenario, and we still want to perform strong DIV validation in batch push, so compaction can only be enabled after the batch push completes.
3. For GF jobs to hybrid stores, the situation is similar to #2; the data contains duplicate entries because no de-duplication happens anywhere.
With this approach, when load rebalance happens for hybrid/incremental stores, DIV errors could be detected during ingestion at any phase, since compaction might have been enabled long ago. So the storage node needs one more safeguard before throwing the DIV exception: checking whether the topic is compaction-enabled. Since Venice does not switch the compaction policy back and forth between non-compact and compact, checking whether the topic is compaction-enabled when encountering a DIV error should be good enough.- Specified by:
topicCleanupWhenPushComplete
in interfaceStoreCleaner
-
isTopicTruncated
Check if a kafka topic is absent or truncated.- Specified by:
isTopicTruncated
in interfaceAdmin
- See Also:
-
isTopicTruncatedBasedOnRetention
public boolean isTopicTruncatedBasedOnRetention(long retention) Test if retention is less than the configured DEPRECATED_TOPIC_MAX_RETENTION_MS value.- Specified by:
isTopicTruncatedBasedOnRetention
in interfaceAdmin
- Returns:
true
if the specified retention is below the configuration;false otherwise.
- See Also:
-
isTopicTruncatedBasedOnRetention
A topic should also be considered for cleanup if its retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value, or if all of the following hold: 1. Topic retention equals fatalDataValidationFailureRetentionMs. 2. Topic is a version topic. 3. fatalDataValidationFailureRetentionMs has already passed since its creation.- Specified by:
isTopicTruncatedBasedOnRetention
in interfaceAdmin
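The two retention checks described above can be expressed as plain predicates. This is a sketch only: the class, method names, and parameters are hypothetical, all values are passed in explicitly rather than read from controller configuration, and it assumes the three numbered conditions must hold together and that "has already passed" means the elapsed time is at least the fatal data validation failure retention.

```java
// Hypothetical predicates: all inputs are plain parameters, not Venice config lookups.
final class TopicRetentionSketch {
  /** First form: retention below the deprecated-topic maximum counts as truncated. */
  static boolean isTruncatedByRetention(long retentionMs, long deprecatedTopicMaxRetentionMs) {
    return retentionMs < deprecatedTopicMaxRetentionMs;
  }

  /** Second form: also covers version topics that failed data validation fatally. */
  static boolean shouldCleanUp(
      boolean isVersionTopic,
      long retentionMs,
      long deprecatedTopicMaxRetentionMs,
      long fatalDivRetentionMs,
      long topicCreationTimeMs,
      long nowMs) {
    if (isTruncatedByRetention(retentionMs, deprecatedTopicMaxRetentionMs)) {
      return true;
    }
    // Assumption: conditions 1 to 3 must all be true.
    return retentionMs == fatalDivRetentionMs
        && isVersionTopic
        && nowMs - topicCreationTimeMs >= fatalDivRetentionMs;
  }
}
```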
-
getMinNumberOfUnusedKafkaTopicsToPreserve
public int getMinNumberOfUnusedKafkaTopicsToPreserve()- Specified by:
getMinNumberOfUnusedKafkaTopicsToPreserve
in interfaceAdmin
- Returns:
- the controller configuration value for MIN_NUMBER_OF_UNUSED_KAFKA_TOPICS_TO_PRESERVE.
- See Also:
-
truncateKafkaTopic
We don't actually truncate any Kafka topic here; we just update the retention time.- Specified by:
truncateKafkaTopic
in interfaceAdmin
- Parameters:
kafkaTopicName
-- Returns:
-
truncateKafkaTopic
Description copied from interface:Admin
Truncate a Kafka topic by setting its retention time to the input value.- Specified by:
truncateKafkaTopic
in interfaceAdmin
- Parameters:
topicName
- the name of the topic to truncate.retentionTime
- the retention time in milliseconds to set for the topic.- Returns:
- true if the topic was truncated successfully; false otherwise.
- See Also:
-
versionsForStore
- Specified by:
versionsForStore
in interfaceAdmin
- Returns:
- all versions of the specified store from a cluster.
-
getAllStores
- Specified by:
getAllStores
in interfaceAdmin
- Returns:
- all stores in the specified cluster.
-
getAllStoreStatuses
Description copied from interface:Admin
Get the statuses of all stores. The store status is decided by the current version. For example, if a partition has only 2 ONLINE replicas in the current version, the store is considered under-replicated. Refer to StoreStatus
for the definition of each status.- Specified by:
getAllStoreStatuses
in interfaceAdmin
- Returns:
- a map whose key is store name and value is store's status.
- See Also:
-
hasStore
Test if the input store exists in a cluster. -
getStore
-
setStoreCurrentVersion
Update the current version of a specified store.- Specified by:
setStoreCurrentVersion
in interfaceAdmin
-
rollForwardToFutureVersion
- Specified by:
rollForwardToFutureVersion
in interfaceAdmin
-
rollbackToBackupVersion
Set backup version as current version in a child region.- Specified by:
rollbackToBackupVersion
in interfaceAdmin
-
getBackupVersionNumber
Get the backup version number: the largest online version number that is less than the current version number. -
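As a rough illustration of that rule, the sketch below picks the largest online version number below the current one. The class and the `NO_VERSION` sentinel are hypothetical; the description does not say what the real method returns when no version qualifies.

```java
// Hypothetical helper: operates on plain version numbers instead of Venice Store objects.
final class BackupVersionSketch {
  // Assumed sentinel for "no backup version"; not taken from the Venice source.
  static final int NO_VERSION = 0;

  /** Returns the largest online version number strictly below the current version. */
  static int backupVersionNumber(int[] onlineVersionNumbers, int currentVersion) {
    int backup = NO_VERSION;
    for (int number : onlineVersionNumbers) {
      if (number < currentVersion && number > backup) {
        backup = number;
      }
    }
    return backup;
  }
}
```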
setStoreLargestUsedVersion
Update the largest used version number of a specified store.- Specified by:
setStoreLargestUsedVersion
in interfaceAdmin
-
setStoreOwner
Update the owner of a specified store.- Specified by:
setStoreOwner
in interfaceAdmin
-
setStorePartitionCount
Since partition check/calculation only happens when adding a new store version, setStorePartitionCount(String, String, int)
only changes the number of partitions for subsequent pushes. The current version is not changed.- Specified by:
setStorePartitionCount
in interfaceAdmin
-
setStoreWriteability
Update the writability of a specified store.- Specified by:
setStoreWriteability
in interfaceAdmin
-
setStoreReadability
Update the readability of a specified store.- Specified by:
setStoreReadability
in interfaceAdmin
-
setStoreReadWriteability
Update both readability and writability of a specified store.- Specified by:
setStoreReadWriteability
in interfaceAdmin
-
getMetaStoreValue
- Specified by:
getMetaStoreValue
in interfaceAdmin
-
getInUseValueSchemaIds
- Specified by:
getInUseValueSchemaIds
in interfaceAdmin
-
deleteValueSchemas
public void deleteValueSchemas(String clusterName, String storeName, Set<Integer> unusedValueSchemaIds) Description copied from interface:Admin
Deletes a store's values schema with ids `except` the ids passed in the argument inuseValueSchemaIds- Specified by:
deleteValueSchemas
in interfaceAdmin
-
updateStore
TODO: some logic lives in the parent controller's VeniceParentHelixAdmin#updateStore and some in the child controller here. These need to be unified in the future.- Specified by:
updateStore
in interfaceAdmin
-
updateClusterConfig
Update the LiveClusterConfig at runtime for a specified cluster.- Specified by:
updateClusterConfig
in interfaceAdmin
- Parameters:
clusterName
- name of the Venice cluster.params
- parameters to update.
-
replicateUpdateStore
public void replicateUpdateStore(String clusterName, String storeName, UpdateStoreQueryParams params) This method is invoked in parent controllers for store migration. -
mergeNewSettingsIntoOldHybridStoreConfig
protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(Store oldStore, Optional<Long> hybridRewindSeconds, Optional<Long> hybridOffsetLagThreshold, Optional<Long> hybridTimeLagThreshold, Optional<DataReplicationPolicy> hybridDataReplicationPolicy, Optional<BufferReplayPolicy> bufferReplayPolicy, Optional<String> realTimeTopicName) Used by both VeniceHelixAdmin and VeniceParentHelixAdmin.
- Parameters:
oldStore
- Existing Store that is the source for updates. This object will not be modified by this method.hybridRewindSeconds
- Optional is present if the returned object should include a new rewind timehybridOffsetLagThreshold
- Optional is present if the returned object should include a new offset lag threshold- Returns:
- null if oldStore has no hybrid configs and optionals are not present,
otherwise a fully specified
HybridStoreConfig
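The merge contract above (present Optionals override old values, the old object is left untouched, and null is returned when there is nothing to merge) might look like the following sketch. `HybridConfig` is a hypothetical two-field stand-in for HybridStoreConfig, and the default values used when the old store has no hybrid config are assumptions of this sketch.

```java
import java.util.Optional;

// Hypothetical two-field model of a hybrid store config; not the Venice class.
final class HybridConfigMergeSketch {
  // Assumed defaults for the case where no old hybrid config exists.
  static final long DEFAULT_REWIND_SECONDS = 86400L;
  static final long DEFAULT_OFFSET_LAG_THRESHOLD = 1000L;

  static final class HybridConfig {
    final long rewindSeconds;
    final long offsetLagThreshold;

    HybridConfig(long rewindSeconds, long offsetLagThreshold) {
      this.rewindSeconds = rewindSeconds;
      this.offsetLagThreshold = offsetLagThreshold;
    }
  }

  /** Returns a new, fully specified config, or null if there is nothing to merge. */
  static HybridConfig merge(
      HybridConfig oldConfig, Optional<Long> newRewindSeconds, Optional<Long> newOffsetLagThreshold) {
    if (oldConfig == null && !newRewindSeconds.isPresent() && !newOffsetLagThreshold.isPresent()) {
      return null;
    }
    long baseRewind = oldConfig != null ? oldConfig.rewindSeconds : DEFAULT_REWIND_SECONDS;
    long baseLag = oldConfig != null ? oldConfig.offsetLagThreshold : DEFAULT_OFFSET_LAG_THRESHOLD;
    // The old config is immutable here, so it is never modified by the merge.
    return new HybridConfig(newRewindSeconds.orElse(baseRewind), newOffsetLagThreshold.orElse(baseLag));
  }
}
```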
-
storeMetadataUpdate
public void storeMetadataUpdate(String clusterName, String storeName, VeniceHelixAdmin.StoreMetadataOperation operation) Update the store metadata by applying the provided operation.- Parameters:
clusterName
- name of the cluster.storeName
- name of the store to be updated.operation
- the defined operation that updates the store.
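One way to picture the "apply an operation to the store" pattern is the sketch below. It assumes an in-memory repository guarded by a single lock; the `Store`, `StoreOperation`, and repository types are hypothetical and only mirror the shape of VeniceHelixAdmin.StoreMetadataOperation, not its actual definition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical in-memory model; the real method works against the Venice store repository.
final class StoreMetadataUpdateSketch {
  static final class Store {
    final String name;
    String owner;

    Store(String name, String owner) {
      this.name = name;
      this.owner = owner;
    }
  }

  /** Caller-supplied mutation, analogous in shape to a store metadata operation. */
  @FunctionalInterface
  interface StoreOperation {
    Store update(Store store);
  }

  private final Map<String, Store> repository = new HashMap<>();
  private final ReentrantLock lock = new ReentrantLock();

  void addStore(Store store) {
    repository.put(store.name, store);
  }

  Store getStore(String storeName) {
    return repository.get(storeName);
  }

  /** Looks up the store, applies the operation, and writes the result back under a lock. */
  void storeMetadataUpdate(String storeName, StoreOperation operation) {
    lock.lock();
    try {
      Store store = repository.get(storeName);
      if (store == null) {
        throw new IllegalArgumentException("Store does not exist: " + storeName);
      }
      repository.put(storeName, operation.update(store));
    } finally {
      lock.unlock();
    }
  }
}
```

With this shape, a setter such as an owner update reduces to a one-line lambda passed to `storeMetadataUpdate`.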
-
getStorageEngineOverheadRatio
- Specified by:
getStorageEngineOverheadRatio
in interfaceAdmin
- Returns:
- the configuration value for ConfigKeys.STORAGE_ENGINE_OVERHEAD_RATIO
-
containsHelixResource
Description copied from interface:StoreCleaner
The purpose of this function is to check whether the given resource exists in the Helix cluster.- Specified by:
containsHelixResource
in interfaceStoreCleaner
- Parameters:
clusterName
- The Venice cluster that the resource belongs to.kafkaTopic
- it's usually the store version name (version topic name).- Returns:
-
deleteHelixResource
Description copied from interface:StoreCleaner
The purpose of this function is to delete the given resource from the Helix cluster. Unlike StoreCleaner.deleteOneStoreVersion(String, String, int), this function does not check whether the store version is still a valid version inside the Venice backend; it sends the delete request to the Helix cluster directly. Perform sufficient sanity checks before calling this function.- Specified by:
deleteHelixResource
in interfaceStoreCleaner
- Parameters:
clusterName
- The Venice cluster that the resource belongs to.kafkaTopic
- It's usually the store version name (version topic name).
-
enableDisabledPartition
-
getKeySchema
- Specified by:
getKeySchema
in interfaceAdmin
- Returns:
- the key schema for the specified store.
-
getValueSchemas
- Specified by:
getValueSchemas
in interfaceAdmin
- Returns:
- the value schemas for the specified store.
-
getDerivedSchemas
- Specified by:
getDerivedSchemas
in interfaceAdmin
- Returns:
- the derived schemas for the specified store.
-
getValueSchemaId
- Specified by:
getValueSchemaId
in interfaceAdmin
- Returns:
- the schema id for the specified store and value schema.
-
getDerivedSchemaId
- Specified by:
getDerivedSchemaId
in interfaceAdmin
- Returns:
- the derived schema id for the specified store and derived schema.
-
getValueSchema
- Specified by:
getValueSchema
in interfaceAdmin
- Returns:
- the value schema for the specified store and id.
-
addValueSchema
public SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) - Specified by:
addValueSchema
in interfaceAdmin
- See Also:
-
addValueSchema
public SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType compatibilityType) Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.- Specified by:
addValueSchema
in interfaceAdmin
- Returns:
- a
SchemaEntry
object composed of a schema and its corresponding id.
-
addDerivedSchema
public DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties and return a newDerivedSchemaEntry
object containing the schema and its id.- Specified by:
addDerivedSchema
in interfaceAdmin
- Returns:
- a
DerivedSchemaEntry
object composed of specified properties.
-
addDerivedSchema
public DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties.- Specified by:
addDerivedSchema
in interfaceAdmin
- Returns:
- a
DerivedSchemaEntry
object composed of specified properties.
-
removeDerivedSchema
public DerivedSchemaEntry removeDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId) Description copied from interface:Admin
Remove an existing derived schema- Specified by:
removeDerivedSchema
in interfaceAdmin
- Returns:
- the derived schema that is deleted or null if the schema doesn't exist
- See Also:
-
addSupersetSchema
public SchemaEntry addSupersetSchema(String clusterName, String storeName, String valueSchema, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId) Add a new superset schema for the given store with all specified properties. Generate the superset schema from the current schema and the latest superset schema existing in the store (if any; otherwise the latest value schema). If the newly generated superset schema is unique, add it to the store and update latestSuperSetValueSchemaId of the store.
- Specified by:
addSupersetSchema
in interfaceAdmin
-
getReplicationMetadataSchemas
public Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String clusterName, String storeName) - Specified by:
getReplicationMetadataSchemas
in interfaceAdmin
- Returns:
- a collection of
ReplicationMetadataSchemaEntry
objects for the given store and cluster.
-
addReplicationMetadataSchema
public RmdSchemaEntry addReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr) Create a new ReplicationMetadataSchemaEntry object with the given properties and add it to the schema repository if it is not a duplicate.- Specified by:
addReplicationMetadataSchema
in interfaceAdmin
- Returns:
ReplicationMetadataSchemaEntry
object reference.
-
validateAndMaybeRetrySystemStoreAutoCreation
public void validateAndMaybeRetrySystemStoreAutoCreation(String clusterName, String storeName, VeniceSystemStoreType systemStoreType) Check the creation results of a user store's system store. If the system store's current version is in an error state, re-issue a new empty push and wait for the empty push to complete.- Specified by:
validateAndMaybeRetrySystemStoreAutoCreation
in interfaceAdmin
-
getStorageNodes
- Specified by:
getStorageNodes
in interfaceAdmin
- Returns:
- a list of storage node instance names for a given cluster.
-
getHelixAdminClient
-
getDisabledPartitionStats
-
getStorageNodesStatus
- Specified by:
getStorageNodesStatus
in interfaceAdmin
- Returns:
- a map containing the storage node name and its connectivity status (
InstanceStatus
).
-
removeStorageNode
Remove one storage node from the given cluster. It removes the given Helix nodeId from the allowlist in ZK and its associated resource in Helix.
- Specified by:
removeStorageNode
in interfaceAdmin
-
stop
Description copied from interface:Admin
Stop the helix controller for a single cluster. -
stopVeniceController
public void stopVeniceController() Description copied from interface:Admin
Stop the entire controller, not just the Helix controller for a single cluster.- Specified by:
stopVeniceController
in interfaceAdmin
- See Also:
-
getOffLinePushStatus
Description copied from interface:Admin
Query the status of the offline push for the given Kafka topic. TODO: The Kafka topic is currently used to track the status, but in the future jobId should be used instead. Right now each Kafka topic has only one offline job, but in the future one Kafka topic could be assigned multiple jobs, such as a data migration job.- Specified by:
getOffLinePushStatus
in interfaceAdmin
- Returns:
- the status of current offline push for the passed kafka topic
- See Also:
-
getOffLinePushStatus
public Admin.OfflinePushStatusInfo getOffLinePushStatus(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions) - Specified by:
getOffLinePushStatus
in interfaceAdmin
-
getOverallPushStatus
protected static ExecutionStatus getOverallPushStatus(ExecutionStatus veniceStatus, ExecutionStatus daVinciStatus) -
getKafkaBootstrapServers
Description copied from interface:Admin
Return the ssl or non-ssl bootstrap servers based on the given flag.- Specified by:
getKafkaBootstrapServers
in interfaceAdmin
- Returns:
- Kafka bootstrap server URLs; multiple URLs are comma-separated.
- See Also:
-
getRegionName
Description copied from interface:Admin
Return the region name of this Admin- Specified by:
getRegionName
in interfaceAdmin
- Returns:
- the region name of this controller
-
getNativeReplicationKafkaBootstrapServerAddress
- Specified by:
getNativeReplicationKafkaBootstrapServerAddress
in interfaceAdmin
- Returns:
- KafkaUrl for the given fabric.
- See Also:
-
getNativeReplicationSourceFabric
public String getNativeReplicationSourceFabric(String clusterName, Store store, Optional<String> sourceGridFabric, Optional<String> emergencySourceRegion, String targetedRegions) Source fabric selection priority: 1. Parent controller emergency source fabric config. 2. VPJ plugin targeted region config, however it will compute all selections based on the criteria below to select the source region. 3. VPJ plugin source grid fabric config. 4. Store level source fabric config. 5. Cluster level source fabric config.- Specified by:
getNativeReplicationSourceFabric
in interfaceAdmin
- Returns:
- the selected source fabric for a given store.
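The priority order above can be sketched as a simple fallback chain. This sketch deliberately leaves out item 2 (the VPJ targeted-regions handling), since the description does not spell out how it interacts with the other candidates; the class, method, and parameter names are hypothetical.

```java
import java.util.Optional;

// Hypothetical fallback chain; targeted-regions handling (item 2) is intentionally omitted.
final class SourceFabricSketch {
  /** Picks the first configured source fabric in priority order. */
  static String selectSourceFabric(
      Optional<String> emergencySourceRegion,
      Optional<String> sourceGridFabric,
      Optional<String> storeLevelSourceFabric,
      String clusterLevelSourceFabric) {
    if (emergencySourceRegion.isPresent()) {
      return emergencySourceRegion.get(); // 1. parent controller emergency source fabric
    }
    if (sourceGridFabric.isPresent()) {
      return sourceGridFabric.get(); // 3. VPJ plugin source grid fabric
    }
    if (storeLevelSourceFabric.isPresent()) {
      return storeLevelSourceFabric.get(); // 4. store-level source fabric
    }
    return clusterLevelSourceFabric; // 5. cluster-level source fabric
  }
}
```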
-
isSSLEnabledForPush
Description copied from interface:Admin
Return whether ssl is enabled for the given store for push.- Specified by:
isSSLEnabledForPush
in interfaceAdmin
- See Also:
-
isSslToKafka
public boolean isSslToKafka() Test if SSL is enabled to Kafka.- Specified by:
isSslToKafka
in interfaceAdmin
- See Also:
-
getTopicManager
- Specified by:
getTopicManager
in interfaceAdmin
- See Also:
-
getTopicManager
- Specified by:
getTopicManager
in interfaceAdmin
- See Also:
-
isLeaderControllerFor
Description copied from interface:Admin
Check if this controller itself is the leader controller for a given cluster or not. Note that the controller can be either a parent controller or a child controller, since a cluster must have a leader child controller and a leader parent controller. The point is not to confuse the concept of leader-standby with the parent-child controller architecture.- Specified by:
isLeaderControllerFor
in interfaceAdmin
- See Also:
-
getAggregatedHealthStatus
public InstanceRemovableStatuses getAggregatedHealthStatus(String cluster, List<String> instances, List<String> toBeStoppedInstances, boolean isSSLEnabled) - Specified by:
getAggregatedHealthStatus
in interfaceAdmin
-
calculateNumberOfPartitions
Calculate the number of partitions for the given store.
- Specified by:
calculateNumberOfPartitions
in interfaceAdmin
-
getReplicationFactor
- Specified by:
getReplicationFactor
in interfaceAdmin
- Returns:
- the replication factor of the given store.
-
getReplicas
- Specified by:
getReplicas
in interfaceAdmin
- Returns:
- a list of
Replica
created for the given resource.
-
getReplicasOfStorageNode
- Specified by:
getReplicasOfStorageNode
in interfaceAdmin
- See Also:
-
isInstanceRemovable
public NodeRemovableResult isInstanceRemovable(String clusterName, String helixNodeId, List<String> lockedNodes) Description copied from interface:Admin
Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, determine whether the given instance can be removed from the given cluster. For example, if the only online replica alive in this cluster is hosted on the given instance, the instance should not be removed from the cluster; otherwise Venice will lose data. For detailed criteria, refer to InstanceStatusDecider.
- Specified by:
isInstanceRemovable
in interfaceAdmin
- Parameters:
clusterName
- The cluster where the hosts belong.
helixNodeId
- nodeId of helix participant. HOST_PORT.
lockedNodes
- A list of helix nodeIds whose resources are assumed to be unusable (stopped).
- See Also:
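The data-loss example in the description can be sketched as a check over a partition-to-hosts map. This is a deliberately simplified illustration and assumes that "removable" only means "every partition hosted on the instance keeps at least one online replica elsewhere"; the real criteria live in InstanceStatusDecider and are not reproduced here. The class name and the map shape are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified check; not the InstanceStatusDecider logic.
final class InstanceRemovableSketch {
  private InstanceRemovableSketch() {
  }

  /**
   * @param onlineReplicaHostsByPartition partition name -> helix nodeIds hosting an online replica
   * @param helixNodeId the instance we would like to remove
   * @param lockedNodes nodeIds assumed to be unusable already
   */
  static boolean isRemovable(
      Map<String, Set<String>> onlineReplicaHostsByPartition,
      String helixNodeId,
      List<String> lockedNodes) {
    Set<String> unusable = new HashSet<>(lockedNodes);
    unusable.add(helixNodeId);
    for (Set<String> hosts: onlineReplicaHostsByPartition.values()) {
      if (!hosts.contains(helixNodeId)) {
        continue; // removing the instance does not affect this partition
      }
      boolean hasSurvivor = false;
      for (String host: hosts) {
        if (!unusable.contains(host)) {
          hasSurvivor = true;
          break;
        }
      }
      if (!hasSurvivor) {
        return false; // the last usable online replica would disappear
      }
    }
    return true;
  }
}
```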
-
getLeaderController
Description copied from interface:Admin
Get instance of leader controller. If there is no leader controller for the given cluster, throw a VeniceException.- Specified by:
getLeaderController
in interfaceAdmin
- See Also:
-
addInstanceToAllowlist
Add the given helix nodeId into the allowlist in ZK.- Specified by:
addInstanceToAllowlist
in interfaceAdmin
-
removeInstanceFromAllowList
Remove the given helix nodeId from the allowlist in ZK.- Specified by:
removeInstanceFromAllowList
in interfaceAdmin
-
getAllowlist
- Specified by:
getAllowlist
in interfaceAdmin
- Returns:
- a list of all helix nodeIds in the allowlist for the given cluster from ZK.
-
killOfflinePush
Description copied from interface:Admin
Kill an offline push if it ran into errors or the corresponding version is being retired.- Specified by:
killOfflinePush
in interfaceAdmin
isForcedKill
- should be set to true when killing the push job for retiring the corresponding version.- See Also:
-
deleteParticipantStoreKillMessage
Compose a ParticipantMessageKey message and execute a delete operation on the key to the cluster's participant store.
-
sendKillMessageToParticipantStore
-
getStorageNodesStatus
Description copied from interface:Admin
Query and return the current status of the given storage node. The "storage node status" is composed of the "status" of every replica on that storage node, where "status" is the integer value of a Helix state:
- DROPPED=1
- ERROR=2
- OFFLINE=3
- BOOTSTRAP=4
- ONLINE=5
- Specified by:
getStorageNodesStatus
in interfaceAdmin
- See Also:
-
isStorageNodeNewerOrEqualTo
public boolean isStorageNodeNewerOrEqualTo(String clusterName, String instanceId, StorageNodeStatus oldStatus) Description copied from interface:Admin
Compare the current storage node status with the given storage node status to check whether the current one is "Newer" or "Equal" to the given one. The comparison goes through each replica on this storage node; if every status value is larger than or equal to the corresponding value in the given storage node status, the current storage node status is considered "Newer" or "Equal" to the given one.
- Specified by:
isStorageNodeNewerOrEqualTo
in interfaceAdmin
- See Also:
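The comparison rule can be illustrated with the integer state values listed under getStorageNodesStatus. The sketch below is an assumption-laden stand-in: the class, the enum and the map keyed by replica name are hypothetical, and treating a replica that is missing from the current status as "not newer" is a choice made for this example, not documented behavior.

```java
import java.util.Map;

// Hypothetical stand-in for a storage node status; not the Venice StorageNodeStatus class.
final class StorageNodeStatusSketch {
  private StorageNodeStatusSketch() {
  }

  // Integer values taken from the Helix state list in the description.
  enum ReplicaState {
    DROPPED(1), ERROR(2), OFFLINE(3), BOOTSTRAP(4), ONLINE(5);

    final int value;

    ReplicaState(int value) {
      this.value = value;
    }
  }

  /** True if every replica in oldStatus has an equal or larger state value in currentStatus. */
  static boolean isNewerOrEqual(Map<String, ReplicaState> currentStatus, Map<String, ReplicaState> oldStatus) {
    for (Map.Entry<String, ReplicaState> entry: oldStatus.entrySet()) {
      ReplicaState now = currentStatus.get(entry.getKey());
      // Assumption: a replica that vanished from the node counts as a regression.
      if (now == null || now.value < entry.getValue().value) {
        return false;
      }
    }
    return true;
  }
}
```

For instance, a node whose replicas moved from BOOTSTRAP to ONLINE compares as newer, while a node where any replica dropped from ONLINE to OFFLINE does not.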
-
setAdminConsumerService
- Specified by:
setAdminConsumerService
in interfaceAdmin
- See Also:
-
skipAdminMessage
Description copied from interface:Admin
The admin consumption task tries to deal with failures to process an admin message by retrying. If there is a message that cannot be processed for some reason, we will need to forcibly skip that message in order to unblock the task from consuming subsequent messages.- Specified by:
skipAdminMessage
in interfaceAdmin
skipDIV
- tries to skip only the DIV check for the blocking message.- See Also:
-
getLastSucceedExecutionId
Description copied from interface:Admin
Get the id of the last succeeded execution in this controller.
- Specified by:
getLastSucceedExecutionId
in interfaceAdmin
- See Also:
-
getLastSucceededExecutionId
Get the last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the cluster.
- Parameters:
clusterName
-
storeName
-
- Returns:
- the last succeeded execution id or null if the cluster/store is invalid or the admin consumer service for the given cluster is not up and running yet.
-
getAdminCommandExecutionTracker
Description copied from interface:Admin
Get the tracker used to track the execution of the admin command for the given cluster.- Specified by:
getAdminCommandExecutionTracker
in interfaceAdmin
- See Also:
-
getAdminTopicMetadata
- Specified by:
getAdminTopicMetadata
in interfaceAdmin
- Returns:
- cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id.
-
updateAdminTopicMetadata
public void updateAdminTopicMetadata(String clusterName, long executionId, Optional<String> storeName, Optional<Long> offset, Optional<Long> upstreamOffset) Update cluster-level execution id, offset and upstream offset. If store name is specified, it updates the store-level execution id.- Specified by:
updateAdminTopicMetadata
in interfaceAdmin
-
getRoutersClusterConfig
Description copied from interface:Admin
Get the cluster level config for all routers.- Specified by:
getRoutersClusterConfig
in interfaceAdmin
- See Also:
-
updateRoutersClusterConfig
public void updateRoutersClusterConfig(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapacityProtectionEnabled, Optional<Integer> expectedRouterCount) Description copied from interface:Admin
Update the cluster level config for all routers.
- Specified by:
updateRoutersClusterConfig
in interfaceAdmin
- See Also:
-
getAllStorePushStrategyForMigration
Unsupported operation in the child controller.- Specified by:
getAllStorePushStrategyForMigration
in interfaceAdmin
-
setStorePushStrategyForMigration
Unsupported operation in the child controller.- Specified by:
setStorePushStrategyForMigration
in interfaceAdmin
-
discoverCluster
Description copied from interface:Admin
Find the cluster which the given store belongs to. Return the pair of the cluster name and the d2 service associated with that cluster.- Specified by:
discoverCluster
in interfaceAdmin
- See Also:
-
getServerD2Service
Description copied from interface:Admin
Find the server d2 service associated with a given cluster name.- Specified by:
getServerD2Service
in interfaceAdmin
- See Also:
-
findAllBootstrappingVersions
Description copied from interface:Admin
Find the store versions which have at least one bootstrap replica.- Specified by:
findAllBootstrappingVersions
in interfaceAdmin
- See Also:
-
getVeniceWriterFactory
- Specified by:
getVeniceWriterFactory
in interfaceAdmin
- Returns:
- a
VeniceWriterFactory
object used by the Venice controller to create the venice writer.
-
getPubSubConsumerAdapterFactory
- Specified by:
getPubSubConsumerAdapterFactory
in interfaceAdmin
- Returns:
- a
PubSubClientFactory
object used by the Venice controller to create Pubsub clients.
-
getPubSubSSLProperties
- Specified by:
getPubSubSSLProperties
in interfaceAdmin
-
stopMonitorOfflinePush
-
close
public void close()Cause VeniceHelixAdmin and its associated services to stop executing. -
getHelixVeniceClusterResources
- Specified by:
getHelixVeniceClusterResources
in interfaceAdmin
- Returns:
- the aggregate resources required by controller to manage a Venice cluster.
-
getStoreConfigRepo
Description copied from interface:Admin
Return a shared store config repository.- Specified by:
getStoreConfigRepo
in interfaceAdmin
-
isLeaderControllerOfControllerCluster
public boolean isLeaderControllerOfControllerCluster()
Detect whether the current node is the leader controller of the controller cluster. Use this function with care, since it talks to ZooKeeper directly on every call.
- Specified by:
isLeaderControllerOfControllerCluster
in interfaceAdmin
- Returns:
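Because every call reaches ZooKeeper, a caller that polls frequently may want to put a short-lived cache in front of the check. The wrapper below is only a sketch of that idea under stated assumptions: `CachedLeadershipCheck`, its constructor arguments and the time-to-live policy are hypothetical and not part of VeniceHelixAdmin, and a cached answer can be stale for up to the TTL, so it only suits callers that tolerate that.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical caller-side cache; the lookup is expected to wrap
// isLeaderControllerOfControllerCluster() or a similar expensive check.
final class CachedLeadershipCheck {
  private final BooleanSupplier lookup;
  private final LongSupplier clockMs;
  private final long ttlMs;
  private boolean cachedValue;
  private long lastRefreshMs;
  private boolean initialized;

  CachedLeadershipCheck(BooleanSupplier lookup, LongSupplier clockMs, long ttlMs) {
    this.lookup = lookup;
    this.clockMs = clockMs;
    this.ttlMs = ttlMs;
  }

  /** Returns the cached answer, refreshing it once the TTL has elapsed. */
  synchronized boolean isLeader() {
    long now = clockMs.getAsLong();
    if (!initialized || now - lastRefreshMs >= ttlMs) {
      cachedValue = lookup.getAsBoolean();
      lastRefreshMs = now;
      initialized = true;
    }
    return cachedValue;
  }
}
```

A caller could construct it as `new CachedLeadershipCheck(admin::isLeaderControllerOfControllerCluster, System::currentTimeMillis, 5000L)`, where `admin` and the five-second TTL are placeholders.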
-
setStoreConfigForMigration
public void setStoreConfigForMigration(String storeName, String srcClusterName, String destClusterName)
Update the "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.
- Parameters:
storeName
- name of the store.
srcClusterName
- name of the source cluster.
destClusterName
- name of the destination cluster.
-
updateAclForStore
Description copied from interface:Admin
Provision a new set of ACLs for a Venice store and its associated Kafka topic.
- Specified by:
updateAclForStore
in interfaceAdmin
- See Also:
-
getAclForStore
Description copied from interface:Admin
Fetch the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
- Specified by:
getAclForStore
in interfaceAdmin
- Returns:
- The string representation of the accessPermissions. Returns an empty string if the store is not present.
- See Also:
-
deleteAclForStore
Description copied from interface:Admin
Delete the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
- Specified by:
deleteAclForStore
in interfaceAdmin
- See Also:
-
configureActiveActiveReplication
public void configureActiveActiveReplication(String clusterName, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter) Description copied from interface:Admin
Enable/disable active-active replication for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster. If storeName is not empty, only the specified store might be updated.
- Specified by:
configureActiveActiveReplication
in interfaceAdmin
- See Also:
-
getClusterStores
Description copied from interface:Admin
Return all stores in a cluster.- Specified by:
getClusterStores
in interfaceAdmin
- Returns:
- a list of
StoreInfo
of all stores in the specified cluster.
-
getClusterStaleStores
- Specified by:
getClusterStaleStores
in interfaceAdmin
-
listStorePushInfo
public Map<String,RegionPushDetails> listStorePushInfo(String clusterName, String storeName, boolean isPartitionDetailEnabled) - Specified by:
listStorePushInfo
in interfaceAdmin
-
getRegionPushDetails
public RegionPushDetails getRegionPushDetails(String clusterName, String storeName, boolean isPartitionDetailAdded) - Specified by:
getRegionPushDetails
in interfaceAdmin
- Returns:
RegionPushDetails
object containing the specified store's push status.
-
retrievePushStatus
-
checkResourceCleanupBeforeStoreCreation
Description copied from interface:Admin
Check whether any resources for the store are left over in the cluster before store creation. If there are any, this function should throw an exception.
- Specified by:
checkResourceCleanupBeforeStoreCreation
in interfaceAdmin
- See Also:
-
wipeCluster
public void wipeCluster(String clusterName, String fabric, Optional<String> storeName, Optional<Integer> versionNum)
Delete stores from the cluster, including both store data and metadata.
The API provides the flexibility to delete a single store or a single version. Cluster name and fabric are required parameters, but store name and version number are optional. If store name is empty, all stores in the cluster are deleted.
- Specified by:
wipeCluster
in interfaceAdmin
- Parameters:
clusterName
- name of the Venice cluster.
fabric
- name of the fabric.
storeName
- name of the store to be deleted; if the value is absent, all stores in the cluster are deleted.
versionNum
- the number of the version to be deleted; if present, only the specified version is deleted.
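How the two optional parameters narrow the scope of the wipe can be sketched as a small resolver. The `WipeScopeSketch` class and its `Scope` enum are hypothetical, and rejecting a version number without a store name is an assumption of this example rather than documented behavior.

```java
import java.util.Optional;

// Hypothetical helper that mirrors the parameter semantics described above.
final class WipeScopeSketch {
  private WipeScopeSketch() {
  }

  enum Scope {
    ALL_STORES, SINGLE_STORE, SINGLE_VERSION
  }

  static Scope resolve(Optional<String> storeName, Optional<Integer> versionNum) {
    if (!storeName.isPresent()) {
      if (versionNum.isPresent()) {
        // Assumption: a version number is meaningless without a store name.
        throw new IllegalArgumentException("versionNum requires storeName");
      }
      return Scope.ALL_STORES; // absent store name: every store in the cluster
    }
    // Store name present: the version number decides between store and version scope.
    return versionNum.isPresent() ? Scope.SINGLE_VERSION : Scope.SINGLE_STORE;
  }
}
```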
-
compareStore
public StoreComparisonInfo compareStore(String clusterName, String storeName, String fabricA, String fabricB) Description copied from interface:Admin
Compare store metadata and version states between two fabrics.- Specified by:
compareStore
in interfaceAdmin
- See Also:
-
copyOverStoreSchemasAndConfigs
public StoreInfo copyOverStoreSchemasAndConfigs(String clusterName, String srcFabric, String destFabric, String storeName) - Specified by:
copyOverStoreSchemasAndConfigs
in interfaceAdmin
- See Also:
-
isParent
public boolean isParent()Description copied from interface:Admin
Check whether the controller works as a parent controller -
getParentControllerRegionState
Description copied from interface:Admin
Return the state of the region of the parent controller.- Specified by:
getParentControllerRegionState
in interfaceAdmin
- Returns:
ParentControllerRegionState.ACTIVE
which means that the parent controller in the region is serving requests. Otherwise, return ParentControllerRegionState.PASSIVE
- See Also:
-
getChildDataCenterControllerUrlMap
Description copied from interface:Admin
Get child datacenter to child controller url mapping.- Specified by:
getChildDataCenterControllerUrlMap
in interfaceAdmin
- Returns:
- A map of child datacenter -> child controller url
- See Also:
-
getChildDataCenterControllerD2Map
Description copied from interface:Admin
Get child datacenter to child controller d2 zk host mapping- Specified by:
getChildDataCenterControllerD2Map
in interfaceAdmin
- Returns:
- A map of child datacenter -> child controller d2 zk host
- See Also:
-
getChildControllerD2ServiceName
Description copied from interface:Admin
Get child datacenter controller d2 service name- Specified by:
getChildControllerD2ServiceName
in interfaceAdmin
- Returns:
- d2 service name
- See Also:
-
getMetaStoreWriter
Description copied from interface:Admin
Return a MetaStoreWriter, which can be shared across different Venice clusters.
- Specified by:
getMetaStoreWriter
in interfaceAdmin
- See Also:
-
getMetaStoreReader
- Specified by:
getMetaStoreReader
in interfaceAdmin
-
getEmergencySourceRegion
Description copied from interface:Admin
Return the emergency source region configuration.- Specified by:
getEmergencySourceRegion
in interfaceAdmin
- See Also:
-
getAggregateRealTimeTopicSource
Description copied from interface:Admin
Return the source Kafka bootstrap server url for aggregate real-time topic updates.
- Specified by:
getAggregateRealTimeTopicSource
in interfaceAdmin
- See Also:
-
isActiveActiveReplicationEnabledInAllRegion
public boolean isActiveActiveReplicationEnabledInAllRegion(String clusterName, String storeName, boolean checkCurrentVersion) Description copied from interface:Admin
Returns true if A/A replication is enabled in all child controllers and the parent controller. This is implemented only in the parent controller. Otherwise, return false.
- Specified by:
isActiveActiveReplicationEnabledInAllRegion
in interfaceAdmin
- See Also:
-
getClustersLeaderOf
Description copied from interface:Admin
Get a list of clusters this controller is a leader of.- Specified by:
getClustersLeaderOf
in interfaceAdmin
- Returns:
- a list of clusters this controller is a leader of.
- See Also:
-
getBackupVersionDefaultRetentionMs
public long getBackupVersionDefaultRetentionMs()Description copied from interface:Admin
Returns default backup version retention time.- Specified by:
getBackupVersionDefaultRetentionMs
in interfaceAdmin
- See Also:
-
getDefaultMaxRecordSizeBytes
public int getDefaultMaxRecordSizeBytes()- Specified by:
getDefaultMaxRecordSizeBytes
in interfaceAdmin
- Returns:
- The default value of
VeniceWriter.maxRecordSizeBytes
which is provided to the VPJ and Consumer as a controller config to dynamically control the setting per cluster. - See Also:
-
nodeReplicaReadiness
public Pair<NodeReplicasReadinessState,List<Replica>> nodeReplicaReadiness(String cluster, String helixNodeId) Description copied from interface:Admin
helixNodeId: nodeId of helix participant. HOST_PORT. Returns true if all current version replicas of the input node are ready to serve; otherwise returns false together with all unready replicas.
- Specified by:
nodeReplicaReadiness
in interfaceAdmin
- See Also:
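The "ready flag plus unready replicas" result shape can be sketched as follows. Everything here is a hypothetical stand-in: `ReplicaInfo` replaces the real Replica type, a plain Boolean replaces NodeReplicasReadinessState, and `Map.Entry` replaces Pair.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the readiness check; not the Venice implementation.
final class NodeReadinessSketch {
  private NodeReadinessSketch() {
  }

  static final class ReplicaInfo {
    final String resource;
    final int partition;
    final boolean currentVersion;
    final boolean readyToServe;

    ReplicaInfo(String resource, int partition, boolean currentVersion, boolean readyToServe) {
      this.resource = resource;
      this.partition = partition;
      this.currentVersion = currentVersion;
      this.readyToServe = readyToServe;
    }
  }

  /** Key: true if all current-version replicas are ready. Value: the unready ones. */
  static Map.Entry<Boolean, List<ReplicaInfo>> readiness(List<ReplicaInfo> replicasOnNode) {
    List<ReplicaInfo> unready = new ArrayList<>();
    for (ReplicaInfo replica: replicasOnNode) {
      // Only replicas of the current version matter for serving reads.
      if (replica.currentVersion && !replica.readyToServe) {
        unready.add(replica);
      }
    }
    return new SimpleImmutableEntry<>(Boolean.valueOf(unready.isEmpty()), unready);
  }
}
```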
-
initiateDataRecovery
public void initiateDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion) Description copied from interface:Admin
Initiate data recovery for a store version given a source fabric.- Specified by:
initiateDataRecovery
in interfaceAdmin
- Parameters:
clusterName
- of the store.
storeName
- of the store.
version
- of the store.
sourceFabric
- to be used as the source for data recovery.
copyAllVersionConfigs
- a boolean to indicate whether all version configs should be copied from the source fabric, or only the essential version configs, with the rest generated from the destination fabric's Store configs.
sourceFabricVersion
- source fabric's Version configs used to configure the recovering version in the destination fabric.
- See Also:
-
prepareDataRecovery
public void prepareDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Description copied from interface:Admin
Prepare for data recovery in the destination fabric. The store version of interest might have lingering states and resources in the destination fabric from previous failed attempts. Perform some basic checks to make sure the store version in the destination fabric is capable of performing data recovery, and clean up any lingering states and resources.
- Specified by:
prepareDataRecovery
in interfaceAdmin
- See Also:
-
isStoreVersionReadyForDataRecovery
public Pair<Boolean,String> isStoreVersionReadyForDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Description copied from interface:Admin
Check if the store version's previous states and resources are cleaned up and ready to start data recovery.- Specified by:
isStoreVersionReadyForDataRecovery
in interfaceAdmin
- Returns:
- whether the store version is ready to start data recovery, and the reason if it is not ready.
- See Also:
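Read together, prepareDataRecovery, isStoreVersionReadyForDataRecovery and initiateDataRecovery suggest a prepare, check, initiate sequence. The sketch below assumes that ordering; the `RecoveryOps` interface is a hypothetical, trimmed-down view of the three calls (it omits the amplification factor, copyAllVersionConfigs and sourceFabricVersion arguments), and `Map.Entry` stands in for Pair.

```java
import java.util.Map;

// Hypothetical orchestration of the three data recovery calls; the ordering is an assumption.
final class DataRecoveryFlowSketch {
  private DataRecoveryFlowSketch() {
  }

  /** Trimmed-down, illustrative view of the recovery operations; not the real Admin interface. */
  interface RecoveryOps {
    void prepare(String cluster, String store, int version, String srcFabric, String destFabric);

    Map.Entry<Boolean, String> isReady(String cluster, String store, int version, String srcFabric, String destFabric);

    void initiate(String cluster, String store, int version, String srcFabric, String destFabric);
  }

  /** Returns true if recovery was initiated within maxChecks readiness checks. */
  static boolean recover(
      RecoveryOps ops,
      String cluster,
      String store,
      int version,
      String srcFabric,
      String destFabric,
      int maxChecks) {
    // Step 1: clean up lingering state from earlier failed attempts.
    ops.prepare(cluster, store, version, srcFabric, destFabric);
    // Step 2: check readiness; a real caller would wait between checks.
    for (int i = 0; i < maxChecks; i++) {
      Map.Entry<Boolean, String> readiness = ops.isReady(cluster, store, version, srcFabric, destFabric);
      if (readiness.getKey()) {
        // Step 3: start the recovery only once cleanup has finished.
        ops.initiate(cluster, store, version, srcFabric, destFabric);
        return true;
      }
    }
    return false;
  }
}
```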
-
isAdminTopicConsumptionEnabled
Description copied from interface:Admin
Return whether the admin consumption task is enabled for the passed cluster.- Specified by:
isAdminTopicConsumptionEnabled
in interfaceAdmin
- See Also:
-
getLargestUsedVersionFromStoreGraveyard
- Specified by:
getLargestUsedVersionFromStoreGraveyard
in interfaceAdmin
- Returns:
- the largest used version number for the given store from store graveyard.
-
createStoragePersona
public void createStoragePersona(String clusterName, String name, long quotaNumber, Set<String> storesToEnforce, Set<String> owners) - Specified by:
createStoragePersona
in interfaceAdmin
- See Also:
-
getStoragePersona
- Specified by:
getStoragePersona
in interfaceAdmin
- See Also:
-
deleteStoragePersona
- Specified by:
deleteStoragePersona
in interfaceAdmin
- See Also:
-
updateStoragePersona
public void updateStoragePersona(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams) - Specified by:
updateStoragePersona
in interfaceAdmin
- See Also:
-
getPersonaAssociatedWithStore
- Specified by:
getPersonaAssociatedWithStore
in interfaceAdmin
- See Also:
-
getClusterStoragePersonas
- Specified by:
getClusterStoragePersonas
in interfaceAdmin
-
cleanupInstanceCustomizedStates
Description copied from interface:Admin
Scan through instance level customized states and remove any lingering ZNodes that are no longer relevant. This operation shouldn't be needed under normal circumstances. It's intended to clean up ZNodes that failed to be deleted due to bugs and errors.
- Specified by:
cleanupInstanceCustomizedStates
in interfaceAdmin
- Parameters:
clusterName
- to perform the cleanup.- Returns:
- list of deleted ZNode paths.
-
getStoreGraveyard
- Specified by:
getStoreGraveyard
in interfaceAdmin
-
removeStoreFromGraveyard
- Specified by:
removeStoreFromGraveyard
in interfaceAdmin
-
getPushStatusStoreReader
- Specified by:
getPushStatusStoreReader
in interfaceAdmin
-
getPushStatusStoreWriter
- Specified by:
getPushStatusStoreWriter
in interfaceAdmin
-
sendHeartbeatToSystemStore
public void sendHeartbeatToSystemStore(String clusterName, String storeName, long heartbeatTimeStamp) Description copied from interface:Admin
Send a heartbeat timestamp to targeted system store.- Specified by:
sendHeartbeatToSystemStore
in interfaceAdmin
-
getHeartbeatFromSystemStore
Description copied from interface:Admin
Read the latest heartbeat timestamp from the system store. If it fails to read from the system store, this method should return -1.
- Specified by:
getHeartbeatFromSystemStore
in interfaceAdmin
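Since -1 signals a failed read, callers need to handle it before doing any arithmetic on the timestamp. A minimal sketch of such a check follows; the class name, the `maxAgeMs` threshold and the assumption that timestamps are epoch milliseconds are all illustrative and not taken from the API.

```java
// Hypothetical caller-side check around the -1 sentinel.
final class HeartbeatCheckSketch {
  static final long READ_FAILED = -1L;

  private HeartbeatCheckSketch() {
  }

  /** True only if a heartbeat was read and it is no older than maxAgeMs. */
  static boolean isHealthy(long heartbeatTimestampMs, long nowMs, long maxAgeMs) {
    if (heartbeatTimestampMs == READ_FAILED) {
      return false; // the read failed, so nothing can be said about freshness
    }
    return nowMs - heartbeatTimestampMs <= maxAgeMs;
  }
}
```

Checking the sentinel first avoids treating a failed read as an extremely old heartbeat by accident, which matters if the two cases should be reported differently.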
-
getSslFactory
-
isClusterWipeAllowed
-
setPushJobDetailsStoreClient
public void setPushJobDetailsStoreClient(AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> client) -
getPubSubTopicRepository
- Specified by:
getPubSubTopicRepository
in interfaceAdmin
-