Class VeniceHelixAdmin
- All Implemented Interfaces:
Admin, StoreCleaner, Closeable, AutoCloseable
When the controller runs in controller-as-a-service mode, there are two levels of clusters and controllers. Each Venice controller holds a level-1 Helix controller that stays connected to Helix; a dedicated cluster (the controller cluster) exists only for these level-1 controllers. The second level consists of the Venice clusters themselves, such as a prod cluster or a dev cluster. Each Venice cluster is a Helix resource in the controller cluster, and Helix chooses one level-1 controller to become the leader of each Venice cluster. In the distributed controller's state transition handler, a level-2 controller is initialized to manage that Venice cluster only. If a level-1 controller is chosen as the leader of multiple Venice clusters, multiple level-2 controllers are created, each based on its cluster-specific config.
Admin is shared by the controllers of multiple clusters running in one physical Venice controller instance.
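The two-level arrangement described above can be pictured with a small model. This is only a sketch under stated assumptions: Level1ControllerSketch, Level2ControllerSketch, onBecomeLeader, onLoseLeadership and clustersLeaderOf are hypothetical names invented for illustration and are not Venice or Helix classes; only isLeaderControllerFor mirrors a method name from this page. It shows one controller process creating a per-cluster (level-2) controller each time it is elected leader of a Venice cluster.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical stand-in for a per-cluster (level-2) controller; not a Venice class. */
final class Level2ControllerSketch {
  final String clusterName;

  Level2ControllerSketch(String clusterName) {
    this.clusterName = clusterName;
  }
}

/** Hypothetical stand-in for one controller process holding a level-1 controller. */
final class Level1ControllerSketch {
  // One level-2 controller per Venice cluster this process currently leads.
  private final Map<String, Level2ControllerSketch> level2ByCluster = new HashMap<>();

  /** Assumed callback: this process was elected leader of the given Venice cluster. */
  void onBecomeLeader(String clusterName) {
    level2ByCluster.computeIfAbsent(clusterName, Level2ControllerSketch::new);
  }

  /** Assumed callback: leadership of the given Venice cluster moved elsewhere. */
  void onLoseLeadership(String clusterName) {
    level2ByCluster.remove(clusterName);
  }

  boolean isLeaderControllerFor(String clusterName) {
    return level2ByCluster.containsKey(clusterName);
  }

  /** Sorted names of the clusters this process currently leads. */
  Set<String> clustersLeaderOf() {
    return new TreeSet<>(level2ByCluster.keySet());
  }
}
```

In this model a single process that leads both a prod and a dev cluster holds two independent level-2 controllers, which is why an Admin instance has to be shared across clusters.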
-
Nested Class Summary
Nested Classes
Nested classes/interfaces inherited from interface com.linkedin.venice.controller.Admin
Admin.OfflinePushStatusInfo
-
Field Summary
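Fields
Modifier and Type   Field   Description
protected static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
protected static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS
protected final PubSubTopicRepository pubSubTopicRepository
-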
Constructor Summary
Constructors
Constructor   Description
VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sslEnabled, com.linkedin.d2.balancer.D2Client d2Client, Map<String, com.linkedin.d2.balancer.D2Client> d2Clients, Optional<SSLConfig> sslConfig, Optional<DynamicAccessController> accessController, Optional<ICProvider> icProvider, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, PubSubPositionTypeRegistry pubSubPositionTypeRegistry, List<ClusterLeaderInitializationRoutine> additionalInitRoutines, Optional<VeniceVersionLifecycleEventListener> versionLifecycleEventListener)
VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, com.linkedin.d2.balancer.D2Client d2Client, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, PubSubPositionTypeRegistry pubSubPositionTypeRegistry, Optional<VeniceVersionLifecycleEventListener> versionLifecycleEventListener)
-
Method Summary
Modifier and Type   Method   Description
void abortMigration(String srcClusterName, String destClusterName, String storeName) Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.
addDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties.
addDerivedSchema(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties and return a new DerivedSchemaEntry object containing the schema and its id.
void addInstanceToAllowlist(String clusterName, String helixNodeId) Add the given helix nodeId into the allowlist in ZK.
addReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr) Create a new ReplicationMetadataSchemaEntry object with the given properties and add it into schema repository if no duplication.
boolean addSpecificVersion(String clusterName, String storeName, Version version) TODO refactor addVersion to these broken down methods instead of doing everything in one giant method.
addSupersetSchema(String clusterName, String storeName, String valueSchema, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId) Add a new superset schema for the given store with all specified properties.
addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType compatibilityType) Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.
addValueSchema(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType)
void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion) This function is only being used by store migration parent controllers, which write add version admin message.
void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion, int currentRTVersionNumber) This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked from the admin channel.
addVersionAndTopicOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion, int largestUsedRTVersionNumber) A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion.
addVersionOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, int largestUsedRTVersionNumber) Only add version to the store without creating the topic or starting ingestion.
void autoMigrateStore(String srcClusterName, String destClusterName, String storeName, Optional<Integer> currStep, Optional<Boolean> abortOnFailure)
int calculateNumberOfPartitions(String clusterName, String storeName) Calculate number of partitions for given store.
protected void checkPreConditionForCreateStore(String clusterName, String storeName, String keySchema, String valueSchema, boolean allowSystemStore, boolean skipLingeringResourceCheck) Check whether Controller should block the incoming store creation.
void checkResourceCleanupBeforeStoreCreation(String clusterName, String storeName) Check whether there are any resources left for the store creation in cluster: If there is any, this function should throw Exception.
cleanupInstanceCustomizedStates(String clusterName) Scan through instance level customized states and remove any lingering ZNodes that are no longer relevant.
void clearIngestionKillMessageAndVerify(String clusterName, String versionTopicName) Clear KILL messages from a participant system store.
void clearInstanceMonitor(String clusterName)
void close() Cause VeniceHelixAdmin and its associated services to stop executing.
compareStore(String clusterName, String storeName, String fabricA, String fabricB) Compare store metadata and version states between two fabrics.
void completeMigration(String srcClusterName, String destClusterName, String storeName)
void configureActiveActiveReplication(String clusterName, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter) Enable/disable active active replications for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster.
boolean containsHelixResource(String clusterName, String kafkaTopic) The purpose of this function is to check if the given resource exists in the Helix cluster.
copyOverStoreSchemasAndConfigs(String clusterName, String srcFabric, String destFabric, String storeName)
void createHelixResourceAndStartMonitoring(String clusterName, String storeName, Version version) Create Helix-resources for a given storage node cluster and start monitoring a new push.
void createSpecificVersionTopic(String clusterName, String storeName, Version version) Create the corresponding version topic based on the provided Version
void createStoragePersona(String clusterName, String name, long quotaNumber, Set<String> storesToEnforce, Set<String> owners)
void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore, Optional<String> accessPermissions) Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.
void deleteAclForStore(String clusterName, String storeName) Delete the current set of ACL provisioned for a venice store and its associated kafka topic.
deleteAllVersionsInStore(String clusterName, String storeName) Delete all venice versions in the given store (including venice resource, kafka topic, offline pushes and all related resources).
void deleteHelixResource(String clusterName, String kafkaTopic) The purpose of this function is to delete the given resource from the Helix cluster.
void deleteOldVersionInStore(String clusterName, String storeName, int versionNum) Delete the given version from the store.
void deleteOneStoreVersion(String clusterName, String storeName, int versionNumber) Delete version from cluster, removing all related resources
void deleteParticipantStoreKillMessage(String clusterName, String kafkaTopic) Compose a ParticipantMessageKey message and execute a delete operation on the key to the cluster's participant store.
void deleteStoragePersona(String clusterName, String name)
void deleteStore(String clusterName, String storeName, boolean isAbortMigrationCleanup, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) This method will delete store data, metadata, version and rt topics. One exception is for stores with isMigrating flag set.
void deleteStore(String clusterName, String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion)
void deleteValueSchemas(String clusterName, String storeName, Set<Integer> unusedValueSchemaIds) Deletes a store's values schema with ids `except` the ids passed in the argument inuseValueSchemaIds
discoverCluster(String storeName) Find the cluster which the given store belongs to.
void enableDisabledPartition(String clusterName, String kafkaTopic, boolean enableAll)
findAllBootstrappingVersions(String clusterName) Find the store versions which have at least one bootstrap replica.
getAclForStore(String clusterName, String storeName) Fetch the current set of ACL provisioned for a venice store and its associated kafka topic.
getAdminCommandExecutionTracker(String clusterName) Get the tracker used to track the execution of the admin command for the given cluster.
getAdminConsumerService(String clusterName)
getAdminOperationVersionFromControllers(String clusterName) Get the admin operation protocol versions from all controllers for specific cluster.
getAdminTopicMetadata(String clusterName, Optional<String> storeName)
getAggregatedHealthStatus(String cluster, List<String> instances, List<String> toBeStoppedInstances, boolean isSSLEnabled)
getAggregateRealTimeTopicSource(String clusterName) Return the source Kafka bootstrap server url for aggregate real-time topic updates
Get all live instance controllers from ZK /LIVEINSTANCES
getAllowlist(String clusterName) Unsupported operation in the child controller.
getAllStores(String clusterName)
getAllStoreStatuses(String clusterName) Get the statuses of all stores.
int getBackupVersion(String clusterName, String storeName)
long Returns default backup version retention time.
int getBackupVersionNumber(List<Version> versions, int currentVersion) Get backup version number, the largest online version number that is less than the current version number
getBackupVersionsForMultiColos(String clusterName, String storeName)
getBatchJobHeartbeatValue(BatchJobHeartbeatKey batchJobHeartbeatKey)
getChildControllerD2ServiceName(String clusterName) Get child datacenter controller d2 service name
getChildDataCenterControllerD2Map(String clusterName) Get child datacenter to child controller d2 zk host mapping
getChildDataCenterControllerUrlMap(String clusterName) Get child datacenter to child controller url mapping.
Get a list of clusters this controller is a leader of.
getClusterStaleStores(String clusterName)
getClusterStoragePersonas(String clusterName)
getClusterStores(String clusterName) Return all stores in a cluster.
getControllerClientMap(String clusterName)
getControllerConfig(String clusterName)
getControllersByHelixState(String clusterName, String helixState) Get controllers instance based on the given helix state.
int getCurrentVersion(String clusterName, String storeName)
getCurrentVersionsForMultiColos(String clusterName, String storeName)
int
getDerivedSchemaId(String clusterName, String storeName, String schemaStr)
getDerivedSchemas(String clusterName, String storeName)
getDisabledPartitionStats(String clusterName)
getEmergencySourceRegion(String clusterName) Return the emergency source region configuration.
int getFutureVersion(String clusterName, String storeName)
getFutureVersionsForMultiColos(String clusterName, String storeName)
int getFutureVersionWithStatus(String clusterName, String storeName, VersionStatus status)
long getHeartbeatFromSystemStore(String clusterName, String systemStoreName) Read the latest heartbeat timestamp from system store.
protected org.apache.helix.HelixAdmin getHelixAdmin()
getHelixVeniceClusterResources(String cluster)
org.apache.helix.model.IdealState getIdealState(String clusterName, String resourceName)
getIncrementalPushVersion(String clusterName, String storeName, String pushJobId)
getInUseValueSchemaIds(String clusterName, String storeName)
getKafkaBootstrapServers(boolean isSSL) Return the ssl or non-ssl bootstrap servers based on the given flag.
getKeySchema(String clusterName, String storeName)
int getLargestUsedVersion(String clusterName, String storeName)
int getLargestUsedVersionFromStoreGraveyard(String clusterName, String storeName) Deprecated but remains here to keep compatibility until Admin.getLargestUsedVersion(String, String) is used.
getLastSucceededExecutionId(String clusterName, String storeName) Get last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the cluster
getLastSucceedExecutionId(String clusterName) Get the id of the last succeeded execution in this controller.
getLeaderController(String clusterName) Get instance of leader controller.
getLiveInstanceMonitor(String clusterName)
long Get the local admin operation protocol version.
getMetaStoreValue(StoreMetaKey metaKey, String storeName)
Return a MetaStoreWriter, which can be shared across different Venice clusters.
int
getNativeReplicationKafkaBootstrapServerAddress(String sourceFabric)
getNativeReplicationSourceFabric(String clusterName, Store store, Optional<String> sourceGridFabric, Optional<String> emergencySourceRegion, String targetedRegions) Source fabric selection priority: 1.
getOffLinePushStatus(String clusterName, String kafkaTopic) Query the status of the offline push by given kafka topic.
getOffLinePushStatus(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions)
getOffLinePushStatus(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions, boolean isTargetRegionPushWithDeferredSwap)
protected static ExecutionStatus getOverallPushStatus(ExecutionStatus veniceStatus, ExecutionStatus daVinciStatus)
Return the state of the region of the parent controller.
getPersonaAssociatedWithStore(String clusterName, String storeName)
getPubSubSSLProperties(String pubSubBrokerAddress)
Return a shared read only schema repository for zk shared stores.
Return a shared read only store repository for zk shared stores.
getReferenceVersionForStreamingWrites(String clusterName, String storeName, String pushJobId)
Return the region name of this Admin
getRegionPushDetails(String clusterName, String storeName, boolean isPartitionDetailAdded)
getReplicas(String clusterName, String kafkaTopic)
getReplicasOfStorageNode(String cluster, String instanceId)
int getReplicationFactor(String clusterName, String storeName)
Optional<org.apache.avro.Schema> getReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaID, int rmdVersionID)
getReplicationMetadataSchemas(String clusterName, String storeName)
getRepushInfo(String clusterName, String storeName, Optional<String> fabricName)
getRouterD2Service(String clusterName) Find the router d2 service associated with a given cluster name.
getRoutersClusterConfig(String clusterName) Get the cluster level config for all routers.
getServerD2Service(String clusterName) Find the server d2 service associated with a given cluster name.
getStartedVersion(Store store) The intended semantic is to use this method to find the version that something is currently pushing to.
double getStorageEngineOverheadRatio(String clusterName)
getStorageNodes(String clusterName)
getStorageNodesStatus(String clusterName, boolean enableReplica)
getStorageNodesStatus(String clusterName, String instanceId) Query and return the current status of the given storage node.
getStoragePersona(String clusterName, String name)
Return a shared store config repository.
getStoresForCompaction(String clusterName) Intermediary between LogCompactionService and CompactionManager; injects the child controller's ControllerClient into the function CompactionManager.getStoresForCompaction(String, Map); serves as API endpoint to query stores ready for log compaction
getTopicManager(String pubSubServerAddress)
getValueSchema(String clusterName, String storeName, int id)
int getValueSchemaId(String clusterName, String storeName, String valueSchemaStr)
getValueSchemas(String clusterName, String storeName)
org.apache.helix.zookeeper.impl.client.ZkClient getZkClient()
boolean Test if the input store exists in a cluster.
incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) Note: this currently uses the pushID to guarantee idempotence, unexpected behavior may result if multiple batch jobs push to the same store at the same time.
void initiateDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion) Initiate data recovery for a store version given a source fabric.
void initStorageCluster(String clusterName) Create and configure the Venice storage cluster with required properties in Helix and wait for the resource's (partial) partition to appear in the external view.
boolean isActiveActiveReplicationEnabledInAllRegion(String clusterName, String storeName, boolean checkCurrentVersion) Returns true if A/A replication is enabled in all child controllers and the parent controller.
boolean isAdminTopicConsumptionEnabled(String clusterName) Return whether the admin consumption task is enabled for the passed cluster.
boolean isClusterValid(String clusterName) Test if a cluster is valid (in Helix cluster list).
boolean isClusterWipeAllowed(String clusterName)
isInstanceRemovable(String clusterName, String helixNodeId, List<String> lockedNodes) Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, is the given instance able to be removed from the given cluster.
boolean isLeaderControllerFor(String clusterName) Check if this controller itself is the leader controller for a given cluster or not.
boolean This function is used to detect whether current node is the leader controller of controller cluster.
boolean isParent() Check whether the controller works as a parent controller
boolean isResourceStillAlive(String resourceName) Test if a given helix resource is still alive (existent in ZK).
boolean isRTTopicDeletionPermittedByAllControllers(String clusterName, String rtTopicName)
boolean isSSLEnabledForPush(String clusterName, String storeName) Return whether ssl is enabled for the given store for push.
boolean Test if ssl is enabled to Kafka.
boolean isStorageNodeNewerOrEqualTo(String clusterName, String instanceId, StorageNodeStatus oldStatus) Compare the current storage node status and the given storage node status to check if the current one is "Newer" or "Equal" to the given one.
boolean isStoreMigrationAllowed(String clusterName) Test if the store migration is allowed for a cluster.
isStoreVersionReadyForDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Check if the store version's previous states and resources are cleaned up and ready to start data recovery.
boolean isTopicTruncated(String kafkaTopicName) Check if a kafka topic is absent or truncated.
boolean isTopicTruncatedBasedOnRetention(long retention) Test if retention is less than the configured DEPRECATED_TOPIC_MAX_RETENTION_MS value.
boolean isTopicTruncatedBasedOnRetention(String kafkaTopicName, long retentionTime) Topic should also be considered to get cleaned up if: retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value.
void killOfflinePush(String clusterName, String kafkaTopic, boolean isForcedKill) Kill an offline push if it ran into errors or the corresponding version is being retired.
listStorePushInfo(String clusterName, String storeName, boolean isPartitionDetailEnabled)
protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(Store oldStore, Optional<Long> hybridRewindSeconds, Optional<Long> hybridOffsetLagThreshold, Optional<Long> hybridTimeLagThreshold, Optional<DataReplicationPolicy> hybridDataReplicationPolicy, Optional<BufferReplayPolicy> bufferReplayPolicy, Optional<String> realTimeTopicName) Used by both the VeniceHelixAdmin and the VeniceParentHelixAdmin
void migrateStore(String srcClusterName, String destClusterName, String storeName) Main implementation for migrating a store from its source cluster to a new destination cluster.
nodeReplicaReadiness(String cluster, String helixNodeId) helixNodeId nodeId of helix participant.
void preFetchDeadStoreStats(String clusterName, List<StoreInfo> storeInfos)
void prepareDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Prepare for data recovery in the destination fabric.
removeDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId) Remove an existing derived schema
void removeInstanceFromAllowList(String clusterName, String helixNodeId) Remove the given helix nodeId from the allowlist in ZK.
void removeStorageNode(String clusterName, String instanceId) Remove one storage node from the given cluster.
void removeStoreFromGraveyard(String clusterName, String storeName)
void replicateAddVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) This method is invoked in parent controllers to replicate new version signals for migrating store.
void replicateUpdateStore(String clusterName, String storeName, UpdateStoreQueryParams params) This method is invoked in parent controllers for store migration.
repushStore(RepushJobRequest repushJobRequest) Triggers repush for storeName for log compaction of store topic
void retireOldStoreVersions(String clusterName, String storeName, boolean deleteBackupOnStartPush, int currentVersionBeforePush) For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources).
retrievePushStatus(String clusterName, StoreInfo store)
void rollbackToBackupVersion(String clusterName, String storeName, String regionFilter) Set backup version as current version in a child region.
void rollForwardToFutureVersion(String clusterName, String storeName, String regionFilter)
void sendHeartbeatToSystemStore(String clusterName, String storeName, long heartbeatTimeStamp) Send a heartbeat timestamp to targeted system store.
void sendKillMessageToParticipantStore(String clusterName, String kafkaTopic)
void sendPushJobDetails(PushJobStatusRecordKey key, PushJobDetails value) Lazy initialize a Venice writer for an internal real time topic store of push job details records.
void setAdminConsumerService(String clusterName, AdminConsumerService service)
void
void setStoreConfigForMigration(String storeName, String srcClusterName, String destClusterName) Update "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.
void setStoreCurrentVersion(String clusterName, String storeName, int versionNumber) Update the current version of a specified store.
void setStoreLargestUsedRTVersion(String clusterName, String storeName, int versionNumber) Update the largest used RT version number of a specified store.
void setStoreLargestUsedVersion(String clusterName, String storeName, int versionNumber) Update the largest used version number of a specified store.
void setStoreOwner(String clusterName, String storeName, String owner) Update the owner of a specified store.
void setStorePartitionCount(String clusterName, String storeName, int partitionCount) Since partition check/calculation only happens when adding new store version, setStorePartitionCount(String, String, int) would only change the number of partitions for the following pushes.
void setStorePushStrategyForMigration(String voldemortStoreName, String strategy) Unsupported operation in the child controller.
void setStoreReadability(String clusterName, String storeName, boolean desiredReadability) Update the readability of a specified store.
void setStoreReadWriteability(String clusterName, String storeName, boolean isAccessible) Update both readability and writability of a specified store.
void setStoreWriteability(String clusterName, String storeName, boolean desiredWriteability) Update the writability of a specified store.
void skipAdminMessage(String clusterName, long offset, boolean skipDIV, long executionId) The admin consumption task tries to deal with failures to process an admin message by retrying.
void startInstanceMonitor(String clusterName)
void Stop the helix controller for a single cluster.
void stopMonitorOfflinePush(String clusterName, String topic, boolean deletePushStatus, boolean isForcedDelete)
void Stop the entire controller but not only the helix controller for a single cluster.
void storeMetadataUpdate(String clusterName, String storeName, VeniceHelixAdmin.StoreMetadataOperation operation) Update the store metadata by applying provided operation.
void topicCleanupWhenPushComplete(String clusterName, String storeName, int versionNumber) In this function, Controller will set up proper compaction strategy when the push job is fully completed, and here are the reasons to set it up after the job completes: 1.
boolean truncateKafkaTopic(String kafkaTopicName) We don't actually truncate any Kafka topic here; we just update the retention time.
boolean truncateKafkaTopic(String topicName, long retentionTime) Truncate a Kafka topic by setting its retention time to the input value.
void updateAclForStore(String clusterName, String storeName, String accessPermissions) Provision a new set of ACL for a venice store and its associated kafka topic.
void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) Update AdminOperationProtocolVersion in metadata
void updateAdminTopicMetadata(String clusterName, long executionId, Optional<String> storeName, Optional<Long> offset, Optional<Long> upstreamOffset) Update cluster-level execution id, offset and upstream offset.
void updateClusterConfig(String clusterName, UpdateClusterConfigQueryParams params) Update the LiveClusterConfig at runtime for a specified cluster.
void updateClusterDiscovery(String storeName, String oldCluster, String newCluster, String initiatingCluster) Update the cluster discovery of a given store by writing to the StoreConfig ZNode.
void updateDarkClusterConfig(String clusterName, UpdateDarkClusterConfigQueryParams params)
boolean updateIdealState(String clusterName, String resourceName, int minReplica)
void updateRoutersClusterConfig(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapacityProtectionEnabled, Optional<Integer> expectedRouterCount) Update the cluster level for all routers.
void updateStoragePersona(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams)
void updateStore(String clusterName, String storeName, UpdateStoreQueryParams params) TODO: some logics are in parent controller VeniceParentHelixAdmin#updateStore and some are in the child controller here.
void validateAndMaybeRetrySystemStoreAutoCreation(String clusterName, String storeName, VeniceSystemStoreType systemStoreType) Check the creation results of a user store's system store.
validateStoreDeleted(String clusterName, String storeName) Validates that a store has been completely deleted from the Venice cluster.
versionsForStore(String clusterName, String storeName)
boolean whetherEnableBatchPushFromAdmin(String clusterName, String storeName) Test if a store is allowed for a batch push.
void wipeCluster(String clusterName, String fabric, Optional<String> storeName, Optional<Integer> versionNum) Delete stores from the cluster including both store data and metadata.
void writeEndOfPush(String clusterName, String storeName, int versionNumber, boolean alsoWriteStartOfPush) Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.linkedin.venice.controller.Admin
addValueSchema, createStore, createStore, getDatacenterCount, hasWritePermissionToBatchJobHeartbeatStore, incrementVersionIdempotent, incrementVersionIdempotent
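The summary for getBackupVersionNumber describes the backup version as the largest online version number that is less than the current version number. That selection rule can be sketched as follows. This is an illustration only: BackupVersionSketch and VersionInfo are hypothetical stand-ins (the real method takes a List<Version> and returns an int), and returning an empty OptionalInt when no candidate exists is an assumption, since this page does not say what the real method returns in that case.

```java
import java.util.List;
import java.util.OptionalInt;

final class BackupVersionSketch {
  /** Hypothetical minimal stand-in for a store version; not the Venice Version class. */
  static final class VersionInfo {
    final int number;
    final boolean online;

    VersionInfo(int number, boolean online) {
      this.number = number;
      this.online = online;
    }
  }

  /**
   * Pick the largest online version number strictly below currentVersion.
   * Returns empty when no such version exists (an assumption of this sketch).
   */
  static OptionalInt backupVersionNumber(List<VersionInfo> versions, int currentVersion) {
    return versions.stream()
        .filter(v -> v.online)                  // only online versions qualify
        .mapToInt(v -> v.number)
        .filter(n -> n < currentVersion)        // must be older than the current version
        .max();                                 // the newest of the remaining candidates
  }
}
```

For example, with online versions 1, 3 and 4 plus an offline version 2, a current version of 4 selects 3, and a current version of 3 selects 1 because version 2 is not online.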
-
Field Details
-
RETRY_FAILURE_TYPES
-
INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
protected static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
-
INTERNAL_STORE_RTT_RETRY_BACKOFF_MS
protected static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS -
pubSubTopicRepository
-
-
Constructor Details
-
VeniceHelixAdmin
public VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, com.linkedin.d2.balancer.D2Client d2Client, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, PubSubPositionTypeRegistry pubSubPositionTypeRegistry, Optional<VeniceVersionLifecycleEventListener> versionLifecycleEventListener) -
VeniceHelixAdmin
public VeniceHelixAdmin(VeniceControllerMultiClusterConfig multiClusterConfigs, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sslEnabled, com.linkedin.d2.balancer.D2Client d2Client, Map<String, com.linkedin.d2.balancer.D2Client> d2Clients, Optional<SSLConfig> sslConfig, Optional<DynamicAccessController> accessController, Optional<ICProvider> icProvider, PubSubTopicRepository pubSubTopicRepository, PubSubClientsFactory pubSubClientsFactory, PubSubPositionTypeRegistry pubSubPositionTypeRegistry, List<ClusterLeaderInitializationRoutine> additionalInitRoutines, Optional<VeniceVersionLifecycleEventListener> versionLifecycleEventListener)
-
-
Method Details
-
startInstanceMonitor
- Specified by:
startInstanceMonitorin interfaceAdmin
-
getLiveInstanceMonitor
-
clearInstanceMonitor
- Specified by:
clearInstanceMonitorin interfaceAdmin
-
getZkClient
public org.apache.helix.zookeeper.impl.client.ZkClient getZkClient() -
getExecutionIdAccessor
-
getAdapterSerializer
-
initStorageCluster
Create and configure the Venice storage cluster with required properties in Helix and wait for the resource's (partial) partition to appear in the external view.- Specified by:
initStorageClusterin interfaceAdmin- Parameters:
clusterName- Venice cluster name.
-
isResourceStillAlive
Test if a given helix resource is still alive (existent in ZK).- Specified by:
isResourceStillAlivein interfaceAdmin- Parameters:
resourceName- Helix resource name.- Returns:
true if resource exists; false otherwise.
-
isClusterValid
Test if a cluster is valid (in Helix cluster list).- Specified by:
isClusterValidin interfaceAdmin- Parameters:
clusterName- Venice cluster name.- Returns:
true if input cluster is in Helix cluster list; false otherwise.
-
getHelixAdmin
protected org.apache.helix.HelixAdmin getHelixAdmin() -
createStore
public void createStore(String clusterName, String storeName, String owner, String keySchema, String valueSchema, boolean isSystemStore, Optional<String> accessPermissions) Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.- Specified by:
createStorein interfaceAdmin- Parameters:
clusterName- Venice cluster where the store locates.storeName- name of the store.owner- owner of the store.keySchema- key schema of the store.valueSchema- value schema of the store.isSystemStore- if the store is a system store.accessPermissions- json string representing the access-permissions.
-
deleteStore
public void deleteStore(String clusterName, String storeName, boolean isAbortMigrationCleanup, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion) This method deletes store data, metadata, and version and RT topics. One exception is for stores with the isMigrating flag set: in that case, the corresponding Kafka topics and storeConfig are not deleted, so that they remain available for the cloned store.- Specified by:
deleteStorein interfaceAdmin
-
deleteStore
-
sendPushJobDetails
Lazily initialize a Venice writer for an internal real-time topic store of push job details records. Use this writer to put a push job detail record pair (key and value).- Specified by:
sendPushJobDetailsin interfaceAdmin- Parameters:
key- key with which the specified value is to be associated.value- value to be associated with the specified key.
-
getPushJobDetails
- Specified by:
getPushJobDetailsin interfaceAdmin- Returns:
- the value to which the specified key is mapped from the Venice internal real time topic store.
-
getBatchJobHeartbeatValue
public BatchJobHeartbeatValue getBatchJobHeartbeatValue(@Nonnull BatchJobHeartbeatKey batchJobHeartbeatKey) - Specified by:
getBatchJobHeartbeatValuein interfaceAdmin- Returns:
- the value to which the specified key is mapped from the Venice internal BATCH_JOB_HEARTBEAT_STORE topic store.
-
writeEndOfPush
public void writeEndOfPush(String clusterName, String storeName, int versionNumber, boolean alsoWriteStartOfPush) Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.- Specified by:
writeEndOfPushin interfaceAdmin- Parameters:
clusterName- name of the Venice cluster.storeName- name of the store.versionNumber- store version number.alsoWriteStartOfPush- if Venice writer sends a START_OF_PUSH control message first.
-
whetherEnableBatchPushFromAdmin
Test if a store is allowed for a batch push.- Specified by:
whetherEnableBatchPushFromAdminin interfaceAdmin- Parameters:
storeName- name of a store.- Returns:
true if the store is a participant system store or if Venice is running in single-region mode; false otherwise.
-
isStoreMigrationAllowed
Test if store migration is allowed for a cluster. It reads the value "allow.store.migration" from the "/clusterName/ClusterConfig" znode.- Specified by:
isStoreMigrationAllowedin interfaceAdmin- Parameters:
clusterName- name of Venice cluster.- Returns:
true if store migration is allowed for the input cluster; false otherwise.
-
migrateStore
Main implementation for migrating a store from its source cluster to a new destination cluster. A new store (with same properties, e.g. name, owner, key schema, value schema) is created at the destination cluster and its StoreInfo is also cloned. For a store with enabled meta system store or enabled davinci push status, those system stores are also migrated. Different store versions are evaluated for the migration. For those versions to be migrated, it triggers the ADD_VERSION and starts ingestion at the destination cluster.- Specified by:
migrateStorein interfaceAdmin- Parameters:
srcClusterName- name of the source cluster.destClusterName- name of the destination cluster.storeName- name of the target store.
-
clearIngestionKillMessageAndVerify
Clear KILL messages from a participant system store. -
getControllerClientMap
-
completeMigration
- Specified by:
completeMigrationin interfaceAdmin- See Also:
-
abortMigration
Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.- Specified by:
abortMigrationin interfaceAdmin- Parameters:
srcClusterName- name of the source cluster.destClusterName- name of the destination cluster.storeName- name of the store in migration.
-
updateClusterDiscovery
public void updateClusterDiscovery(String storeName, String oldCluster, String newCluster, String initiatingCluster)
Description copied from interface: Admin
Update the cluster discovery of a given store by writing to the StoreConfig ZNode.- Specified by:
updateClusterDiscoveryin interfaceAdmin- Parameters:
storeName- of the store.oldCluster- for the store.newCluster- for the store.initiatingCluster- that is making the update. This is needed because in the case of store migration sometimes the update is not made by the leader of the current cluster but instead the leader of the source cluster.- See Also:
-
validateStoreDeleted
Validates that a store has been completely deleted from the Venice cluster. This method performs comprehensive checks across multiple subsystems to ensure no lingering resources remain that would prevent safe store recreation. Resources checked: 1. Store configuration in ZooKeeper 2. Store metadata in store repository 3. System stores (only those that were enabled for the original store) 4. Kafka topics (version, RT, and system store topics) 5. Helix resources- Specified by:
validateStoreDeletedin interfaceAdmin- Parameters:
clusterName- the name of the cluster to check (must not be null or empty)storeName- the name of the store to validate deletion for (must not be null or empty)- Returns:
- StoreDeletedResult indicating whether the store is fully deleted or what resources remain
- Throws:
IllegalArgumentException- if clusterName or storeName is null or empty
-
checkPreConditionForCreateStore
protected void checkPreConditionForCreateStore(String clusterName, String storeName, String keySchema, String valueSchema, boolean allowSystemStore, boolean skipLingeringResourceCheck) Check whether the Controller should block the incoming store creation. This function includes logic to check for lingering resources, since the requested store may have been deleted only recently. This check is enabled only in the Parent Controller and skipped in the Child Controller, for the following reasons:
1. The Parent Controller enforces a strict order: the system store must be created before the host Venice store.
2. The Child Controller has no such strict order, since admin messages for different store names can be executed in parallel. When this race condition happens, it causes a deadlock:
a. The version creation of the system store creates an RT topic in the Parent Cluster.
b. The RT topic is mirrored by KMM to the Child Cluster.
c. The version creation admin message of the system store is blocked in the Child Controller since the host Venice store doesn't exist.
d. The store creation admin message of the host Venice store is blocked in the Child Controller because of the lingering resource check (the RT topic of its system store already exists, created by KMM).
TODO: Evaluate if this code needs to change now that KMM has been deprecated. Once Venice gets rid of KMM, the topic won't be automatically created by KMM, and this race condition will be addressed. For now, the Child Controller skips the lingering resource check when handling store creation admin messages.
addVersionAndStartIngestion
public void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion) This function is only used by store migration parent controllers, which write the add version admin message. We use Version.DEFAULT_RT_VERSION_NUMBER here, which does not change anything in the version; the migrated version copies whatever is there in the source version.- Specified by:
addVersionAndStartIngestionin interfaceAdmin
-
addVersionAndStartIngestion
public void addVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion, int currentRTVersionNumber) This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add version message. -
replicateAddVersionAndStartIngestion
public void replicateAddVersionAndStartIngestion(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId) This method is invoked in parent controllers to replicate new version signals for migrating store. -
addVersionAndTopicOnly
public Pair<Boolean,Version> addVersionAndTopicOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, String compressionDictionary, String remoteKafkaBootstrapServers, Optional<String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion, int largestUsedRTVersionNumber) A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion. -
addVersionOnly
public Version addVersionOnly(String clusterName, String storeName, String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, int largestUsedRTVersionNumber) Only add the version to the store, without creating the topic or starting ingestion. Used to sync version metadata in the parent fabric during store migration. -
addSpecificVersion
TODO: refactor addVersion into these broken-down methods instead of doing everything in one giant method. Perform add version to a given store with the provided Version -
createSpecificVersionTopic
Create the corresponding version topic based on the provided Version -
createHelixResourceAndStartMonitoring
public void createHelixResourceAndStartMonitoring(String clusterName, String storeName, Version version) Create Helix resources for a given storage node cluster and start monitoring a new push. -
incrementVersionIdempotent
public Version incrementVersionIdempotent(String clusterName, String storeName, String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, String compressionDictionary, Optional<String> sourceGridFabric, Optional<X509Certificate> requesterCert, long rewindTimeInSecondsOverride, Optional<String> emergencySourceRegion, boolean versionSwapDeferred, String targetedRegions, int repushSourceVersion) Note: this currently uses the pushID to guarantee idempotence; unexpected behavior may result if multiple batch jobs push to the same store at the same time.- Specified by:
incrementVersionIdempotentin interfaceAdmin
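The idempotence note above can be illustrated with a minimal sketch, assuming a simple in-memory map keyed by push job ID. The class IdempotentVersionSketch and its fields are hypothetical and are not part of Venice; the real method works against the store repository and takes many more parameters.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: repeating a call with the same push job ID
// returns the version that was already assigned instead of creating a new one.
final class IdempotentVersionSketch {
  private final Map<String, Integer> versionByPushJobId = new HashMap<>();
  private int largestUsedVersion = 0;

  synchronized int incrementVersionIdempotent(String pushJobId) {
    Integer existing = versionByPushJobId.get(pushJobId);
    if (existing != null) {
      // Same push job ID seen before: do not bump the version again.
      return existing;
    }
    largestUsedVersion++;
    versionByPushJobId.put(pushJobId, largestUsedVersion);
    return largestUsedVersion;
  }

  public static void main(String[] args) {
    IdempotentVersionSketch store = new IdempotentVersionSketch();
    System.out.println(store.incrementVersionIdempotent("job-a")); // prints 1
    System.out.println(store.incrementVersionIdempotent("job-a")); // prints 1 (retry of the same job)
    System.out.println(store.incrementVersionIdempotent("job-b")); // prints 2
  }
}
```

Because the key is the push job ID alone, two different jobs pushing to the same store at the same time each get their own version, which is the concurrent case the note warns about.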
-
getStartedVersion
The intended semantic is to use this method to find the version that something is currently pushing to. It looks at all versions greater than the current version and identifies the version with a status of STARTED. If there is no STARTED version, it creates a new one for the push to use. This means we cannot use this method to support multiple concurrent pushes.- Parameters:
store-- Returns:
- the started version if there is only one, throws an exception if there is an error version with a greater number than the current version. Otherwise returns Optional.empty()
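As a rough illustration of the Returns clause above, the following sketch scans versions newer than the current one. Status, VersionInfo, and findStartedVersion are hypothetical stand-ins rather than Venice types, and throwing when more than one STARTED version is found is an assumption, since the description only covers the single-version case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class StartedVersionSketch {
  // Hypothetical stand-ins for Venice's version status and version metadata.
  enum Status { STARTED, ONLINE, ERROR }

  static final class VersionInfo {
    final int number;
    final Status status;

    VersionInfo(int number, Status status) {
      this.number = number;
      this.status = status;
    }
  }

  static Optional<VersionInfo> findStartedVersion(List<VersionInfo> versions, int currentVersion) {
    VersionInfo started = null;
    for (VersionInfo v : versions) {
      if (v.number <= currentVersion) {
        continue; // only versions newer than the current one are considered
      }
      if (v.status == Status.ERROR) {
        throw new IllegalStateException("Error version " + v.number + " is newer than current version");
      }
      if (v.status == Status.STARTED) {
        if (started != null) {
          // Assumption: more than one STARTED version is treated as an error.
          throw new IllegalStateException("More than one STARTED version found");
        }
        started = v;
      }
    }
    return Optional.ofNullable(started);
  }

  public static void main(String[] args) {
    List<VersionInfo> versions = Arrays.asList(
        new VersionInfo(1, Status.ONLINE),
        new VersionInfo(2, Status.STARTED));
    System.out.println(findStartedVersion(versions, 1).isPresent()); // prints true
    System.out.println(findStartedVersion(versions, 2).isPresent()); // prints false
  }
}
```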
-
getReplicationMetadataSchema
public Optional<org.apache.avro.Schema> getReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaID, int rmdVersionID) - Specified by:
getReplicationMetadataSchemain interfaceAdmin- Returns:
- replication metadata schema for a store in a cluster with specified schema ID and RMD protocol version ID.
-
getReferenceVersionForStreamingWrites
public Version getReferenceVersionForStreamingWrites(String clusterName, String storeName, String pushJobId) - Specified by:
getReferenceVersionForStreamingWritesin interfaceAdmin
-
getIncrementalPushVersion
- Specified by:
getIncrementalPushVersionin interfaceAdmin
-
getCurrentVersion
- Specified by:
getCurrentVersionin interfaceAdmin- Returns:
- The current version number of an input store in the specified Venice cluster or Store.NON_EXISTING_VERSION if none exists.
-
getFutureVersion
- Specified by:
getFutureVersionin interfaceAdmin- Returns:
- the online (completed, but not yet swapped) version or the future version with ongoing ingestion; Store.NON_EXISTING_VERSION if none exists.
-
getBackupVersion
- Specified by:
getBackupVersionin interfaceAdmin
-
getFutureVersionWithStatus
-
getCurrentVersionsForMultiColos
- Specified by:
getCurrentVersionsForMultiColosin interfaceAdmin
-
getRepushInfo
- Specified by:
getRepushInfoin interfaceAdmin- Returns:
- a new RepushInfo object with specified store info.
-
getFutureVersionsForMultiColos
- Specified by:
getFutureVersionsForMultiColosin interfaceAdmin- See Also:
-
getBackupVersionsForMultiColos
- Specified by:
getBackupVersionsForMultiColosin interfaceAdmin
-
deleteAllVersionsInStore
Description copied from interface: Admin
Delete all Venice versions in the given store (including Venice resources, Kafka topics, offline pushes and all related resources).- Specified by:
deleteAllVersionsInStorein interfaceAdmin- See Also:
-
deleteOldVersionInStore
Description copied from interface: Admin
Delete the given version from the store. If the given version is the current version, an exception will be thrown.- Specified by:
deleteOldVersionInStorein interfaceAdmin- See Also:
-
deleteOneStoreVersion
Delete version from cluster, removing all related resources- Specified by:
deleteOneStoreVersionin interfaceStoreCleaner
-
isRTTopicDeletionPermittedByAllControllers
- Specified by:
isRTTopicDeletionPermittedByAllControllersin interfaceAdmin
-
retireOldStoreVersions
public void retireOldStoreVersions(String clusterName, String storeName, boolean deleteBackupOnStartPush, int currentVersionBeforePush) For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources). It also truncates Kafka topics and Helix resources.- Specified by:
retireOldStoreVersionsin interfaceStoreCleaner- Parameters:
clusterName- name of a cluster.storeName- name of the store to retire.deleteBackupOnStartPush- indicate if it is called in a start-of-push workflow.currentVersionBeforePush- current version before a new push.
-
topicCleanupWhenPushComplete
In this function, the Controller sets up the proper compaction strategy once the push job has fully completed. The reasons to set it up after the job completes are:
1. For batch push jobs to batch-only stores, there is no impact. There could still be duplicate entries because of speculative execution in the map-reduce job, but we are not planning to clean them up now.
2. For batch push jobs to hybrid/incremental stores, if compaction is enabled at the beginning of the job, Kafka compaction could kick in during the push job, and the storage node could detect DIV errors, such as missing messages or checksum mismatches, because speculative execution could produce duplicate entries. We don't want to fail the push in this scenario, and we still want to perform strong DIV validation in batch push, so compaction can only be enabled after the batch push completes.
3. For GF jobs to hybrid stores, the situation is similar to #2: the data contains duplicate entries because no de-duplication happens anywhere.
With this approach, when load rebalance happens for hybrid/incremental stores, a DIV error could be detected during ingestion at any phase, since compaction might have been enabled long ago. So in the storage node, we need to add one more safeguard before throwing the DIV exception: check whether the topic is compaction-enabled. Since Venice is not going to switch the compaction policy back and forth between non-compact and compact, checking whether the topic is compaction-enabled when encountering a DIV error should be good enough.- Specified by:
topicCleanupWhenPushCompletein interfaceStoreCleaner
-
isTopicTruncated
Check if a kafka topic is absent or truncated.- Specified by:
isTopicTruncatedin interfaceAdmin- See Also:
-
isTopicTruncatedBasedOnRetention
public boolean isTopicTruncatedBasedOnRetention(long retention) Test if retention is less than the configured DEPRECATED_TOPIC_MAX_RETENTION_MS value.- Specified by:
isTopicTruncatedBasedOnRetentionin interfaceAdmin- Returns:
true if the specified retention is below the configured value; false otherwise.- See Also:
-
isTopicTruncatedBasedOnRetention
A topic should also be considered for cleanup if its retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value, or if: 1. Topic retention equals fatalDataValidationFailureRetentionMs. 2. Topic is a version topic. 3. fatalDataValidationFailureRetentionMs has already passed since its creation.- Specified by:
isTopicTruncatedBasedOnRetentionin interfaceAdmin
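The two retention checks described above might be sketched as follows. This is a sketch under stated assumptions, not the Venice implementation: the class, method, and parameter names are hypothetical, the three numbered conditions are read as having to hold together, and "has already passed" is read as a strict comparison.

```java
final class TopicRetentionSketch {
  // Single-argument rule: retention below the configured deprecated-topic maximum.
  static boolean isTruncated(long retentionMs, long deprecatedTopicMaxRetentionMs) {
    return retentionMs < deprecatedTopicMaxRetentionMs;
  }

  // Extended rule. All parameters are hypothetical inputs that the real method
  // would look up itself (topic type, creation time, controller configs).
  static boolean isTruncated(
      boolean isVersionTopic,
      long retentionMs,
      long topicCreationTimeMs,
      long nowMs,
      long deprecatedTopicMaxRetentionMs,
      long fatalDataValidationFailureRetentionMs) {
    if (isTruncated(retentionMs, deprecatedTopicMaxRetentionMs)) {
      return true;
    }
    // Assumed conjunction of the three numbered conditions.
    return retentionMs == fatalDataValidationFailureRetentionMs
        && isVersionTopic
        && nowMs - topicCreationTimeMs > fatalDataValidationFailureRetentionMs;
  }

  public static void main(String[] args) {
    System.out.println(isTruncated(1_000L, 5_000L)); // prints true
    System.out.println(isTruncated(true, 10_000L, 0L, 20_000L, 5_000L, 10_000L)); // prints true
    System.out.println(isTruncated(false, 10_000L, 0L, 20_000L, 5_000L, 10_000L)); // prints false
  }
}
```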
-
getMinNumberOfUnusedKafkaTopicsToPreserve
public int getMinNumberOfUnusedKafkaTopicsToPreserve()- Specified by:
getMinNumberOfUnusedKafkaTopicsToPreservein interfaceAdmin- Returns:
- the controller configuration value for MIN_NUMBER_OF_UNUSED_KAFKA_TOPICS_TO_PRESERVE.
- See Also:
-
truncateKafkaTopic
We don't actually truncate any Kafka topic here; we just update the retention time.- Specified by:
truncateKafkaTopicin interfaceAdmin- Parameters:
kafkaTopicName-- Returns:
-
truncateKafkaTopic
Description copied from interface: Admin
Truncate a Kafka topic by setting its retention time to the input value.- Specified by:
truncateKafkaTopicin interfaceAdmin- Parameters:
topicName- the name of the topic to truncate.retentionTime- the retention time in milliseconds to set for the topic.- Returns:
- true if truncating this topic successfully. false otherwise.
- See Also:
-
versionsForStore
- Specified by:
versionsForStorein interfaceAdmin- Returns:
- all versions of the specified store from a cluster.
-
getAllStores
- Specified by:
getAllStoresin interfaceAdmin- Returns:
- all stores in the specified cluster.
-
getAllStoreStatuses
Description copied from interface: Admin
Get the statuses of all stores. The store status is decided by the current version. For example, if one partition has only 2 ONLINE replicas in the current version, we say this store is under-replicated. Refer to StoreStatus for the definition of each status.- Specified by:
getAllStoreStatusesin interfaceAdmin- Returns:
- a map whose key is store name and value is store's status.
- See Also:
-
hasStore
Test if the input store exists in a cluster. -
getStore
-
setStoreCurrentVersion
Update the current version of a specified store.- Specified by:
setStoreCurrentVersionin interfaceAdmin
-
rollForwardToFutureVersion
- Specified by:
rollForwardToFutureVersionin interfaceAdmin
-
rollbackToBackupVersion
Set backup version as current version in a child region.- Specified by:
rollbackToBackupVersionin interfaceAdmin
-
getBackupVersionNumber
Get backup version number, the largest online version number that is less than the current version number -
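The selection rule stated above (largest online version number below the current version) can be sketched as follows. Status, VersionInfo, and the value 0 used for NON_EXISTING_VERSION are assumptions made for illustration; the behavior when no backup exists is also assumed, since the description does not state it.

```java
import java.util.Arrays;
import java.util.List;

final class BackupVersionSketch {
  // Hypothetical stand-ins for Venice's version status and version metadata.
  enum Status { STARTED, ONLINE, ERROR }

  static final class VersionInfo {
    final int number;
    final Status status;

    VersionInfo(int number, Status status) {
      this.number = number;
      this.status = status;
    }
  }

  // Assumed sentinel meaning "no such version".
  static final int NON_EXISTING_VERSION = 0;

  static int backupVersionNumber(List<VersionInfo> versions, int currentVersion) {
    int backup = NON_EXISTING_VERSION;
    for (VersionInfo v : versions) {
      // Keep the largest ONLINE version that is strictly older than the current one.
      if (v.status == Status.ONLINE && v.number < currentVersion && v.number > backup) {
        backup = v.number;
      }
    }
    return backup;
  }

  public static void main(String[] args) {
    List<VersionInfo> versions = Arrays.asList(
        new VersionInfo(1, Status.ONLINE),
        new VersionInfo(2, Status.ERROR),
        new VersionInfo(3, Status.ONLINE),
        new VersionInfo(4, Status.ONLINE));
    System.out.println(backupVersionNumber(versions, 4)); // prints 3
  }
}
```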
setStoreLargestUsedVersion
Update the largest used version number of a specified store.- Specified by:
setStoreLargestUsedVersionin interfaceAdmin
-
setStoreLargestUsedRTVersion
Update the largest used RT version number of a specified store.- Specified by:
setStoreLargestUsedRTVersionin interfaceAdmin
-
setStoreOwner
Update the owner of a specified store.- Specified by:
setStoreOwnerin interfaceAdmin
-
setStorePartitionCount
Since partition check/calculation only happens when adding a new store version, setStorePartitionCount(String, String, int) only changes the number of partitions for subsequent pushes. The current version is not changed.- Specified by:
setStorePartitionCountin interfaceAdmin
-
setStoreWriteability
Update the writability of a specified store.- Specified by:
setStoreWriteabilityin interfaceAdmin
-
setStoreReadability
Update the readability of a specified store.- Specified by:
setStoreReadabilityin interfaceAdmin
-
setStoreReadWriteability
Update both readability and writability of a specified store.- Specified by:
setStoreReadWriteabilityin interfaceAdmin
-
getMetaStoreValue
- Specified by:
getMetaStoreValuein interfaceAdmin
-
getInUseValueSchemaIds
- Specified by:
getInUseValueSchemaIdsin interfaceAdmin
-
deleteValueSchemas
public void deleteValueSchemas(String clusterName, String storeName, Set<Integer> unusedValueSchemaIds)
Description copied from interface: Admin
Deletes a store's value schemas with ids `except` the ids passed in the argument inuseValueSchemaIds- Specified by:
deleteValueSchemasin interfaceAdmin
-
updateStore
TODO: some logic is in the parent controller (VeniceParentHelixAdmin#updateStore) and some is in the child controller here. They need to be unified in the future.- Specified by:
updateStorein interfaceAdmin
-
updateClusterConfig
Update the LiveClusterConfig at runtime for a specified cluster.- Specified by:
updateClusterConfigin interfaceAdmin- Parameters:
clusterName- name of the Venice cluster.params- parameters to update.
-
updateDarkClusterConfig
- Specified by:
updateDarkClusterConfigin interfaceAdmin
-
replicateUpdateStore
public void replicateUpdateStore(String clusterName, String storeName, UpdateStoreQueryParams params) This method is invoked in parent controllers for store migration. -
mergeNewSettingsIntoOldHybridStoreConfig
protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig(Store oldStore, Optional<Long> hybridRewindSeconds, Optional<Long> hybridOffsetLagThreshold, Optional<Long> hybridTimeLagThreshold, Optional<DataReplicationPolicy> hybridDataReplicationPolicy, Optional<BufferReplayPolicy> bufferReplayPolicy, Optional<String> realTimeTopicName) Used by both the VeniceHelixAdmin and the VeniceParentHelixAdmin- Parameters:
oldStore- Existing Store that is the source for updates. This object will not be modified by this method.hybridRewindSeconds- Optional is present if the returned object should include a new rewind timehybridOffsetLagThreshold- Optional is present if the returned object should include a new offset lag threshold- Returns:
- null if oldStore has no hybrid configs and optionals are not present,
otherwise a fully specified
HybridStoreConfig
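The merge contract above (null when there is nothing to merge, otherwise a fully specified config, without modifying the old object) can be sketched with two of the settings. HybridConfig, merge, and the UNSET default of -1 are hypothetical; the values Venice uses for settings that are neither in the old config nor supplied are not stated here.

```java
import java.util.Optional;

final class HybridMergeSketch {
  // Hypothetical immutable stand-in for HybridStoreConfig with two settings only.
  static final class HybridConfig {
    final long rewindSeconds;
    final long offsetLagThreshold;

    HybridConfig(long rewindSeconds, long offsetLagThreshold) {
      this.rewindSeconds = rewindSeconds;
      this.offsetLagThreshold = offsetLagThreshold;
    }
  }

  // Assumed placeholder for a setting that was never provided.
  static final long UNSET = -1L;

  static HybridConfig merge(HybridConfig old, Optional<Long> rewindSeconds, Optional<Long> offsetLagThreshold) {
    if (old == null && !rewindSeconds.isPresent() && !offsetLagThreshold.isPresent()) {
      return null; // no existing hybrid config and no new settings
    }
    long baseRewind = old != null ? old.rewindSeconds : UNSET;
    long baseLag = old != null ? old.offsetLagThreshold : UNSET;
    // A new object is returned; the old config is never modified.
    return new HybridConfig(rewindSeconds.orElse(baseRewind), offsetLagThreshold.orElse(baseLag));
  }

  public static void main(String[] args) {
    HybridConfig old = new HybridConfig(3600L, 1000L);
    HybridConfig merged = merge(old, Optional.of(7200L), Optional.empty());
    System.out.println(merged.rewindSeconds + " " + merged.offsetLagThreshold); // prints 7200 1000
  }
}
```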
-
storeMetadataUpdate
public void storeMetadataUpdate(String clusterName, String storeName, VeniceHelixAdmin.StoreMetadataOperation operation) Update the store metadata by applying provided operation.- Parameters:
clusterName- name of the cluster.storeName- name of the store to be updated.operation- the defined operation that updates the store.
-
getStorageEngineOverheadRatio
- Specified by:
getStorageEngineOverheadRatioin interfaceAdmin- Returns:
- the configuration value for ConfigKeys.STORAGE_ENGINE_OVERHEAD_RATIO
-
containsHelixResource
Description copied from interface: StoreCleaner
The purpose of this function is to check whether the given resource exists in the Helix cluster.- Specified by:
containsHelixResourcein interfaceStoreCleaner- Parameters:
clusterName- The Venice cluster that the resource belongs to.kafkaTopic- it's usually the store version name (version topic name).- Returns:
-
deleteHelixResource
Description copied from interface: StoreCleaner
The purpose of this function is to delete the given resource from the Helix cluster. Unlike StoreCleaner.deleteOneStoreVersion(String, String, int), this function does not check whether the store version is still a valid version inside the Venice backend; it sends the delete request to the Helix cluster directly. Perform sufficient sanity checks before calling this function.- Specified by:
deleteHelixResourcein interfaceStoreCleaner- Parameters:
clusterName- The Venice cluster that the resource belongs to.kafkaTopic- It's usually the store version name (version topic name).
-
enableDisabledPartition
-
getKeySchema
- Specified by:
getKeySchemain interfaceAdmin- Returns:
- the key schema for the specified store.
-
getValueSchemas
- Specified by:
getValueSchemasin interfaceAdmin- Returns:
- the value schema for the specified store.
-
getDerivedSchemas
- Specified by:
getDerivedSchemasin interfaceAdmin- Returns:
- the derived schema for the specified store.
-
getValueSchemaId
- Specified by:
getValueSchemaIdin interfaceAdmin- Returns:
- the schema id for the specified store and value schema.
-
getDerivedSchemaId
- Specified by:
getDerivedSchemaIdin interfaceAdmin- Returns:
- the derived schema id for the specified store and derived schema.
-
getValueSchema
- Specified by:
getValueSchemain interfaceAdmin- Returns:
- the value schema for the specified store and id.
-
addValueSchema
public SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType) - Specified by:
addValueSchemain interfaceAdmin- See Also:
-
addValueSchema
public SchemaEntry addValueSchema(String clusterName, String storeName, String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType compatibilityType) Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.- Specified by:
addValueSchemain interfaceAdmin- Returns:
- a SchemaEntry object composed of a schema and its corresponding id.
-
addDerivedSchema
public DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties and return a new DerivedSchemaEntry object containing the schema and its id.- Specified by:
addDerivedSchemain interfaceAdmin- Returns:
- a DerivedSchemaEntry object composed of specified properties.
-
addDerivedSchema
public DerivedSchemaEntry addDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId, String derivedSchemaStr) Add a new derived schema for the given store with all specified properties.- Specified by:
addDerivedSchemain interfaceAdmin- Returns:
- a DerivedSchemaEntry object composed of specified properties.
-
removeDerivedSchema
public DerivedSchemaEntry removeDerivedSchema(String clusterName, String storeName, int valueSchemaId, int derivedSchemaId)
Description copied from interface: Admin
Remove an existing derived schema- Specified by:
removeDerivedSchemain interfaceAdmin- Returns:
- the derived schema that is deleted or null if the schema doesn't exist
- See Also:
-
addSupersetSchema
public SchemaEntry addSupersetSchema(String clusterName, String storeName, String valueSchema, int valueSchemaId, String supersetSchemaStr, int supersetSchemaId) Add a new superset schema for the given store with all specified properties. Generate the superset schema from the current schema and the latest superset schema existing in the store (if there is none, pick the latest value schema). If the newly generated superset schema is unique, add it to the store and update latestSuperSetValueSchemaId of the store.
- Specified by:
addSupersetSchemain interfaceAdmin
-
getReplicationMetadataSchemas
public Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String clusterName, String storeName) - Specified by:
getReplicationMetadataSchemasin interfaceAdmin- Returns:
- a collection of ReplicationMetadataSchemaEntry objects for the given store and cluster.
-
addReplicationMetadataSchema
public RmdSchemaEntry addReplicationMetadataSchema(String clusterName, String storeName, int valueSchemaId, int replicationMetadataVersionId, String replicationMetadataSchemaStr) Create a new ReplicationMetadataSchemaEntry object with the given properties and add it to the schema repository if it is not a duplicate.- Specified by:
addReplicationMetadataSchemain interfaceAdmin- Returns:
ReplicationMetadataSchemaEntry object reference.
-
validateAndMaybeRetrySystemStoreAutoCreation
public void validateAndMaybeRetrySystemStoreAutoCreation(String clusterName, String storeName, VeniceSystemStoreType systemStoreType) Check the creation results of a user store's system store. If the system store's current version is in error state, re-issue a new empty push and wait for the empty push to complete.- Specified by:
validateAndMaybeRetrySystemStoreAutoCreationin interfaceAdmin
-
getStorageNodes
- Specified by:
getStorageNodesin interfaceAdmin- Returns:
- a list of storage node instance names for a given cluster.
-
getHelixAdminClient
-
getDisabledPartitionStats
-
getStorageNodesStatus
- Specified by:
getStorageNodesStatusin interfaceAdmin- Returns:
- a map containing the storage node name and its connectivity status (InstanceStatus).
-
removeStorageNode
Remove one storage node from the given cluster. It removes the given Helix nodeId from the allowlist in ZK and its associated resource in Helix.
- Specified by:
removeStorageNodein interfaceAdmin
-
stop
Description copied from interface: Admin
Stop the Helix controller for a single cluster. -
stopVeniceController
public void stopVeniceController()
Description copied from interface: Admin
Stop the entire controller, not only the Helix controller for a single cluster.- Specified by:
stopVeniceControllerin interfaceAdmin- See Also:
-
getOffLinePushStatus
Description copied from interface: Admin
Query the status of the offline push by the given Kafka topic. TODO: We use the Kafka topic to track the status now, but in the future we should use the jobId instead of the Kafka topic. Right now each Kafka topic has only one offline job, but in the future one Kafka topic could be assigned multiple jobs, such as a data migration job.- Specified by:
getOffLinePushStatusin interfaceAdmin- Returns:
- the status of current offline push for the passed kafka topic
- See Also:
-
getOffLinePushStatus
public Admin.OfflinePushStatusInfo getOffLinePushStatus(String clusterName, String kafkaTopic, Optional<String> incrementalPushVersion, String region, String targetedRegions, boolean isTargetRegionPushWithDeferredSwap) - Specified by:
getOffLinePushStatusin interfaceAdmin
-
getOffLinePushStatus
-
getOverallPushStatus
protected static ExecutionStatus getOverallPushStatus(ExecutionStatus veniceStatus, ExecutionStatus daVinciStatus) -
updateIdealState
-
getIdealState
-
getKafkaBootstrapServers
Description copied from interface:AdminReturn the ssl or non-ssl bootstrap servers based on the given flag.- Specified by:
getKafkaBootstrapServersin interfaceAdmin- Returns:
- kafka bootstrap servers url; if there are multiple, they are comma separated.
- See Also:
-
getRegionName
Description copied from interface:AdminReturn the region name of this Admin- Specified by:
getRegionNamein interfaceAdmin- Returns:
- the region name of this controller
-
getNativeReplicationKafkaBootstrapServerAddress
- Specified by:
getNativeReplicationKafkaBootstrapServerAddressin interfaceAdmin- Returns:
- KafkaUrl for the given fabric.
- See Also:
-
getNativeReplicationSourceFabric
public String getNativeReplicationSourceFabric(String clusterName, Store store, Optional<String> sourceGridFabric, Optional<String> emergencySourceRegion, String targetedRegions) Source fabric selection priority:
1. Parent controller emergency source fabric config.
2. VPJ plugin targeted region config; however, it will compute all selections based on the criteria below to select the source region.
3. VPJ plugin source grid fabric config.
4. Store level source fabric config.
5. Cluster level source fabric config.
- Specified by:
getNativeReplicationSourceFabricin interfaceAdmin- Returns:
- the selected source fabric for a given store.
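The priority order documented for getNativeReplicationSourceFabric can be sketched as follows. This is a minimal illustration of one possible reading, not the VeniceHelixAdmin implementation: the class and parameter names are hypothetical, the store-level and cluster-level values are passed in as plain strings, targetedRegions is assumed to be a comma-separated list, and the fallback to the first targeted region is an assumption about how item 2 interacts with items 3 to 5.

```java
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch of the documented priority order; not the real implementation.
final class SourceFabricSelectionSketch {
  private static boolean hasText(String s) {
    return s != null && !s.trim().isEmpty();
  }

  static String select(
      Optional<String> emergencySourceRegion,
      String targetedRegions,
      Optional<String> sourceGridFabric,
      String storeLevelFabric,
      String clusterLevelFabric) {
    // 1. The emergency source fabric config wins over everything else.
    if (emergencySourceRegion.isPresent() && hasText(emergencySourceRegion.get())) {
      return emergencySourceRegion.get();
    }
    // 3 -> 4 -> 5: source grid fabric, then store level, then cluster level.
    String preferred;
    if (sourceGridFabric.isPresent() && hasText(sourceGridFabric.get())) {
      preferred = sourceGridFabric.get();
    } else if (hasText(storeLevelFabric)) {
      preferred = storeLevelFabric;
    } else {
      preferred = clusterLevelFabric;
    }
    // 2. Assumption: when targeted regions are given, keep the preferred fabric if it is
    // one of them, otherwise fall back to the first targeted region.
    if (hasText(targetedRegions)) {
      Set<String> targets = new LinkedHashSet<>();
      for (String region: targetedRegions.split(",")) {
        if (hasText(region)) {
          targets.add(region.trim());
        }
      }
      if (!targets.isEmpty() && !targets.contains(preferred)) {
        return targets.iterator().next();
      }
    }
    return preferred;
  }

  public static void main(String[] args) {
    System.out.println(select(Optional.empty(), "dc-1,dc-3", Optional.empty(), "dc-3", "dc-4"));
  }
}
```

The region names used in main are placeholders; any real deployment would supply its own fabric names and config sources.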
-
isSSLEnabledForPush
Description copied from interface:AdminReturn whether ssl is enabled for the given store for push.- Specified by:
isSSLEnabledForPushin interfaceAdmin- See Also:
-
isSslToKafka
public boolean isSslToKafka()Test if ssl is enabled to Kafka.- Specified by:
isSslToKafkain interfaceAdmin- See Also:
-
getTopicManager
- Specified by:
getTopicManagerin interfaceAdmin- See Also:
-
getTopicManager
- Specified by:
getTopicManagerin interfaceAdmin- See Also:
-
isLeaderControllerFor
Description copied from interface:AdminCheck if this controller itself is the leader controller for a given cluster or not. Note that the controller can be either a parent controller or a child controller since a cluster must have a leader child controller and a leader parent controller. The point is not to confuse the concept of leader-standby with the parent-child controller architecture.- Specified by:
isLeaderControllerForin interfaceAdmin- See Also:
-
getAggregatedHealthStatus
public InstanceRemovableStatuses getAggregatedHealthStatus(String cluster, List<String> instances, List<String> toBeStoppedInstances, boolean isSSLEnabled) - Specified by:
getAggregatedHealthStatusin interfaceAdmin
-
calculateNumberOfPartitions
Calculate the number of partitions for the given store.- Specified by:
calculateNumberOfPartitionsin interfaceAdmin
-
getReplicationFactor
- Specified by:
getReplicationFactorin interfaceAdmin- Returns:
- the replication factor of the given store.
-
getReplicas
- Specified by:
getReplicasin interfaceAdmin- Returns:
- a list of
Replicacreated for the given resource.
-
getReplicasOfStorageNode
- Specified by:
getReplicasOfStorageNodein interfaceAdmin- See Also:
-
isInstanceRemovable
public NodeRemovableResult isInstanceRemovable(String clusterName, String helixNodeId, List<String> lockedNodes) Description copied from interface:AdminAssuming all hosts identified by lockedNodes and their corresponding resources are unusable, can the given instance be removed from the given cluster? For example, if the only online replica alive in this cluster is hosted on the given instance, this instance should not be removed from the cluster; otherwise Venice will lose data. For detailed criteria please refer to InstanceStatusDecider- Specified by:
isInstanceRemovablein interfaceAdmin- Parameters:
clusterName- The cluster where the hosts belong.helixNodeId- nodeId of helix participant. HOST_PORT.lockedNodes- A list of helix nodeIds whose resources are assumed to be unusable (stopped).- See Also:
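The data-loss example in the isInstanceRemovable description can be illustrated with a small sketch. It only covers the single criterion mentioned above (at least one online replica must survive for every partition hosted on the instance); the full criteria live in InstanceStatusDecider and are not reproduced here. The class name and the shape of the input map are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "last online replica" check; not InstanceStatusDecider itself.
final class InstanceRemovableSketch {
  /**
   * @param onlineReplicaHosts partition name -> helix nodeIds that host an online replica
   *                           (assumed input shape for this illustration)
   */
  static boolean isRemovable(
      Map<String, List<String>> onlineReplicaHosts,
      String helixNodeId,
      List<String> lockedNodes) {
    // Locked nodes are treated as unusable, exactly like the instance being removed.
    Set<String> unusable = new HashSet<>(lockedNodes);
    unusable.add(helixNodeId);
    for (List<String> hosts: onlineReplicaHosts.values()) {
      if (!hosts.contains(helixNodeId)) {
        continue; // removing the instance does not affect this partition
      }
      boolean survivorExists = false;
      for (String host: hosts) {
        if (!unusable.contains(host)) {
          survivorExists = true;
          break;
        }
      }
      if (!survivorExists) {
        return false; // the partition would lose its last usable online replica
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, List<String>> hosts = new HashMap<>();
    List<String> partition0 = new ArrayList<>();
    partition0.add("host1_1234");
    partition0.add("host2_1234");
    hosts.put("store_v1-0", partition0);
    System.out.println(isRemovable(hosts, "host1_1234", new ArrayList<>()));
  }
}
```

Passing the other replica's host in lockedNodes makes the same call return false, which mirrors the role lockedNodes plays in the description above.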
-
getLeaderController
Description copied from interface:AdminGet instance of leader controller. If there is no leader controller for the given cluster, throw a VeniceException.- Specified by:
getLeaderControllerin interfaceAdmin- See Also:
-
getControllersByHelixState
Get controller instances based on the given helix state. We look at the external view of the controller cluster to find the venice controllers in the wanted state. -
getAllLiveInstanceControllers
Get all live instance controllers from ZK /LIVEINSTANCES -
addInstanceToAllowlist
Add the given helix nodeId into the allowlist in ZK.- Specified by:
addInstanceToAllowlistin interfaceAdmin
-
removeInstanceFromAllowList
Remove the given helix nodeId from the allowlist in ZK.- Specified by:
removeInstanceFromAllowListin interfaceAdmin
-
getAllowlist
- Specified by:
getAllowlistin interfaceAdmin- Returns:
- a list of all helix nodeIds in the allowlist for the given cluster from ZK.
-
killOfflinePush
Description copied from interface:AdminKill an offline push if it ran into errors or the corresponding version is being retired.- Specified by:
killOfflinePushin interfaceAdminisForcedKill- should be set to true when killing the push job for retiring the corresponding version.- See Also:
-
deleteParticipantStoreKillMessage
Compose aParticipantMessageKeymessage and execute a delete operation on the key to the cluster's participant store. -
sendKillMessageToParticipantStore
-
getStorageNodesStatus
Description copied from interface:AdminQuery and return the current status of the given storage node. The "storage node status" is composed of the "status" of all replicas in that storage node. "status" is an integer value of Helix state:- DROPPED=1
- ERROR=2
- OFFLINE=3
- BOOTSTRAP=4
- ONLINE=5
- Specified by:
getStorageNodesStatusin interfaceAdmin- See Also:
-
isStorageNodeNewerOrEqualTo
public boolean isStorageNodeNewerOrEqualTo(String clusterName, String instanceId, StorageNodeStatus oldStatus) Description copied from interface:AdminCompare the current storage node status and the given storage node status to check whether the current one is "Newer" or "Equal" to the given one. The comparison goes through each replica in this storage node; if all their status values are larger than or equal to the status values in the given storage node status, we say the current storage node status is "Newer" or "Equal" to the given one.- Specified by:
isStorageNodeNewerOrEqualToin interfaceAdmin- See Also:
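The "Newer or Equal" comparison can be sketched with the integer status values listed for getStorageNodesStatus (DROPPED=1 through ONLINE=5). This is an illustrative sketch rather than the StorageNodeStatus class: the class name and map-based snapshots are hypothetical, and skipping replicas that are absent from the old snapshot is an assumption, since the description does not say how they are handled.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the replica-by-replica status comparison.
final class StorageNodeStatusSketch {
  // Integer values of the Helix states as listed in the documentation.
  static final int DROPPED = 1;
  static final int ERROR = 2;
  static final int OFFLINE = 3;
  static final int BOOTSTRAP = 4;
  static final int ONLINE = 5;

  /** Both maps are replica name -> status value (assumed snapshot shape). */
  static boolean isNewerOrEqual(Map<String, Integer> current, Map<String, Integer> old) {
    for (Map.Entry<String, Integer> entry: current.entrySet()) {
      Integer oldValue = old.get(entry.getKey());
      // Assumption: a replica missing from the old snapshot cannot regress, so it is skipped.
      if (oldValue != null && entry.getValue() < oldValue) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Integer> old = new HashMap<>();
    old.put("store_v1-0", BOOTSTRAP);
    Map<String, Integer> current = new HashMap<>();
    current.put("store_v1-0", ONLINE);
    System.out.println(isNewerOrEqual(current, old));
  }
}
```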
-
setAdminConsumerService
- Specified by:
setAdminConsumerServicein interfaceAdmin- See Also:
-
skipAdminMessage
Description copied from interface:AdminThe admin consumption task tries to deal with failures to process an admin message by retrying. If there is a message that cannot be processed for some reason, we will need to forcibly skip that message in order to unblock the task from consuming subsequent messages.- Specified by:
skipAdminMessagein interfaceAdminskipDIV- tries to skip only the DIV check for the blocking message.- See Also:
-
getLastSucceedExecutionId
Description copied from interface:AdminGet the id of the last succeeded execution in this controller.- Specified by:
getLastSucceedExecutionIdin interfaceAdmin- See Also:
-
getLastSucceededExecutionId
Get last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the cluster- Parameters:
clusterName-storeName-- Returns:
- the last succeeded execution id or null if the cluster/store is invalid or the admin consumer service for the given cluster is not up and running yet.
-
getAdminCommandExecutionTracker
Description copied from interface:AdminGet the tracker used to track the execution of the admin command for the given cluster.- Specified by:
getAdminCommandExecutionTrackerin interfaceAdmin- See Also:
-
getAdminTopicMetadata
- Specified by:
getAdminTopicMetadatain interfaceAdmin- Returns:
- cluster-level execution id, offset, upstream offset, and admin operation protocol version. If store name is specified, it returns store-level execution id.
-
updateAdminTopicMetadata
public void updateAdminTopicMetadata(String clusterName, long executionId, Optional<String> storeName, Optional<Long> offset, Optional<Long> upstreamOffset) Update cluster-level execution id, offset and upstream offset. If store name is specified, it updates the store-level execution id.- Specified by:
updateAdminTopicMetadatain interfaceAdmin
-
updateAdminOperationProtocolVersion
public void updateAdminOperationProtocolVersion(String clusterName, Long adminOperationProtocolVersion) Update AdminOperationProtocolVersion in metadata- Specified by:
updateAdminOperationProtocolVersionin interfaceAdmin
-
getAdminOperationVersionFromControllers
Get the admin operation protocol versions from all controllers for specific cluster.- Specified by:
getAdminOperationVersionFromControllersin interfaceAdmin- Parameters:
clusterName- : the cluster name- Returns:
- map (controllerName: version). Example: {localhost_1234=1, localhost_1235=1}
-
getLocalAdminOperationProtocolVersion
public long getLocalAdminOperationProtocolVersion()Get the local admin operation protocol version.- Specified by:
getLocalAdminOperationProtocolVersionin interfaceAdmin
-
getRoutersClusterConfig
Description copied from interface:AdminGet the cluster level config for all routers.- Specified by:
getRoutersClusterConfigin interfaceAdmin- See Also:
-
updateRoutersClusterConfig
public void updateRoutersClusterConfig(String clusterName, Optional<Boolean> isThrottlingEnable, Optional<Boolean> isQuotaRebalancedEnable, Optional<Boolean> isMaxCapacityProtectionEnabled, Optional<Integer> expectedRouterCount) Description copied from interface:AdminUpdate the cluster level config for all routers.- Specified by:
updateRoutersClusterConfigin interfaceAdmin- See Also:
-
getAllStorePushStrategyForMigration
Unsupported operation in the child controller.- Specified by:
getAllStorePushStrategyForMigrationin interfaceAdmin
-
setStorePushStrategyForMigration
Unsupported operation in the child controller.- Specified by:
setStorePushStrategyForMigrationin interfaceAdmin
-
discoverCluster
Description copied from interface:AdminFind the cluster which the given store belongs to. Return the pair of the cluster name and the d2 service associated with that cluster.- Specified by:
discoverClusterin interfaceAdmin- See Also:
-
getRouterD2Service
Description copied from interface:AdminFind the router d2 service associated with a given cluster name.- Specified by:
getRouterD2Servicein interfaceAdmin- See Also:
-
getServerD2Service
Description copied from interface:AdminFind the server d2 service associated with a given cluster name.- Specified by:
getServerD2Servicein interfaceAdmin- See Also:
-
findAllBootstrappingVersions
Description copied from interface:AdminFind the store versions which have at least one bootstrap replica.- Specified by:
findAllBootstrappingVersionsin interfaceAdmin- See Also:
-
getVeniceWriterFactory
- Specified by:
getVeniceWriterFactoryin interfaceAdmin- Returns:
- a
VeniceWriterFactoryobject used by the Venice controller to create the venice writer.
-
getPubSubSSLProperties
- Specified by:
getPubSubSSLPropertiesin interfaceAdmin
-
getAdminConsumerService
- Specified by:
getAdminConsumerServicein interfaceAdmin
-
stopMonitorOfflinePush
-
close
public void close()Cause VeniceHelixAdmin and its associated services to stop executing. -
getHelixVeniceClusterResources
- Specified by:
getHelixVeniceClusterResourcesin interfaceAdmin- Returns:
- the aggregate resources required by controller to manage a Venice cluster.
-
getControllerName
- Specified by:
getControllerNamein interfaceAdmin
-
getStoreConfigRepo
Description copied from interface:AdminReturn a shared store config repository.- Specified by:
getStoreConfigRepoin interfaceAdmin
-
isLeaderControllerOfControllerCluster
public boolean isLeaderControllerOfControllerCluster()This function is used to detect whether the current node is the leader controller of the controller cluster. Use this function with care since it talks to Zookeeper directly every time.- Specified by:
isLeaderControllerOfControllerClusterin interfaceAdmin- Returns:
-
setStoreConfigForMigration
public void setStoreConfigForMigration(String storeName, String srcClusterName, String destClusterName) Update "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.- Parameters:
storeName- name of the store.srcClusterName- name of the source cluster.destClusterName- name of the destination cluster.
-
updateAclForStore
Description copied from interface:AdminProvision a new set of ACL for a venice store and its associated kafka topic.- Specified by:
updateAclForStorein interfaceAdmin- See Also:
-
getAclForStore
Description copied from interface:AdminFetch the current set of ACL provisioned for a venice store and its associated kafka topic.- Specified by:
getAclForStorein interfaceAdmin- Returns:
- The string representation of the accessPermissions. It will return empty string in case store is not present.
- See Also:
-
deleteAclForStore
Description copied from interface:AdminDelete the current set of ACL provisioned for a venice store and its associated kafka topic.- Specified by:
deleteAclForStorein interfaceAdmin- See Also:
-
configureActiveActiveReplication
public void configureActiveActiveReplication(String clusterName, VeniceUserStoreType storeType, Optional<String> storeName, boolean enableActiveActiveReplicationForCluster, Optional<String> regionsFilter) Description copied from interface:AdminEnable/disable active active replications for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster. If storeName is not empty, only the specified store might be updated.- Specified by:
configureActiveActiveReplicationin interfaceAdmin- See Also:
-
getClusterStores
Description copied from interface:AdminReturn all stores in a cluster.- Specified by:
getClusterStoresin interfaceAdmin- Returns:
- a list of
StoreInfoof all stores in the specified cluster.
-
getClusterStaleStores
- Specified by:
getClusterStaleStoresin interfaceAdmin
-
getStoresForCompaction
- intermediary between LogCompactionService and CompactionManager
- injects the child controller's ControllerClient into the function CompactionManager.getStoresForCompaction(String, Map)
- serves as API endpoint to query stores ready for log compaction
- Specified by:
getStoresForCompactionin interfaceAdmin- Parameters:
clusterName-- Returns:
- a list of
StoreInfoof stores in clusterName that are ready for log compaction.
-
repushStore
Triggers a repush of storeName for log compaction of the store topic.
- intermediary between LogCompactionService and CompactionManager
- serves as API endpoint to trigger scheduled & adhoc log compaction
- Specified by:
repushStorein interfaceAdmin- Parameters:
repushJobRequest-- Returns:
- Throws:
Exception
-
getCompactionManager
- Specified by:
getCompactionManagerin interfaceAdmin
-
listStorePushInfo
public Map<String,RegionPushDetails> listStorePushInfo(String clusterName, String storeName, boolean isPartitionDetailEnabled) - Specified by:
listStorePushInfoin interfaceAdmin
-
preFetchDeadStoreStats
-
getDeadStores
public List<StoreInfo> getDeadStores(String clusterName, String storeName, Map<String, String> params) - Specified by:
getDeadStoresin interfaceAdminparams- Parameters for dead store detection including: - "includeSystemStores": boolean (default: false) - "lookBackMS": long (optional) - Future extension points- Returns:
- list of store infos that are considered dead. A store is considered dead if it exists but has no user traffic in its read or write path.
- See Also:
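The params map documented for getDeadStores could be read along these lines. This is a sketch only: the two keys and the false default come from the description above, while the class name, the helper methods, and the parsing with Boolean.parseBoolean and Long.parseLong are assumptions about how a caller or implementation might interpret the string values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper for interpreting the documented dead-store detection parameters.
final class DeadStoreParamsSketch {
  /** "includeSystemStores" is documented as a boolean defaulting to false. */
  static boolean includeSystemStores(Map<String, String> params) {
    return Boolean.parseBoolean(params.getOrDefault("includeSystemStores", "false"));
  }

  /** "lookBackMS" is documented as an optional long; absent means no value. */
  static OptionalLong lookBackMs(Map<String, String> params) {
    String raw = params.get("lookBackMS");
    return raw == null ? OptionalLong.empty() : OptionalLong.of(Long.parseLong(raw));
  }

  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("includeSystemStores", "true");
    params.put("lookBackMS", "86400000");
    System.out.println(includeSystemStores(params) + " " + lookBackMs(params));
  }
}
```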
-
getRegionPushDetails
public RegionPushDetails getRegionPushDetails(String clusterName, String storeName, boolean isPartitionDetailAdded) - Specified by:
getRegionPushDetailsin interfaceAdmin- Returns:
RegionPushDetailsobject containing the specified store's push status.
-
retrievePushStatus
-
checkResourceCleanupBeforeStoreCreation
Description copied from interface:AdminCheck whether any resources are left for the store creation in the cluster. If there are any, this function should throw an Exception.- Specified by:
checkResourceCleanupBeforeStoreCreationin interfaceAdmin- See Also:
-
wipeCluster
public void wipeCluster(String clusterName, String fabric, Optional<String> storeName, Optional<Integer> versionNum) Delete stores from the cluster including both store data and metadata. The API provides the flexibility to delete a single store or a single version. Cluster name and fabric are required parameters, but store name and version number are optional. If store name is empty, all stores in the cluster are deleted.
- Specified by:
wipeClusterin interfaceAdmin- Parameters:
clusterName- name of the Venice cluster.fabric- name of the fabric.storeName- name of the store to be deleted; if the value is absent, all stores in the cluster are deleted.versionNum- the number of the version to be deleted; if present, only the specified version is deleted.
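How the two optional wipeCluster parameters narrow the deletion scope can be sketched as below. The enum and class names are hypothetical, and rejecting a version number given without a store name is an assumption; the documentation does not describe that combination.

```java
import java.util.Optional;

// Hypothetical sketch of resolving the wipe scope from the optional parameters.
final class WipeScopeSketch {
  enum Scope {
    ALL_STORES, SINGLE_STORE, SINGLE_VERSION
  }

  static Scope resolve(Optional<String> storeName, Optional<Integer> versionNum) {
    if (!storeName.isPresent()) {
      // Assumption: a version number is meaningless without a store name.
      if (versionNum.isPresent()) {
        throw new IllegalArgumentException("versionNum requires storeName");
      }
      return Scope.ALL_STORES; // documented: an absent store name deletes all stores
    }
    // Documented: a present version number limits deletion to that version.
    return versionNum.isPresent() ? Scope.SINGLE_VERSION : Scope.SINGLE_STORE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.of("my_store"), Optional.of(3)));
  }
}
```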
-
compareStore
public StoreComparisonInfo compareStore(String clusterName, String storeName, String fabricA, String fabricB) Description copied from interface:AdminCompare store metadata and version states between two fabrics.- Specified by:
compareStorein interfaceAdmin- See Also:
-
copyOverStoreSchemasAndConfigs
public StoreInfo copyOverStoreSchemasAndConfigs(String clusterName, String srcFabric, String destFabric, String storeName) - Specified by:
copyOverStoreSchemasAndConfigsin interfaceAdmin- See Also:
-
isParent
public boolean isParent()Description copied from interface:AdminCheck whether the controller works as a parent controller -
getParentControllerRegionState
Description copied from interface:AdminReturn the state of the region of the parent controller.- Specified by:
getParentControllerRegionStatein interfaceAdmin- Returns:
ParentControllerRegionState.ACTIVEwhich means that the parent controller in the region is serving requests. Otherwise, returnParentControllerRegionState.PASSIVE- See Also:
-
getChildDataCenterControllerUrlMap
Description copied from interface:AdminGet child datacenter to child controller url mapping.- Specified by:
getChildDataCenterControllerUrlMapin interfaceAdmin- Returns:
- A map of child datacenter -> child controller url
- See Also:
-
getChildDataCenterControllerD2Map
Description copied from interface:AdminGet child datacenter to child controller d2 zk host mapping- Specified by:
getChildDataCenterControllerD2Mapin interfaceAdmin- Returns:
- A map of child datacenter -> child controller d2 zk host
- See Also:
-
getChildControllerD2ServiceName
Description copied from interface:AdminGet child datacenter controller d2 service name- Specified by:
getChildControllerD2ServiceNamein interfaceAdmin- Returns:
- d2 service name
- See Also:
-
getMetaStoreWriter
Description copied from interface:AdminReturn aMetaStoreWriter, which can be shared across different Venice clusters.- Specified by:
getMetaStoreWriterin interfaceAdmin- See Also:
-
getMetaStoreReader
- Specified by:
getMetaStoreReaderin interfaceAdmin
-
getEmergencySourceRegion
Description copied from interface:AdminReturn the emergency source region configuration.- Specified by:
getEmergencySourceRegionin interfaceAdmin- See Also:
-
getAggregateRealTimeTopicSource
Description copied from interface:AdminReturn the source Kafka bootstrap server url for aggregate real-time topic updates- Specified by:
getAggregateRealTimeTopicSourcein interfaceAdmin- See Also:
-
isActiveActiveReplicationEnabledInAllRegion
public boolean isActiveActiveReplicationEnabledInAllRegion(String clusterName, String storeName, boolean checkCurrentVersion) Description copied from interface:AdminReturns true if A/A replication is enabled in all child controllers and the parent controller. This is implemented only in the parent controller. Otherwise, return false.- Specified by:
isActiveActiveReplicationEnabledInAllRegionin interfaceAdmin- See Also:
-
getClustersLeaderOf
Description copied from interface:AdminGet a list of clusters this controller is a leader of.- Specified by:
getClustersLeaderOfin interfaceAdmin- Returns:
- a list of clusters this controller is a leader of.
- See Also:
-
getBackupVersionDefaultRetentionMs
public long getBackupVersionDefaultRetentionMs()Description copied from interface:AdminReturns default backup version retention time.- Specified by:
getBackupVersionDefaultRetentionMsin interfaceAdmin- See Also:
-
getDefaultMaxRecordSizeBytes
public int getDefaultMaxRecordSizeBytes()- Specified by:
getDefaultMaxRecordSizeBytesin interfaceAdmin- Returns:
- The default value of
VeniceWriter.maxRecordSizeByteswhich is provided to the VPJ and Consumer as a controller config to dynamically control the setting per cluster. - See Also:
-
nodeReplicaReadiness
public Pair<NodeReplicasReadinessState,List<Replica>> nodeReplicaReadiness(String cluster, String helixNodeId) Description copied from interface:AdminhelixNodeId nodeId of helix participant. HOST_PORT. Returns true if all current version replicas of the input node are ready to serve; otherwise returns false and all unready replicas.- Specified by:
nodeReplicaReadinessin interfaceAdmin- See Also:
-
initiateDataRecovery
public void initiateDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, boolean copyAllVersionConfigs, Optional<Version> sourceFabricVersion) Description copied from interface:AdminInitiate data recovery for a store version given a source fabric.- Specified by:
initiateDataRecoveryin interfaceAdmin- Parameters:
clusterName- of the store.storeName- of the store.version- of the store.sourceFabric- to be used as the source for data recovery.copyAllVersionConfigs- a boolean to indicate whether all version configs should be copied from the source fabric or only the essential version configs and generate the rest based on destination fabric's Store configs.sourceFabricVersion- source fabric's Version configs used to configure the recovering version in the destination fabric.- See Also:
-
prepareDataRecovery
public void prepareDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Description copied from interface:AdminPrepare for data recovery in the destination fabric. The interested store version might have lingering states and resources in the destination fabric from previous failed attempts. Perform some basic checks to make sure the store version in the destination fabric is capable of performing data recovery and cleanup any lingering states and resources.- Specified by:
prepareDataRecoveryin interfaceAdmin- See Also:
-
isStoreVersionReadyForDataRecovery
public Pair<Boolean,String> isStoreVersionReadyForDataRecovery(String clusterName, String storeName, int version, String sourceFabric, String destinationFabric, Optional<Integer> sourceAmplificationFactor) Description copied from interface:AdminCheck if the store version's previous states and resources are cleaned up and ready to start data recovery.- Specified by:
isStoreVersionReadyForDataRecoveryin interfaceAdmin- Returns:
- whether it is ready to start data recovery and the reason if it's not ready.
- See Also:
-
isAdminTopicConsumptionEnabled
Description copied from interface:AdminReturn whether the admin consumption task is enabled for the passed cluster.- Specified by:
isAdminTopicConsumptionEnabledin interfaceAdmin- See Also:
-
getLargestUsedVersionFromStoreGraveyard
Description copied from interface:AdminDeprecated but remains here to keep compatibility until Admin.getLargestUsedVersion(String, String) is used.- Specified by:
getLargestUsedVersionFromStoreGraveyardin interfaceAdmin
-
getLargestUsedVersion
- Specified by:
getLargestUsedVersionin interfaceAdmin
-
createStoragePersona
public void createStoragePersona(String clusterName, String name, long quotaNumber, Set<String> storesToEnforce, Set<String> owners) - Specified by:
createStoragePersonain interfaceAdmin- See Also:
-
getStoragePersona
- Specified by:
getStoragePersonain interfaceAdmin- See Also:
-
deleteStoragePersona
- Specified by:
deleteStoragePersonain interfaceAdmin- See Also:
-
updateStoragePersona
public void updateStoragePersona(String clusterName, String name, UpdateStoragePersonaQueryParams queryParams) - Specified by:
updateStoragePersonain interfaceAdmin- See Also:
-
getPersonaAssociatedWithStore
- Specified by:
getPersonaAssociatedWithStorein interfaceAdmin- See Also:
-
getClusterStoragePersonas
- Specified by:
getClusterStoragePersonasin interfaceAdmin
-
cleanupInstanceCustomizedStates
Description copied from interface:AdminScan through instance level customized states and remove any lingering ZNodes that are no longer relevant. This operation shouldn't be needed under normal circumstances. It's intended to cleanup ZNodes that failed to be deleted due to bugs and errors.- Specified by:
cleanupInstanceCustomizedStatesin interfaceAdmin- Parameters:
clusterName- to perform the cleanup.- Returns:
- list of deleted ZNode paths.
-
getStoreGraveyard
- Specified by:
getStoreGraveyardin interfaceAdmin
-
removeStoreFromGraveyard
- Specified by:
removeStoreFromGraveyardin interfaceAdmin
-
getPushStatusStoreReader
- Specified by:
getPushStatusStoreReaderin interfaceAdmin
-
getPushStatusStoreWriter
- Specified by:
getPushStatusStoreWriterin interfaceAdmin
-
sendHeartbeatToSystemStore
public void sendHeartbeatToSystemStore(String clusterName, String storeName, long heartbeatTimeStamp) Description copied from interface:AdminSend a heartbeat timestamp to targeted system store.- Specified by:
sendHeartbeatToSystemStorein interfaceAdmin
-
getHeartbeatFromSystemStore
Description copied from interface:AdminRead the latest heartbeat timestamp from the system store. If it fails to read from the system store, this method should return -1.- Specified by:
getHeartbeatFromSystemStorein interfaceAdmin
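A caller of getHeartbeatFromSystemStore has to treat the documented -1 return value as a failed read rather than as a timestamp. The sketch below shows one way to do that; the class name, the maxAgeMs threshold, and the freshness rule are hypothetical and not part of the Admin interface.

```java
// Hypothetical caller-side check built around the documented -1 failure value.
final class HeartbeatSketch {
  static final long READ_FAILED = -1L; // documented return value when the read fails

  /** maxAgeMs is an illustrative threshold chosen by the caller. */
  static boolean isHealthy(long heartbeatTimestampMs, long nowMs, long maxAgeMs) {
    if (heartbeatTimestampMs == READ_FAILED) {
      return false; // the read failed, so there is no usable timestamp
    }
    return nowMs - heartbeatTimestampMs <= maxAgeMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    System.out.println(isHealthy(READ_FAILED, now, 60_000L));
  }
}
```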
-
autoMigrateStore
public void autoMigrateStore(String srcClusterName, String destClusterName, String storeName, Optional<Integer> currStep, Optional<Boolean> abortOnFailure) - Specified by:
autoMigrateStorein interfaceAdmin
-
getSslFactory
-
isClusterWipeAllowed
-
getControllerConfig
- Specified by:
getControllerConfigin interfaceAdmin
-
setPushJobDetailsStoreClient
public void setPushJobDetailsStoreClient(AvroSpecificStoreClient<PushJobStatusRecordKey, PushJobDetails> client) -
getPubSubTopicRepository
- Specified by:
getPubSubTopicRepositoryin interfaceAdmin
-
getLogContext
- Specified by:
getLogContextin interfaceAdmin
-