Class VeniceHelixAdmin

  • All Implemented Interfaces:
    Admin, StoreCleaner, java.io.Closeable, java.lang.AutoCloseable

    public class VeniceHelixAdmin
    extends java.lang.Object
    implements Admin, StoreCleaner
    Helix Admin based on 0.8.4.215 APIs.

    When the controller runs in service mode, there are two levels of clusters and controllers. Each Venice controller holds a level-1 Helix controller that stays connected to Helix; a dedicated cluster (the controllers' cluster) exists only for these level-1 controllers. The second level consists of our Venice clusters, such as the prod cluster and the dev cluster. Each of these clusters is a Helix resource in the controllers' cluster. Helix chooses one of the level-1 controllers to become the leader of a Venice cluster. In our distributed controller state transition handler, a level-2 controller is then initialized to manage that Venice cluster only. If a level-1 controller is chosen as the leader controller of multiple Venice clusters, multiple level-2 controllers are created, each based on cluster-specific config.

    Admin is shared by the controllers of multiple clusters running in one physical Venice controller instance.
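
    The two-level arrangement described above can be pictured with a small toy model. This is only a sketch under stated assumptions: the types TwoLevelControllerSketch, Level1Controller and Level2Controller and their methods are hypothetical names invented for illustration, and none of them is part of Venice or Helix. The sketch shows one level-1 controller creating a level-2 controller for each Venice cluster it is chosen to lead, and dropping it when leadership moves away.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model only; every type and method here is hypothetical, not a Venice or Helix API. */
class TwoLevelControllerSketch {
  /** Hypothetical stand-in for a level-2 controller that manages exactly one Venice cluster. */
  static final class Level2Controller {
    final String clusterName;

    Level2Controller(String clusterName) {
      this.clusterName = clusterName;
    }
  }

  /** Hypothetical stand-in for the level-1 controller held by one Venice controller instance. */
  static final class Level1Controller {
    final String instanceId;
    // One level-2 controller per Venice cluster this instance currently leads.
    private final Map<String, Level2Controller> level2ByCluster = new ConcurrentHashMap<>();

    Level1Controller(String instanceId) {
      this.instanceId = instanceId;
    }

    /** Models the state transition in which this instance becomes leader of a cluster. */
    void onBecomeLeader(String clusterName) {
      level2ByCluster.computeIfAbsent(clusterName, Level2Controller::new);
    }

    /** Models the state transition in which leadership of a cluster moves elsewhere. */
    void onLoseLeadership(String clusterName) {
      level2ByCluster.remove(clusterName);
    }

    boolean isLeaderFor(String clusterName) {
      return level2ByCluster.containsKey(clusterName);
    }

    /** Returns the led clusters in sorted order so the output is deterministic. */
    List<String> clustersLeaderOf() {
      List<String> names = new ArrayList<>(level2ByCluster.keySet());
      Collections.sort(names);
      return names;
    }
  }

  public static void main(String[] args) {
    Level1Controller controller = new Level1Controller("controller-host-1");
    controller.onBecomeLeader("prod");
    controller.onBecomeLeader("dev");
    System.out.println(controller.clustersLeaderOf()); // prints [dev, prod]
    controller.onLoseLeadership("dev");
    System.out.println(controller.isLeaderFor("dev")); // prints false
  }
}
```

    In this toy model, a single shared object keyed by cluster name plays the role that the shared Admin plays for the per-cluster controllers; the real class exposes comparable queries through getClustersLeaderOf() and isLeaderControllerFor(String), whose actual implementations are not shown here.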

    • Method Summary

      Modifier and Type Method Description
      void abortMigration​(java.lang.String srcClusterName, java.lang.String destClusterName, java.lang.String storeName)
      Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.
      DerivedSchemaEntry addDerivedSchema​(java.lang.String clusterName, java.lang.String storeName, int valueSchemaId, int derivedSchemaId, java.lang.String derivedSchemaStr)
      Add a new derived schema for the given store with all specified properties.
      DerivedSchemaEntry addDerivedSchema​(java.lang.String clusterName, java.lang.String storeName, int valueSchemaId, java.lang.String derivedSchemaStr)
      Add a new derived schema for the given store with all specified properties and return a new DerivedSchemaEntry object containing the schema and its id.
      void addInstanceToAllowlist​(java.lang.String clusterName, java.lang.String helixNodeId)
      Add the given helix nodeId into the allowlist in ZK.
      RmdSchemaEntry addReplicationMetadataSchema​(java.lang.String clusterName, java.lang.String storeName, int valueSchemaId, int replicationMetadataVersionId, java.lang.String replicationMetadataSchemaStr)
      Create a new ReplicationMetadataSchemaEntry object with the given properties and add it to the schema repository if it is not a duplicate.
      boolean addSpecificVersion​(java.lang.String clusterName, java.lang.String storeName, Version version)
      TODO refactor addVersion to these broken down methods instead of doing everything in one giant method.
      SchemaEntry addSupersetSchema​(java.lang.String clusterName, java.lang.String storeName, java.lang.String valueSchema, int valueSchemaId, java.lang.String supersetSchemaStr, int supersetSchemaId)
      Add a new superset schema for the given store with all specified properties.
      SchemaEntry addValueSchema​(java.lang.String clusterName, java.lang.String storeName, java.lang.String valueSchemaStr, int schemaId, DirectionalSchemaCompatibilityType compatibilityType)
      Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.
      SchemaEntry addValueSchema​(java.lang.String clusterName, java.lang.String storeName, java.lang.String valueSchemaStr, DirectionalSchemaCompatibilityType expectedCompatibilityType)  
      void addVersionAndStartIngestion​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, java.lang.String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, boolean versionSwapDeferred, int repushSourceVersion)
      A wrapper for VeniceHelixAdmin#addVersion that performs the additional operations needed when add version is invoked from the admin channel.
      Pair<java.lang.Boolean,​Version> addVersionAndTopicOnly​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, java.lang.String compressionDictionary, java.lang.String remoteKafkaBootstrapServers, java.util.Optional<java.lang.String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, java.util.Optional<java.lang.String> emergencySourceRegion, boolean versionSwapDeferred)  
      Pair<java.lang.Boolean,​Version> addVersionAndTopicOnly​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int versionNumber, int numberOfPartitions, int replicationFactor, boolean sendStartOfPush, boolean sorted, Version.PushType pushType, java.lang.String compressionDictionary, java.lang.String remoteKafkaBootstrapServers, java.util.Optional<java.lang.String> sourceGridFabric, long rewindTimeInSecondsOverride, int replicationMetadataVersionId, java.util.Optional<java.lang.String> emergencySourceRegion, boolean versionSwapDeferred, java.lang.String targetedRegions, int repushSourceVersion)
      A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion.
      Version addVersionOnly​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, java.lang.String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId)
      Only add the version to the store, without creating the topic or starting ingestion.
      int calculateNumberOfPartitions​(java.lang.String clusterName, java.lang.String storeName)
      Calculate the number of partitions for the given store.
      protected void checkPreConditionForCreateStore​(java.lang.String clusterName, java.lang.String storeName, java.lang.String keySchema, java.lang.String valueSchema, boolean allowSystemStore, boolean skipLingeringResourceCheck)
      Check whether Controller should block the incoming store creation.
      void checkResourceCleanupBeforeStoreCreation​(java.lang.String clusterName, java.lang.String storeName)
      Check whether any resources are left over for the store being created in the cluster clusterName. If there are any, this function should throw an exception.
      java.util.List<java.lang.String> cleanupInstanceCustomizedStates​(java.lang.String clusterName)
      Scan through instance level customized states and remove any lingering ZNodes that are no longer relevant.
      void clearIngestionKillMessageAndVerify​(java.lang.String clusterName, java.lang.String versionTopicName)
      Clear KILL messages from a participant system store.
      void clearInstanceMonitor​(java.lang.String clusterName)  
      void close()
      Cause VeniceHelixAdmin and its associated services to stop executing.
      StoreComparisonInfo compareStore​(java.lang.String clusterName, java.lang.String storeName, java.lang.String fabricA, java.lang.String fabricB)
      Compare store metadata and version states between two fabrics.
      void completeMigration​(java.lang.String srcClusterName, java.lang.String destClusterName, java.lang.String storeName)  
      void configureActiveActiveReplication​(java.lang.String clusterName, VeniceUserStoreType storeType, java.util.Optional<java.lang.String> storeName, boolean enableActiveActiveReplicationForCluster, java.util.Optional<java.lang.String> regionsFilter)
      Enable or disable active-active replication for certain stores (batch only, hybrid only, incremental push, hybrid or incremental push, all) in a cluster.
      boolean containsHelixResource​(java.lang.String clusterName, java.lang.String kafkaTopic)
      The purpose of this function is to check whether the given resource exists in the Helix cluster.
      StoreInfo copyOverStoreSchemasAndConfigs​(java.lang.String clusterName, java.lang.String srcFabric, java.lang.String destFabric, java.lang.String storeName)  
      void createHelixResourceAndStartMonitoring​(java.lang.String clusterName, java.lang.String storeName, Version version)
      Create Helix resources for a given storage node cluster and start monitoring a new push.
      void createSpecificVersionTopic​(java.lang.String clusterName, java.lang.String storeName, Version version)
      Create the corresponding version topic based on the provided Version
      void createStoragePersona​(java.lang.String clusterName, java.lang.String name, long quotaNumber, java.util.Set<java.lang.String> storesToEnforce, java.util.Set<java.lang.String> owners)  
      void createStore​(java.lang.String clusterName, java.lang.String storeName, java.lang.String owner, java.lang.String keySchema, java.lang.String valueSchema, boolean isSystemStore, java.util.Optional<java.lang.String> accessPermissions)
      Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.
      void deleteAclForStore​(java.lang.String clusterName, java.lang.String storeName)
      Delete the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
      java.util.List<Version> deleteAllVersionsInStore​(java.lang.String clusterName, java.lang.String storeName)
      Delete all Venice versions in the given store (including the Venice resources, Kafka topics, offline pushes, and all related resources).
      void deleteHelixResource​(java.lang.String clusterName, java.lang.String kafkaTopic)
      The purpose of this function is to delete the given resource from the Helix cluster.
      void deleteOldVersionInStore​(java.lang.String clusterName, java.lang.String storeName, int versionNum)
      Delete the given version from the store.
      void deleteOneStoreVersion​(java.lang.String clusterName, java.lang.String storeName, int versionNumber)
      Delete a version from the cluster, removing all related resources.
      void deleteParticipantStoreKillMessage​(java.lang.String clusterName, java.lang.String kafkaTopic)
      Compose a ParticipantMessageKey message and execute a delete operation on the key to the cluster's participant store.
      void deleteStoragePersona​(java.lang.String clusterName, java.lang.String name)  
      void deleteStore​(java.lang.String clusterName, java.lang.String storeName, int largestUsedVersionNumber, boolean waitOnRTTopicDeletion)
      This method will delete store data, metadata, version and rt topics. One exception is for stores with the isMigrating flag set.
      void deleteValueSchemas​(java.lang.String clusterName, java.lang.String storeName, java.util.Set<java.lang.Integer> unusedValueSchemaIds)
      Deletes a store's value schemas with ids `except` the ids passed in the argument inuseValueSchemaIds.
      Pair<java.lang.String,​java.lang.String> discoverCluster​(java.lang.String storeName)
      Find the cluster which the given store belongs to.
      void enableDisabledPartition​(java.lang.String clusterName, java.lang.String kafkaTopic, boolean enableAll)  
      java.util.Map<java.lang.String,​java.lang.String> findAllBootstrappingVersions​(java.lang.String clusterName)
      Find the store versions which have at least one bootstrap replica.
      java.lang.String getAclForStore​(java.lang.String clusterName, java.lang.String storeName)
      Fetch the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
      HelixAdapterSerializer getAdapterSerializer()  
      java.util.Optional<AdminCommandExecutionTracker> getAdminCommandExecutionTracker​(java.lang.String clusterName)
      Get the tracker used to track the execution of the admin command for the given cluster.
      java.util.Map<java.lang.String,​java.lang.Long> getAdminTopicMetadata​(java.lang.String clusterName, java.util.Optional<java.lang.String> storeName)  
      java.util.Optional<java.lang.String> getAggregateRealTimeTopicSource​(java.lang.String clusterName)
      Return the source Kafka bootstrap server URL for aggregate real-time topic updates.
      java.util.Set<java.lang.String> getAllowlist​(java.lang.String clusterName)  
      java.util.Map<java.lang.String,​java.lang.String> getAllStorePushStrategyForMigration()
      Unsupported operation in the child controller.
      java.util.List<Store> getAllStores​(java.lang.String clusterName)  
      java.util.Map<java.lang.String,​java.lang.String> getAllStoreStatuses​(java.lang.String clusterName)
      Get the statuses of all stores.
      int getBackupVersion​(java.lang.String clusterName, java.lang.String storeName)  
      long getBackupVersionDefaultRetentionMs()
      Returns default backup version retention time.
      int getBackupVersionNumber​(java.util.List<Version> versions, int currentVersion)
      Get the backup version number, that is, the largest online version number that is less than the current version number.
      java.util.Map<java.lang.String,​java.lang.String> getBackupVersionsForMultiColos​(java.lang.String clusterName, java.lang.String storeName)  
      BatchJobHeartbeatValue getBatchJobHeartbeatValue​(BatchJobHeartbeatKey batchJobHeartbeatKey)  
      java.lang.String getChildControllerD2ServiceName​(java.lang.String clusterName)
      Get child datacenter controller d2 service name
      java.util.Map<java.lang.String,​java.lang.String> getChildDataCenterControllerD2Map​(java.lang.String clusterName)
      Get child datacenter to child controller d2 zk host mapping
      java.util.Map<java.lang.String,​java.lang.String> getChildDataCenterControllerUrlMap​(java.lang.String clusterName)
      Get child datacenter to child controller url mapping.
      java.util.List<java.lang.String> getClustersLeaderOf()
      Get a list of clusters this controller is a leader of.
      java.util.Map<java.lang.String,​StoreDataAudit> getClusterStaleStores​(java.lang.String clusterName)  
      java.util.List<StoragePersona> getClusterStoragePersonas​(java.lang.String clusterName)  
      java.util.ArrayList<StoreInfo> getClusterStores​(java.lang.String clusterName)
      Return all stores in a cluster.
      java.util.Map<java.lang.String,​ControllerClient> getControllerClientMap​(java.lang.String clusterName)  
      int getCurrentVersion​(java.lang.String clusterName, java.lang.String storeName)  
      java.util.Map<java.lang.String,​java.lang.Integer> getCurrentVersionsForMultiColos​(java.lang.String clusterName, java.lang.String storeName)  
      int getDefaultMaxRecordSizeBytes()  
      GeneratedSchemaID getDerivedSchemaId​(java.lang.String clusterName, java.lang.String storeName, java.lang.String schemaStr)  
      java.util.Collection<DerivedSchemaEntry> getDerivedSchemas​(java.lang.String clusterName, java.lang.String storeName)  
      DisabledPartitionStats getDisabledPartitionStats​(java.lang.String clusterName)  
      java.util.Optional<java.lang.String> getEmergencySourceRegion​(java.lang.String clusterName)
      Return the emergency source region configuration.
      ExecutionIdAccessor getExecutionIdAccessor()  
      int getFutureVersion​(java.lang.String clusterName, java.lang.String storeName)  
      java.util.Map<java.lang.String,​java.lang.String> getFutureVersionsForMultiColos​(java.lang.String clusterName, java.lang.String storeName)  
      long getHeartbeatFromSystemStore​(java.lang.String clusterName, java.lang.String systemStoreName)
      Read the latest heartbeat timestamp from system store.
      protected org.apache.helix.HelixAdmin getHelixAdmin()  
      HelixAdminClient getHelixAdminClient()  
      HelixVeniceClusterResources getHelixVeniceClusterResources​(java.lang.String cluster)  
      Version getIncrementalPushVersion​(java.lang.String clusterName, java.lang.String storeName)
      Right now, it will return the latest version recorded in the parent controller.
      java.util.Set<java.lang.Integer> getInUseValueSchemaIds​(java.lang.String clusterName, java.lang.String storeName)  
      java.lang.String getKafkaBootstrapServers​(boolean isSSL)
      Return the ssl or non-ssl bootstrap servers based on the given flag.
      SchemaEntry getKeySchema​(java.lang.String clusterName, java.lang.String storeName)  
      int getLargestUsedVersionFromStoreGraveyard​(java.lang.String clusterName, java.lang.String storeName)  
      java.lang.Long getLastSucceededExecutionId​(java.lang.String clusterName, java.lang.String storeName)
      Get the last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the cluster.
      java.lang.Long getLastSucceedExecutionId​(java.lang.String clusterName)
      Get the id of the last succeeded execution in this controller.
      Instance getLeaderController​(java.lang.String clusterName)
      Get instance of leader controller.
      LiveInstanceMonitor getLiveInstanceMonitor​(java.lang.String clusterName)  
      MetaStoreReader getMetaStoreReader()  
      StoreMetaValue getMetaStoreValue​(StoreMetaKey metaKey, java.lang.String storeName)  
      MetaStoreWriter getMetaStoreWriter()
      Return a MetaStoreWriter, which can be shared across different Venice clusters.
      int getMinNumberOfUnusedKafkaTopicsToPreserve()  
      java.lang.String getNativeReplicationKafkaBootstrapServerAddress​(java.lang.String sourceFabric)  
      java.lang.String getNativeReplicationSourceFabric​(java.lang.String clusterName, Store store, java.util.Optional<java.lang.String> sourceGridFabric, java.util.Optional<java.lang.String> emergencySourceRegion, java.lang.String targetedRegions)
      Source fabric selection priority: 1.
      Admin.OfflinePushStatusInfo getOffLinePushStatus​(java.lang.String clusterName, java.lang.String kafkaTopic)
      Query the status of the offline push for the given Kafka topic.
      Admin.OfflinePushStatusInfo getOffLinePushStatus​(java.lang.String clusterName, java.lang.String kafkaTopic, java.util.Optional<java.lang.String> incrementalPushVersion, java.lang.String region, java.lang.String targetedRegions)  
      int getOnlineFutureVersion​(java.lang.String clusterName, java.lang.String storeName)  
      protected static ExecutionStatus getOverallPushStatus​(ExecutionStatus veniceStatus, ExecutionStatus daVinciStatus)  
      ParentControllerRegionState getParentControllerRegionState()
      Return the state of the region of the parent controller.
      StoragePersona getPersonaAssociatedWithStore​(java.lang.String clusterName, java.lang.String storeName)  
      PubSubConsumerAdapterFactory getPubSubConsumerAdapterFactory()  
      VeniceProperties getPubSubSSLProperties​(java.lang.String pubSubBrokerAddress)  
      PubSubTopicRepository getPubSubTopicRepository()  
      PushJobDetails getPushJobDetails​(PushJobStatusRecordKey key)  
      PushStatusStoreReader getPushStatusStoreReader()  
      PushStatusStoreWriter getPushStatusStoreWriter()  
      HelixReadOnlyZKSharedSchemaRepository getReadOnlyZKSharedSchemaRepository()
      Return a shared read only schema repository for zk shared stores.
      HelixReadOnlyZKSharedSystemStoreRepository getReadOnlyZKSharedSystemStoreRepository()
      Return a shared read only store repository for zk shared stores.
      java.lang.String getRealTimeTopic​(java.lang.String clusterName, java.lang.String storeName)
      Get the real time topic name for a given store.
      java.lang.String getRegionName()
      Return the region name of this Admin
      RegionPushDetails getRegionPushDetails​(java.lang.String clusterName, java.lang.String storeName, boolean isPartitionDetailAdded)  
      java.util.List<Replica> getReplicas​(java.lang.String clusterName, java.lang.String kafkaTopic)  
      java.util.List<Replica> getReplicasOfStorageNode​(java.lang.String cluster, java.lang.String instanceId)  
      int getReplicationFactor​(java.lang.String clusterName, java.lang.String storeName)  
      java.util.Optional<org.apache.avro.Schema> getReplicationMetadataSchema​(java.lang.String clusterName, java.lang.String storeName, int valueSchemaID, int rmdVersionID)  
      java.util.Collection<RmdSchemaEntry> getReplicationMetadataSchemas​(java.lang.String clusterName, java.lang.String storeName)  
      RepushInfo getRepushInfo​(java.lang.String clusterName, java.lang.String storeName, java.util.Optional<java.lang.String> fabricName)  
      RoutersClusterConfig getRoutersClusterConfig​(java.lang.String clusterName)
      Get the cluster level config for all routers.
      java.lang.String getServerD2Service​(java.lang.String clusterName)
      Find the server d2 service associated with a given cluster name.
      java.util.Optional<SSLFactory> getSslFactory()  
      protected static java.util.Optional<Version> getStartedVersion​(Store store)
      The intended semantic is to use this method to find the version that something is currently pushing to.
      double getStorageEngineOverheadRatio​(java.lang.String clusterName)  
      java.util.List<java.lang.String> getStorageNodes​(java.lang.String clusterName)  
      java.util.Map<java.lang.String,​java.lang.String> getStorageNodesStatus​(java.lang.String clusterName, boolean enableReplica)  
      StorageNodeStatus getStorageNodesStatus​(java.lang.String clusterName, java.lang.String instanceId)
      Query and return the current status of the given storage node.
      StoragePersona getStoragePersona​(java.lang.String clusterName, java.lang.String name)  
      Store getStore​(java.lang.String clusterName, java.lang.String storeName)  
      HelixReadOnlyStoreConfigRepository getStoreConfigRepo()
      Return a shared store config repository.
      StoreGraveyard getStoreGraveyard()  
      TopicManager getTopicManager()  
      TopicManager getTopicManager​(java.lang.String pubSubServerAddress)  
      SchemaEntry getValueSchema​(java.lang.String clusterName, java.lang.String storeName, int id)  
      int getValueSchemaId​(java.lang.String clusterName, java.lang.String storeName, java.lang.String valueSchemaStr)  
      java.util.Collection<SchemaEntry> getValueSchemas​(java.lang.String clusterName, java.lang.String storeName)  
      VeniceWriterFactory getVeniceWriterFactory()  
      org.apache.helix.zookeeper.impl.client.ZkClient getZkClient()  
      boolean hasStore​(java.lang.String clusterName, java.lang.String storeName)
      Test if the input store exists in a cluster.
      Version incrementVersionIdempotent​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int numberOfPartitions, int replicationFactor, Version.PushType pushType, boolean sendStartOfPush, boolean sorted, java.lang.String compressionDictionary, java.util.Optional<java.lang.String> sourceGridFabric, java.util.Optional<java.security.cert.X509Certificate> requesterCert, long rewindTimeInSecondsOverride, java.util.Optional<java.lang.String> emergencySourceRegion, boolean versionSwapDeferred, java.lang.String targetedRegions, int repushSourceVersion)
      Note: this currently uses the pushID to guarantee idempotence; unexpected behavior may result if multiple batch jobs push to the same store at the same time.
      void initiateDataRecovery​(java.lang.String clusterName, java.lang.String storeName, int version, java.lang.String sourceFabric, java.lang.String destinationFabric, boolean copyAllVersionConfigs, java.util.Optional<Version> sourceFabricVersion)
      Initiate data recovery for a store version given a source fabric.
      void initStorageCluster​(java.lang.String clusterName)
      Create and configure the Venice storage cluster with the required properties in Helix and wait for the resource's (partial) partitions to appear in the external view.
      boolean isActiveActiveReplicationEnabledInAllRegion​(java.lang.String clusterName, java.lang.String storeName, boolean checkCurrentVersion)
      Returns true if A/A replication is enabled in all child controllers and the parent controller.
      boolean isAdminTopicConsumptionEnabled​(java.lang.String clusterName)
      Return whether the admin consumption task is enabled for the passed cluster.
      boolean isClusterValid​(java.lang.String clusterName)
      Test if a cluster is valid (in Helix cluster list).
      boolean isClusterWipeAllowed​(java.lang.String clusterName)  
      NodeRemovableResult isInstanceRemovable​(java.lang.String clusterName, java.lang.String helixNodeId, java.util.List<java.lang.String> lockedNodes, boolean isFromInstanceView)
      Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, determine whether the given instance can be removed from the given cluster.
      boolean isLeaderControllerFor​(java.lang.String clusterName)
      Check if this controller itself is the leader controller for a given cluster or not.
      boolean isLeaderControllerOfControllerCluster()
      This function is used to detect whether the current node is the leader controller of the controller cluster.
      boolean isParent()
      Check whether the controller works as a parent controller
      boolean isResourceStillAlive​(java.lang.String resourceName)
      Test if a given helix resource is still alive (existent in ZK).
      boolean isSSLEnabledForPush​(java.lang.String clusterName, java.lang.String storeName)
      Return whether ssl is enabled for the given store for push.
      boolean isSslToKafka()
      Test if ssl is enabled to Kafka.
      boolean isStorageNodeNewerOrEqualTo​(java.lang.String clusterName, java.lang.String instanceId, StorageNodeStatus oldStatus)
      Compare the current storage node status with the given storage node status to check whether the current one is "Newer" than or "Equal" to the given one.
      boolean isStoreMigrationAllowed​(java.lang.String clusterName)
      Test if the store migration is allowed for a cluster.
      Pair<java.lang.Boolean,​java.lang.String> isStoreVersionReadyForDataRecovery​(java.lang.String clusterName, java.lang.String storeName, int version, java.lang.String sourceFabric, java.lang.String destinationFabric, java.util.Optional<java.lang.Integer> sourceAmplificationFactor)
      Check if the store version's previous states and resources are cleaned up and ready to start data recovery.
      boolean isTopicTruncated​(java.lang.String kafkaTopicName)
      Check if a kafka topic is absent or truncated.
      boolean isTopicTruncatedBasedOnRetention​(long retention)
      Test if retention is less than the configured DEPRECATED_TOPIC_MAX_RETENTION_MS value.
      boolean isTopicTruncatedBasedOnRetention​(java.lang.String kafkaTopicName, long retentionTime)
      A topic should also be considered for cleanup if its retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value.
      void killOfflinePush​(java.lang.String clusterName, java.lang.String kafkaTopic, boolean isForcedKill)
      Kill an offline push if it ran into errors or the corresponding version is being retired.
      java.util.Map<java.lang.String,​RegionPushDetails> listStorePushInfo​(java.lang.String clusterName, java.lang.String storeName, boolean isPartitionDetailEnabled)  
      protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig​(Store oldStore, java.util.Optional<java.lang.Long> hybridRewindSeconds, java.util.Optional<java.lang.Long> hybridOffsetLagThreshold, java.util.Optional<java.lang.Long> hybridTimeLagThreshold, java.util.Optional<DataReplicationPolicy> hybridDataReplicationPolicy, java.util.Optional<BufferReplayPolicy> bufferReplayPolicy)
      Used by both the VeniceHelixAdmin and the VeniceParentHelixAdmin
      void migrateStore​(java.lang.String srcClusterName, java.lang.String destClusterName, java.lang.String storeName)
      Main implementation for migrating a store from its source cluster to a new destination cluster.
      Pair<NodeReplicasReadinessState,​java.util.List<Replica>> nodeReplicaReadiness​(java.lang.String cluster, java.lang.String helixNodeId)
      helixNodeId nodeId of helix participant.
      Version peekNextVersion​(java.lang.String clusterName, java.lang.String storeName)  
      void prepareDataRecovery​(java.lang.String clusterName, java.lang.String storeName, int version, java.lang.String sourceFabric, java.lang.String destinationFabric, java.util.Optional<java.lang.Integer> sourceAmplificationFactor)
      Prepare for data recovery in the destination fabric.
      DerivedSchemaEntry removeDerivedSchema​(java.lang.String clusterName, java.lang.String storeName, int valueSchemaId, int derivedSchemaId)
      Remove an existing derived schema
      void removeInstanceFromAllowList​(java.lang.String clusterName, java.lang.String helixNodeId)
      Remove the given helix nodeId from the allowlist in ZK.
      void removeStorageNode​(java.lang.String clusterName, java.lang.String instanceId)
      Remove one storage node from the given cluster.
      void removeStoreFromGraveyard​(java.lang.String clusterName, java.lang.String storeName)  
      void replicateAddVersionAndStartIngestion​(java.lang.String clusterName, java.lang.String storeName, java.lang.String pushJobId, int versionNumber, int numberOfPartitions, Version.PushType pushType, java.lang.String remoteKafkaBootstrapServers, long rewindTimeInSecondsOverride, int replicationMetadataVersionId)
      This method is invoked in parent controllers to replicate new version signals for a migrating store.
      void replicateUpdateStore​(java.lang.String clusterName, java.lang.String storeName, UpdateStoreQueryParams params)
      This method is invoked in parent controllers for store migration.
      void retireOldStoreVersions​(java.lang.String clusterName, java.lang.String storeName, boolean deleteBackupOnStartPush, int currentVersionBeforePush)
      For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources).
      OfflinePushStatus retrievePushStatus​(java.lang.String clusterName, StoreInfo store)  
      void rollbackToBackupVersion​(java.lang.String clusterName, java.lang.String storeName, java.lang.String regionFilter)
      Set backup version as current version in a child region.
      void rollForwardToFutureVersion​(java.lang.String clusterName, java.lang.String storeName, java.lang.String regionFilter)  
      void sendHeartbeatToSystemStore​(java.lang.String clusterName, java.lang.String storeName, long heartbeatTimeStamp)
      Send a heartbeat timestamp to targeted system store.
      void sendKillMessageToParticipantStore​(java.lang.String clusterName, java.lang.String kafkaTopic)  
      void sendPushJobDetails​(PushJobStatusRecordKey key, PushJobDetails value)
      Lazily initialize a Venice writer for an internal real-time topic store of push job details records.
      void setAdminConsumerService​(java.lang.String clusterName, AdminConsumerService service)  
      void setPushJobDetailsStoreClient​(AvroSpecificStoreClient<PushJobStatusRecordKey,​PushJobDetails> client)  
      void setStoreConfigForMigration​(java.lang.String storeName, java.lang.String srcClusterName, java.lang.String destClusterName)
      Update "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.
      void setStoreCurrentVersion​(java.lang.String clusterName, java.lang.String storeName, int versionNumber)
      Update the current version of a specified store.
      void setStoreLargestUsedVersion​(java.lang.String clusterName, java.lang.String storeName, int versionNumber)
      Update the largest used version number of a specified store.
      void setStoreOwner​(java.lang.String clusterName, java.lang.String storeName, java.lang.String owner)
      Update the owner of a specified store.
      void setStorePartitionCount​(java.lang.String clusterName, java.lang.String storeName, int partitionCount)
      Since the partition check/calculation only happens when a new store version is added, setStorePartitionCount(String, String, int) only changes the number of partitions for subsequent pushes.
      void setStorePushStrategyForMigration​(java.lang.String voldemortStoreName, java.lang.String strategy)
      Unsupported operation in the child controller.
      void setStoreReadability​(java.lang.String clusterName, java.lang.String storeName, boolean desiredReadability)
      Update the readability of a specified store.
      void setStoreReadWriteability​(java.lang.String clusterName, java.lang.String storeName, boolean isAccessible)
      Update both readability and writability of a specified store.
      void setStoreWriteability​(java.lang.String clusterName, java.lang.String storeName, boolean desiredWriteability)
      Update the writability of a specified store.
      void skipAdminMessage​(java.lang.String clusterName, long offset, boolean skipDIV)
      The admin consumption task tries to deal with failures to process an admin message by retrying.
      void startInstanceMonitor​(java.lang.String clusterName)  
      void stop​(java.lang.String clusterName)
      Stop the helix controller for a single cluster.
      void stopMonitorOfflinePush​(java.lang.String clusterName, java.lang.String topic, boolean deletePushStatus, boolean isForcedDelete)  
      void stopVeniceController()
      Stop the entire controller, not just the Helix controller for a single cluster.
      void storeMetadataUpdate​(java.lang.String clusterName, java.lang.String storeName, VeniceHelixAdmin.StoreMetadataOperation operation)
      Update the store metadata by applying provided operation.
      void topicCleanupWhenPushComplete​(java.lang.String clusterName, java.lang.String storeName, int versionNumber)
      In this function, the Controller sets up the proper compaction strategy once the push job has fully completed, for the following reasons: 1.
      boolean truncateKafkaTopic​(java.lang.String kafkaTopicName)
      We don't actually truncate any Kafka topic here; we just update the retention time.
      boolean truncateKafkaTopic​(java.lang.String topicName, long retentionTime)
      Truncate a Kafka topic by setting its retention time to the input value.
      void updateAclForStore​(java.lang.String clusterName, java.lang.String storeName, java.lang.String accessPermissions)
      Provision a new set of ACLs for a Venice store and its associated Kafka topic.
      void updateAdminTopicMetadata​(java.lang.String clusterName, long executionId, java.util.Optional<java.lang.String> storeName, java.util.Optional<java.lang.Long> offset, java.util.Optional<java.lang.Long> upstreamOffset)
      Update cluster-level execution id, offset and upstream offset.
      void updateClusterConfig​(java.lang.String clusterName, UpdateClusterConfigQueryParams params)
      Update the LiveClusterConfig at runtime for a specified cluster.
      void updateClusterDiscovery​(java.lang.String storeName, java.lang.String oldCluster, java.lang.String newCluster, java.lang.String initiatingCluster)
      Update the cluster discovery of a given store by writing to the StoreConfig ZNode.
      void updateRoutersClusterConfig​(java.lang.String clusterName, java.util.Optional<java.lang.Boolean> isThrottlingEnable, java.util.Optional<java.lang.Boolean> isQuotaRebalancedEnable, java.util.Optional<java.lang.Boolean> isMaxCapacityProtectionEnabled, java.util.Optional<java.lang.Integer> expectedRouterCount)
      Update the cluster-level configuration for all routers.
      void updateStoragePersona​(java.lang.String clusterName, java.lang.String name, UpdateStoragePersonaQueryParams queryParams)  
      void updateStore​(java.lang.String clusterName, java.lang.String storeName, UpdateStoreQueryParams params)
      TODO: some of the logic is in the parent controller (VeniceParentHelixAdmin#updateStore) and some is in the child controller here.
      void validateAndMaybeRetrySystemStoreAutoCreation​(java.lang.String clusterName, java.lang.String storeName, VeniceSystemStoreType systemStoreType)
      Check the creation results of a user store's system store.
      java.util.List<Version> versionsForStore​(java.lang.String clusterName, java.lang.String storeName)  
      boolean whetherEnableBatchPushFromAdmin​(java.lang.String storeName)
      Test if a store is allowed for a batch push.
      void wipeCluster​(java.lang.String clusterName, java.lang.String fabric, java.util.Optional<java.lang.String> storeName, java.util.Optional<java.lang.Integer> versionNum)
      Delete stores from the cluster including both store data and metadata.
      void writeEndOfPush​(java.lang.String clusterName, java.lang.String storeName, int versionNumber, boolean alsoWriteStartOfPush)
      Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS

        protected static final int INTERNAL_STORE_GET_RRT_TOPIC_ATTEMPTS
        See Also:
        Constant Field Values
      • INTERNAL_STORE_RTT_RETRY_BACKOFF_MS

        protected static final long INTERNAL_STORE_RTT_RETRY_BACKOFF_MS
    • Method Detail

      • startInstanceMonitor

        public void startInstanceMonitor​(java.lang.String clusterName)
        Specified by:
        startInstanceMonitor in interface Admin
      • getLiveInstanceMonitor

        public LiveInstanceMonitor getLiveInstanceMonitor​(java.lang.String clusterName)
      • clearInstanceMonitor

        public void clearInstanceMonitor​(java.lang.String clusterName)
        Specified by:
        clearInstanceMonitor in interface Admin
      • getZkClient

        public org.apache.helix.zookeeper.impl.client.ZkClient getZkClient()
      • initStorageCluster

        public void initStorageCluster​(java.lang.String clusterName)
        Create and configure the Venice storage cluster with the required properties in Helix, then wait for the resource's (partial) partitions to appear in the external view.
        Specified by:
        initStorageCluster in interface Admin
        Parameters:
        clusterName - Venice cluster name.
      • isResourceStillAlive

        public boolean isResourceStillAlive​(java.lang.String resourceName)
        Test if a given helix resource is still alive (existent in ZK).
        Specified by:
        isResourceStillAlive in interface Admin
        Parameters:
        resourceName - Helix resource name.
        Returns:
        true if resource exists; false otherwise.
      • isClusterValid

        public boolean isClusterValid​(java.lang.String clusterName)
        Test if a cluster is valid (in Helix cluster list).
        Specified by:
        isClusterValid in interface Admin
        Parameters:
        clusterName - Venice cluster name.
        Returns:
        true if input cluster is in Helix cluster list; false otherwise.
      • getHelixAdmin

        protected org.apache.helix.HelixAdmin getHelixAdmin()
      • createStore

        public void createStore​(java.lang.String clusterName,
                                java.lang.String storeName,
                                java.lang.String owner,
                                java.lang.String keySchema,
                                java.lang.String valueSchema,
                                boolean isSystemStore,
                                java.util.Optional<java.lang.String> accessPermissions)
        Create a new ZK store and its configuration in the store repository and create schemas in the schema repository.
        Specified by:
        createStore in interface Admin
        Parameters:
        clusterName - Venice cluster where the store is located.
        storeName - name of the store.
        owner - owner of the store.
        keySchema - key schema of the store.
        valueSchema - value schema of the store.
        isSystemStore - if the store is a system store.
        accessPermissions - json string representing the access-permissions.
      • deleteStore

        public void deleteStore​(java.lang.String clusterName,
                                java.lang.String storeName,
                                int largestUsedVersionNumber,
                                boolean waitOnRTTopicDeletion)
        This method deletes store data, metadata, and the version and RT topics. One exception is for stores with the isMigrating flag set. In that case, the corresponding Kafka topics and storeConfig are not deleted, so that they remain available for the cloned store.
        Specified by:
        deleteStore in interface Admin
      • sendPushJobDetails

        public void sendPushJobDetails​(PushJobStatusRecordKey key,
                                       PushJobDetails value)
        Lazily initialize a Venice writer for the internal real-time topic store of push job details records, then use this writer to put a push job details record (key and value pair).
        Specified by:
        sendPushJobDetails in interface Admin
        Parameters:
        key - key with which the specified value is to be associated.
        value - value to be associated with the specified key.
      • writeEndOfPush

        public void writeEndOfPush​(java.lang.String clusterName,
                                   java.lang.String storeName,
                                   int versionNumber,
                                   boolean alsoWriteStartOfPush)
        Create a local Venice writer based on store version info and, for each partition, use the writer to send END_OF_PUSH and END_OF_SEGMENT control messages to Kafka.
        Specified by:
        writeEndOfPush in interface Admin
        Parameters:
        clusterName - name of the Venice cluster.
        storeName - name of the store.
        versionNumber - store version number.
        alsoWriteStartOfPush - if Venice writer sends a START_OF_PUSH control message first.
      • whetherEnableBatchPushFromAdmin

        public boolean whetherEnableBatchPushFromAdmin​(java.lang.String storeName)
        Test if a store is allowed for a batch push.
        Specified by:
        whetherEnableBatchPushFromAdmin in interface Admin
        Parameters:
        storeName - name of a store.
        Returns:
        true if the store is a participant system store or if Venice is running in single-region mode; false otherwise.
      • isStoreMigrationAllowed

        public boolean isStoreMigrationAllowed​(java.lang.String clusterName)
        Test if the store migration is allowed for a cluster. It reads the value "allow.store.migration" from the "/clusterName/ClusterConfig" znode.
        Specified by:
        isStoreMigrationAllowed in interface Admin
        Parameters:
        clusterName - name of Venice cluster.
        Returns:
        true if store migration is allowed for the input cluster; false otherwise.
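The flag lookup described above can be sketched as follows. This is a minimal illustration, not the actual implementation: the ClusterConfig znode is modeled as a plain `Map<String, String>` (a hypothetical stand-in), and treating a missing key as "not allowed" is an assumption of this sketch.

```java
import java.util.Map;

class StoreMigrationFlagSketch {
  // Key name taken from the method description above.
  static final String ALLOW_STORE_MIGRATION = "allow.store.migration";

  // clusterConfigFields is a hypothetical stand-in for the fields of the
  // "/clusterName/ClusterConfig" znode.
  static boolean isStoreMigrationAllowed(Map<String, String> clusterConfigFields) {
    // Boolean.parseBoolean(null) returns false, so a missing key means
    // "not allowed" in this sketch (an assumption, not confirmed behavior).
    return Boolean.parseBoolean(clusterConfigFields.get(ALLOW_STORE_MIGRATION));
  }

  public static void main(String[] args) {
    System.out.println(isStoreMigrationAllowed(Map.of(ALLOW_STORE_MIGRATION, "true")));
    System.out.println(isStoreMigrationAllowed(Map.of()));
  }
}
```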
      • migrateStore

        public void migrateStore​(java.lang.String srcClusterName,
                                 java.lang.String destClusterName,
                                 java.lang.String storeName)
        Main implementation for migrating a store from its source cluster to a new destination cluster. A new store (with same properties, e.g. name, owner, key schema, value schema) is created at the destination cluster and its StoreInfo is also cloned. For a store with enabled meta system store or enabled davinci push status, those system stores are also migrated. Different store versions are evaluated for the migration. For those versions to be migrated, it triggers the ADD_VERSION and starts ingestion at the destination cluster.
        Specified by:
        migrateStore in interface Admin
        Parameters:
        srcClusterName - name of the source cluster.
        destClusterName - name of the destination cluster.
        storeName - name of the target store.
      • clearIngestionKillMessageAndVerify

        public void clearIngestionKillMessageAndVerify​(java.lang.String clusterName,
                                                       java.lang.String versionTopicName)
        Clear KILL messages from a participant system store.
      • getControllerClientMap

        public java.util.Map<java.lang.String,​ControllerClient> getControllerClientMap​(java.lang.String clusterName)
      • abortMigration

        public void abortMigration​(java.lang.String srcClusterName,
                                   java.lang.String destClusterName,
                                   java.lang.String storeName)
        Abort store migration by resetting migration flag at the source cluster, resetting storeConfig, and updating "cluster" in "/storeConfigs" znode back to the source cluster.
        Specified by:
        abortMigration in interface Admin
        Parameters:
        srcClusterName - name of the source cluster.
        destClusterName - name of the destination cluster.
        storeName - name of the store in migration.
      • updateClusterDiscovery

        public void updateClusterDiscovery​(java.lang.String storeName,
                                           java.lang.String oldCluster,
                                           java.lang.String newCluster,
                                           java.lang.String initiatingCluster)
        Description copied from interface: Admin
        Update the cluster discovery of a given store by writing to the StoreConfig ZNode.
        Specified by:
        updateClusterDiscovery in interface Admin
        Parameters:
        storeName - of the store.
        oldCluster - for the store.
        newCluster - for the store.
        initiatingCluster - that is making the update. This is needed because in the case of store migration sometimes the update is not made by the leader of the current cluster but instead the leader of the source cluster.
        See Also:
        Admin.updateClusterDiscovery(String, String, String, String)
      • checkPreConditionForCreateStore

        protected void checkPreConditionForCreateStore​(java.lang.String clusterName,
                                                       java.lang.String storeName,
                                                       java.lang.String keySchema,
                                                       java.lang.String valueSchema,
                                                       boolean allowSystemStore,
                                                       boolean skipLingeringResourceCheck)
        Check whether the Controller should block the incoming store creation. Inside this function, there is logic to check whether there are any lingering resources, since the requested store could have been deleted just recently. This check should be skipped in the Child Controller and only enabled in the Parent Controller, for the following reasons:
        1. The Parent Controller has a strict order: the system store must be created before the host Venice store.
        2. The Child Controller doesn't have this strict order, since admin messages for different store names can be executed in parallel in the Child Controller.
        When this race condition happens, it causes a deadlock:
        a. The version creation of the system store creates an RT topic in the Parent Cluster.
        b. The RT topic is mirrored by KMM to the Child Cluster.
        c. The version creation admin message of the system store is blocked in the Child Controller, since the host Venice store doesn't exist.
        d. The store creation admin message of the host Venice store is blocked in the Child Controller because of the lingering resource check (the RT topic of its system store already exists, created by KMM).
        TODO: Evaluate if this code needs to change now that KMM has been deprecated. In the future, once Venice gets rid of KMM, the topic won't be automatically created by KMM, and this race condition will be addressed. For now, the Child Controller skips the lingering resource check when handling a store creation admin message.
      • addVersionAndStartIngestion

        public void addVersionAndStartIngestion​(java.lang.String clusterName,
                                                java.lang.String storeName,
                                                java.lang.String pushJobId,
                                                int versionNumber,
                                                int numberOfPartitions,
                                                Version.PushType pushType,
                                                java.lang.String remoteKafkaBootstrapServers,
                                                long rewindTimeInSecondsOverride,
                                                int replicationMetadataVersionId,
                                                boolean versionSwapDeferred,
                                                int repushSourceVersion)
        This is a wrapper for VeniceHelixAdmin#addVersion but performs additional operations needed for add version invoked from the admin channel. Therefore, this method is mainly invoked from the admin task upon processing an add version message.
        Specified by:
        addVersionAndStartIngestion in interface Admin
      • replicateAddVersionAndStartIngestion

        public void replicateAddVersionAndStartIngestion​(java.lang.String clusterName,
                                                         java.lang.String storeName,
                                                         java.lang.String pushJobId,
                                                         int versionNumber,
                                                         int numberOfPartitions,
                                                         Version.PushType pushType,
                                                         java.lang.String remoteKafkaBootstrapServers,
                                                         long rewindTimeInSecondsOverride,
                                                         int replicationMetadataVersionId)
        This method is invoked in parent controllers to replicate new version signals for a migrating store.
      • addVersionAndTopicOnly

        public Pair<java.lang.Boolean,​Version> addVersionAndTopicOnly​(java.lang.String clusterName,
                                                                            java.lang.String storeName,
                                                                            java.lang.String pushJobId,
                                                                            int versionNumber,
                                                                            int numberOfPartitions,
                                                                            int replicationFactor,
                                                                            boolean sendStartOfPush,
                                                                            boolean sorted,
                                                                            Version.PushType pushType,
                                                                            java.lang.String compressionDictionary,
                                                                            java.lang.String remoteKafkaBootstrapServers,
                                                                            java.util.Optional<java.lang.String> sourceGridFabric,
                                                                            long rewindTimeInSecondsOverride,
                                                                            int replicationMetadataVersionId,
                                                                            java.util.Optional<java.lang.String> emergencySourceRegion,
                                                                            boolean versionSwapDeferred)
      • addVersionAndTopicOnly

        public Pair<java.lang.Boolean,​Version> addVersionAndTopicOnly​(java.lang.String clusterName,
                                                                            java.lang.String storeName,
                                                                            java.lang.String pushJobId,
                                                                            int versionNumber,
                                                                            int numberOfPartitions,
                                                                            int replicationFactor,
                                                                            boolean sendStartOfPush,
                                                                            boolean sorted,
                                                                            Version.PushType pushType,
                                                                            java.lang.String compressionDictionary,
                                                                            java.lang.String remoteKafkaBootstrapServers,
                                                                            java.util.Optional<java.lang.String> sourceGridFabric,
                                                                            long rewindTimeInSecondsOverride,
                                                                            int replicationMetadataVersionId,
                                                                            java.util.Optional<java.lang.String> emergencySourceRegion,
                                                                            boolean versionSwapDeferred,
                                                                            java.lang.String targetedRegions,
                                                                            int repushSourceVersion)
        A wrapper to invoke VeniceHelixAdmin#addVersion to only increment the version and create the topic(s) needed without starting ingestion.
      • addVersionOnly

        public Version addVersionOnly​(java.lang.String clusterName,
                                      java.lang.String storeName,
                                      java.lang.String pushJobId,
                                      int versionNumber,
                                      int numberOfPartitions,
                                      Version.PushType pushType,
                                      java.lang.String remoteKafkaBootstrapServers,
                                      long rewindTimeInSecondsOverride,
                                      int replicationMetadataVersionId)
        Only add the version to the store, without creating the topic or starting ingestion. Used to sync version metadata in the parent fabric during store migration.
      • addSpecificVersion

        public boolean addSpecificVersion​(java.lang.String clusterName,
                                          java.lang.String storeName,
                                          Version version)
        TODO: refactor addVersion into these broken-down methods instead of doing everything in one giant method. Add the provided Version to the given store.
      • createSpecificVersionTopic

        public void createSpecificVersionTopic​(java.lang.String clusterName,
                                               java.lang.String storeName,
                                               Version version)
        Create the corresponding version topic based on the provided Version.
      • createHelixResourceAndStartMonitoring

        public void createHelixResourceAndStartMonitoring​(java.lang.String clusterName,
                                                          java.lang.String storeName,
                                                          Version version)
        Create Helix resources for a given storage node cluster and start monitoring a new push.
      • incrementVersionIdempotent

        public Version incrementVersionIdempotent​(java.lang.String clusterName,
                                                  java.lang.String storeName,
                                                  java.lang.String pushJobId,
                                                  int numberOfPartitions,
                                                  int replicationFactor,
                                                  Version.PushType pushType,
                                                  boolean sendStartOfPush,
                                                  boolean sorted,
                                                  java.lang.String compressionDictionary,
                                                  java.util.Optional<java.lang.String> sourceGridFabric,
                                                  java.util.Optional<java.security.cert.X509Certificate> requesterCert,
                                                  long rewindTimeInSecondsOverride,
                                                  java.util.Optional<java.lang.String> emergencySourceRegion,
                                                  boolean versionSwapDeferred,
                                                  java.lang.String targetedRegions,
                                                  int repushSourceVersion)
        Note: this currently uses the pushID to guarantee idempotence; unexpected behavior may result if multiple batch jobs push to the same store at the same time.
        Specified by:
        incrementVersionIdempotent in interface Admin
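The idempotence guarantee noted above can be illustrated with a minimal sketch: a repeated call with the same push job ID returns the version that was already allocated instead of creating a new one. The class and its in-memory map are hypothetical stand-ins for illustration only; the real method takes many more parameters and persists its state in the store repository.

```java
import java.util.HashMap;
import java.util.Map;

class IdempotentVersionSketch {
  // Hypothetical in-memory bookkeeping: push job ID -> allocated version number.
  private final Map<String, Integer> versionByPushJobId = new HashMap<>();
  private int largestUsedVersion = 0;

  // Returns the existing version for a known push job ID, or allocates the next one.
  synchronized int incrementVersionIdempotent(String pushJobId) {
    Integer existing = versionByPushJobId.get(pushJobId);
    if (existing != null) {
      return existing; // retry of the same push: no new version is created
    }
    int newVersion = ++largestUsedVersion;
    versionByPushJobId.put(pushJobId, newVersion);
    return newVersion;
  }

  public static void main(String[] args) {
    IdempotentVersionSketch store = new IdempotentVersionSketch();
    System.out.println(store.incrementVersionIdempotent("push-A")); // first call allocates
    System.out.println(store.incrementVersionIdempotent("push-A")); // retry reuses it
    System.out.println(store.incrementVersionIdempotent("push-B")); // new job, new version
  }
}
```

Keying on the push job ID alone is also why concurrent batch jobs against the same store are called out as a hazard: two jobs with different IDs would each get their own version.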
      • getStartedVersion

        protected static java.util.Optional<Version> getStartedVersion​(Store store)
        The intended semantic is to use this method to find the version that something is currently pushing to. It looks at all versions greater than the current version and identifies the version with a status of STARTED. If there is no STARTED version, it creates a new one for the push to use. This means we cannot use this method to support multiple concurrent pushes.
        Parameters:
        store -
        Returns:
        the started version if there is only one; otherwise Optional.empty(). Throws an exception if there is an error version with a greater number than the current version.
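A minimal sketch of the lookup described above, under stated assumptions: `VersionInfo` and `Status` are hypothetical stand-ins for Venice's own types, and throwing when more than one STARTED version exists is an assumption of this sketch (the description only covers the single-version case).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class StartedVersionSketch {
  enum Status { STARTED, ONLINE, ERROR }

  // Hypothetical minimal stand-in for a store version.
  static final class VersionInfo {
    final int number;
    final Status status;

    VersionInfo(int number, Status status) {
      this.number = number;
      this.status = status;
    }
  }

  static Optional<VersionInfo> getStartedVersion(List<VersionInfo> versions, int currentVersion) {
    List<VersionInfo> started = new ArrayList<>();
    for (VersionInfo version : versions) {
      if (version.number <= currentVersion) {
        continue; // only versions newer than the current one are considered
      }
      if (version.status == Status.ERROR) {
        throw new IllegalStateException("Error version " + version.number + " is newer than the current version");
      }
      if (version.status == Status.STARTED) {
        started.add(version);
      }
    }
    if (started.size() > 1) {
      // Assumption: multiple concurrent pushes are treated as an error here.
      throw new IllegalStateException("More than one STARTED version found");
    }
    return started.isEmpty() ? Optional.empty() : Optional.of(started.get(0));
  }
}
```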
      • getRealTimeTopic

        public java.lang.String getRealTimeTopic​(java.lang.String clusterName,
                                                 java.lang.String storeName)
        Get the real time topic name for a given store. If the topic is not created in Kafka, it creates the real time topic and returns the topic name.
        Specified by:
        getRealTimeTopic in interface Admin
        Parameters:
        clusterName - name of the Venice cluster.
        storeName - name of the store.
        Returns:
        name of the store's real-time topic.
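The get-or-create behavior described above can be sketched as follows. The in-memory `Set` is a hypothetical stand-in for the Kafka topic manager, and the `_rt` suffix is assumed here for illustration; the real method also needs the cluster name and store configuration.

```java
import java.util.HashSet;
import java.util.Set;

class RealTimeTopicSketch {
  // Hypothetical stand-in for the set of topics that exist in Kafka.
  private final Set<String> existingTopics = new HashSet<>();

  String getRealTimeTopic(String storeName) {
    String topicName = storeName + "_rt"; // naming convention assumed for this sketch
    // Set.add is a no-op when the topic already exists, so the call is safe to repeat.
    existingTopics.add(topicName);
    return topicName;
  }

  boolean topicExists(String topicName) {
    return existingTopics.contains(topicName);
  }

  public static void main(String[] args) {
    RealTimeTopicSketch admin = new RealTimeTopicSketch();
    String topic = admin.getRealTimeTopic("myStore");
    System.out.println(topic + " exists: " + admin.topicExists(topic));
  }
}
```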
      • getReplicationMetadataSchema

        public java.util.Optional<org.apache.avro.Schema> getReplicationMetadataSchema​(java.lang.String clusterName,
                                                                                       java.lang.String storeName,
                                                                                       int valueSchemaID,
                                                                                       int rmdVersionID)
        Specified by:
        getReplicationMetadataSchema in interface Admin
        Returns:
        replication metadata schema for a store in a cluster with specified schema ID and RMD protocol version ID.
      • getIncrementalPushVersion

        public Version getIncrementalPushVersion​(java.lang.String clusterName,
                                                 java.lang.String storeName)
        Description copied from interface: Admin
        Right now, it returns the latest version recorded in the parent controller. There are a couple of edge cases: 1. If a push fails in some colos, the version will be inconsistent among colos. 2. If a rollback happens, the latest version will not be the current version. TODO: figure out how we'd like to cover these edge cases.
        Specified by:
        getIncrementalPushVersion in interface Admin
        See Also:
        Admin.getIncrementalPushVersion(String, String)
      • getCurrentVersion

        public int getCurrentVersion​(java.lang.String clusterName,
                                     java.lang.String storeName)
        Specified by:
        getCurrentVersion in interface Admin
        Returns:
        The current version number of an input store in the specified Venice cluster or Store.NON_EXISTING_VERSION if none exists.
      • getFutureVersion

        public int getFutureVersion​(java.lang.String clusterName,
                                    java.lang.String storeName)
        Specified by:
        getFutureVersion in interface Admin
        Returns:
        The online (completed, but not yet swapped) version or the future version with ongoing ingestion; Store.NON_EXISTING_VERSION if none exists.
      • getBackupVersion

        public int getBackupVersion​(java.lang.String clusterName,
                                    java.lang.String storeName)
        Specified by:
        getBackupVersion in interface Admin
      • getOnlineFutureVersion

        public int getOnlineFutureVersion​(java.lang.String clusterName,
                                          java.lang.String storeName)
      • getCurrentVersionsForMultiColos

        public java.util.Map<java.lang.String,​java.lang.Integer> getCurrentVersionsForMultiColos​(java.lang.String clusterName,
                                                                                                       java.lang.String storeName)
        Specified by:
        getCurrentVersionsForMultiColos in interface Admin
      • getRepushInfo

        public RepushInfo getRepushInfo​(java.lang.String clusterName,
                                        java.lang.String storeName,
                                        java.util.Optional<java.lang.String> fabricName)
        Specified by:
        getRepushInfo in interface Admin
        Returns:
        a new RepushInfo object with specified store info.
      • getBackupVersionsForMultiColos

        public java.util.Map<java.lang.String,​java.lang.String> getBackupVersionsForMultiColos​(java.lang.String clusterName,
                                                                                                     java.lang.String storeName)
        Specified by:
        getBackupVersionsForMultiColos in interface Admin
      • peekNextVersion

        public Version peekNextVersion​(java.lang.String clusterName,
                                       java.lang.String storeName)
        Specified by:
        peekNextVersion in interface Admin
        Returns:
        the next version without adding the new version to the store.
      • deleteOldVersionInStore

        public void deleteOldVersionInStore​(java.lang.String clusterName,
                                            java.lang.String storeName,
                                            int versionNum)
        Description copied from interface: Admin
        Delete the given version from the store. If the given version is the current version, an exception will be thrown.
        Specified by:
        deleteOldVersionInStore in interface Admin
        See Also:
        Admin.deleteOldVersionInStore(String, String, int)
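The current-version guard described above can be sketched as follows. The class and its fields are hypothetical stand-ins; treating the deletion of a nonexistent version as a no-op is an assumption of this sketch.

```java
import java.util.Set;
import java.util.TreeSet;

class DeleteOldVersionSketch {
  // Hypothetical in-memory store state.
  private final Set<Integer> versions = new TreeSet<>();
  private final int currentVersion;

  DeleteOldVersionSketch(Set<Integer> versions, int currentVersion) {
    this.versions.addAll(versions);
    this.currentVersion = currentVersion;
  }

  void deleteOldVersion(int versionNum) {
    if (versionNum == currentVersion) {
      // The current version is serving reads and must never be deleted.
      throw new IllegalArgumentException("Version " + versionNum + " is the current version");
    }
    versions.remove(versionNum); // no-op if the version does not exist (assumption)
  }

  boolean hasVersion(int versionNum) {
    return versions.contains(versionNum);
  }
}
```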
      • deleteOneStoreVersion

        public void deleteOneStoreVersion​(java.lang.String clusterName,
                                          java.lang.String storeName,
                                          int versionNumber)
        Delete a version from the cluster, removing all related resources.
        Specified by:
        deleteOneStoreVersion in interface StoreCleaner
      • retireOldStoreVersions

        public void retireOldStoreVersions​(java.lang.String clusterName,
                                           java.lang.String storeName,
                                           boolean deleteBackupOnStartPush,
                                           int currentVersionBeforePush)
        For a given store, determine its versions to delete based on the BackupStrategy settings and execute the deletion in the cluster (including all its resources). It also truncates Kafka topics and Helix resources.
        Specified by:
        retireOldStoreVersions in interface StoreCleaner
        Parameters:
        clusterName - name of a cluster.
        storeName - name of the store to retire.
        deleteBackupOnStartPush - indicate if it is called in a start-of-push workflow.
        currentVersionBeforePush - current version before a new push.
      • topicCleanupWhenPushComplete

        public void topicCleanupWhenPushComplete​(java.lang.String clusterName,
                                                 java.lang.String storeName,
                                                 int versionNumber)
        In this function, the controller sets up the proper compaction strategy once the push job has fully completed. The reasons for setting it up only after the job completes are:
        1. For batch push jobs to batch-only stores, there is no impact. There could still be duplicate entries because of speculative execution in the map-reduce job, but there is no plan to clean them up now.
        2. For batch push jobs to hybrid/incremental stores, if compaction were enabled at the beginning of the job, Kafka compaction could kick in during the push and the storage node could detect DIV errors (such as missing messages or checksum mismatches), because speculative execution could produce duplicate entries. We don't want to fail the push in this scenario, and we still want to perform strong DIV validation during the batch push, so compaction can only be enabled after the batch push completes.
        3. For GF jobs to hybrid stores, the situation is similar to #2: the data contains duplicate entries because no de-duplication happens anywhere.
        With this approach, when load rebalancing happens for hybrid/incremental stores, a DIV error could be detected at any phase of ingestion, since compaction might have been enabled long ago. The storage node therefore needs one more safeguard before throwing the DIV exception: checking whether the topic is compaction-enabled. Since Venice does not switch the compaction policy back and forth between non-compact and compact, checking whether the topic is compaction-enabled when a DIV error is encountered should be good enough.
        Specified by:
        topicCleanupWhenPushComplete in interface StoreCleaner
      • isTopicTruncatedBasedOnRetention

        public boolean isTopicTruncatedBasedOnRetention​(java.lang.String kafkaTopicName,
                                                        long retentionTime)
        A topic should also be considered for cleanup if its retention is less than the configured ConfigKeys.DEPRECATED_TOPIC_MAX_RETENTION_MS value, or when:
        1. Topic retention equals fatalDataValidationFailureRetentionMs.
        2. Topic is a version topic.
        3. fatalDataValidationFailureRetentionMs has already passed since its creation.
        Specified by:
        isTopicTruncatedBasedOnRetention in interface Admin
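        The rule described above can be sketched as a pure function. This is only an illustration, not the actual implementation: it assumes the three numbered conditions must all hold together, and every parameter other than the retention time (the two configured thresholds, the version-topic flag, the creation time, and the clock value) is a hypothetical input that the real method would look up itself.

```java
public class TopicRetentionSketch {
    /**
     * Returns true when a topic counts as truncated under the documented rule.
     * All parameters except retentionMs are hypothetical inputs for this sketch.
     */
    public static boolean isTruncatedBasedOnRetention(
            long retentionMs,
            long deprecatedTopicMaxRetentionMs,
            long fatalDataValidationFailureRetentionMs,
            boolean isVersionTopic,
            long topicCreationTimeMs,
            long nowMs) {
        // First branch: retention is below the configured deprecated-topic maximum.
        if (retentionMs < deprecatedTopicMaxRetentionMs) {
            return true;
        }
        // Second branch (assumed to be a conjunction of the three listed conditions).
        boolean retentionMatchesFatalDiv = retentionMs == fatalDataValidationFailureRetentionMs;
        boolean fatalDivRetentionElapsed =
                nowMs - topicCreationTimeMs > fatalDataValidationFailureRetentionMs;
        return retentionMatchesFatalDiv && isVersionTopic && fatalDivRetentionElapsed;
    }

    public static void main(String[] args) {
        // Retention (1s) is below the deprecated maximum (5s), so the topic counts as truncated.
        System.out.println(isTruncatedBasedOnRetention(1_000L, 5_000L, 60_000L, false, 0L, 10L));
    }
}
```

        Passing the clock value in explicitly keeps the sketch deterministic and easy to test.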
      • truncateKafkaTopic

        public boolean truncateKafkaTopic​(java.lang.String kafkaTopicName)
        We don't actually truncate any Kafka topic here; we just update the retention time.
        Specified by:
        truncateKafkaTopic in interface Admin
        Parameters:
        kafkaTopicName - the name of the Kafka topic whose retention time is updated.
        Returns:
      • truncateKafkaTopic

        public boolean truncateKafkaTopic​(java.lang.String topicName,
                                          long retentionTime)
        Description copied from interface: Admin
        Truncate a Kafka topic by setting its retention time to the input value.
        Specified by:
        truncateKafkaTopic in interface Admin
        Parameters:
        topicName - the name of the topic to truncate.
        retentionTime - the retention time in milliseconds to set for the topic.
        Returns:
        true if the topic was truncated successfully; false otherwise.
        See Also:
        Admin.truncateKafkaTopic(String, long)
      • versionsForStore

        public java.util.List<Version> versionsForStore​(java.lang.String clusterName,
                                                        java.lang.String storeName)
        Specified by:
        versionsForStore in interface Admin
        Returns:
        all versions of the specified store from a cluster.
      • getAllStores

        public java.util.List<Store> getAllStores​(java.lang.String clusterName)
        Specified by:
        getAllStores in interface Admin
        Returns:
        all stores in the specified cluster.
      • getAllStoreStatuses

        public java.util.Map<java.lang.String,​java.lang.String> getAllStoreStatuses​(java.lang.String clusterName)
        Description copied from interface: Admin
        Get the statuses of all stores. The store status is decided by the current version. For example, if one partition has only 2 ONLINE replicas in the current version, we say this store is under-replicated. Refer to StoreStatus for the definition of each status.
        Specified by:
        getAllStoreStatuses in interface Admin
        Returns:
        a map whose key is store name and value is store's status.
        See Also:
        Admin.getAllStoreStatuses(String)
      • hasStore

        public boolean hasStore​(java.lang.String clusterName,
                                java.lang.String storeName)
        Test if the input store exists in a cluster.
        Specified by:
        hasStore in interface Admin
        Parameters:
        clusterName - name of a cluster.
        storeName - name of a store.
        Returns:
        true if the store exists in the cluster; false otherwise.
      • getStore

        public Store getStore​(java.lang.String clusterName,
                              java.lang.String storeName)
        Specified by:
        getStore in interface Admin
        Returns:
        Store object reference from the input store name.
      • setStoreCurrentVersion

        public void setStoreCurrentVersion​(java.lang.String clusterName,
                                           java.lang.String storeName,
                                           int versionNumber)
        Update the current version of a specified store.
        Specified by:
        setStoreCurrentVersion in interface Admin
      • rollForwardToFutureVersion

        public void rollForwardToFutureVersion​(java.lang.String clusterName,
                                               java.lang.String storeName,
                                               java.lang.String regionFilter)
        Specified by:
        rollForwardToFutureVersion in interface Admin
      • rollbackToBackupVersion

        public void rollbackToBackupVersion​(java.lang.String clusterName,
                                            java.lang.String storeName,
                                            java.lang.String regionFilter)
        Set backup version as current version in a child region.
        Specified by:
        rollbackToBackupVersion in interface Admin
      • getBackupVersionNumber

        public int getBackupVersionNumber​(java.util.List<Version> versions,
                                          int currentVersion)
        Get the backup version number, i.e. the largest online version number that is less than the current version number.
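        A minimal sketch of the selection rule described above. VersionInfo is a hypothetical stand-in for the real Version type, and the sentinel returned when no backup exists is an assumption of this sketch rather than documented behavior.

```java
import java.util.Arrays;
import java.util.List;

public class BackupVersionSketch {
    /** Hypothetical stand-in for a store version: just a number and an online flag. */
    public static final class VersionInfo {
        public final int number;
        public final boolean online;

        public VersionInfo(int number, boolean online) {
            this.number = number;
            this.online = online;
        }
    }

    /** Assumed sentinel for "no backup version"; the real method may signal this differently. */
    public static final int NO_BACKUP_VERSION = 0;

    /** Picks the largest online version number that is strictly below currentVersion. */
    public static int backupVersionNumber(List<VersionInfo> versions, int currentVersion) {
        int backup = NO_BACKUP_VERSION;
        for (VersionInfo v : versions) {
            if (v.online && v.number < currentVersion && v.number > backup) {
                backup = v.number;
            }
        }
        return backup;
    }

    public static void main(String[] args) {
        List<VersionInfo> versions = Arrays.asList(
                new VersionInfo(1, true),
                new VersionInfo(2, true),
                new VersionInfo(3, false),
                new VersionInfo(4, true));
        // Version 3 is not online, so the backup for current version 4 is version 2.
        System.out.println(backupVersionNumber(versions, 4));
    }
}
```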
      • setStoreLargestUsedVersion

        public void setStoreLargestUsedVersion​(java.lang.String clusterName,
                                               java.lang.String storeName,
                                               int versionNumber)
        Update the largest used version number of a specified store.
        Specified by:
        setStoreLargestUsedVersion in interface Admin
      • setStoreOwner

        public void setStoreOwner​(java.lang.String clusterName,
                                  java.lang.String storeName,
                                  java.lang.String owner)
        Update the owner of a specified store.
        Specified by:
        setStoreOwner in interface Admin
      • setStorePartitionCount

        public void setStorePartitionCount​(java.lang.String clusterName,
                                           java.lang.String storeName,
                                           int partitionCount)
        Since the partition check/calculation only happens when adding a new store version, setStorePartitionCount(String, String, int) only changes the number of partitions for subsequent pushes. The current version is not changed.
        Specified by:
        setStorePartitionCount in interface Admin
      • setStoreWriteability

        public void setStoreWriteability​(java.lang.String clusterName,
                                         java.lang.String storeName,
                                         boolean desiredWriteability)
        Update the writability of a specified store.
        Specified by:
        setStoreWriteability in interface Admin
      • setStoreReadability

        public void setStoreReadability​(java.lang.String clusterName,
                                        java.lang.String storeName,
                                        boolean desiredReadability)
        Update the readability of a specified store.
        Specified by:
        setStoreReadability in interface Admin
      • setStoreReadWriteability

        public void setStoreReadWriteability​(java.lang.String clusterName,
                                             java.lang.String storeName,
                                             boolean isAccessible)
        Update both readability and writability of a specified store.
        Specified by:
        setStoreReadWriteability in interface Admin
      • getInUseValueSchemaIds

        public java.util.Set<java.lang.Integer> getInUseValueSchemaIds​(java.lang.String clusterName,
                                                                       java.lang.String storeName)
        Specified by:
        getInUseValueSchemaIds in interface Admin
      • deleteValueSchemas

        public void deleteValueSchemas​(java.lang.String clusterName,
                                       java.lang.String storeName,
                                       java.util.Set<java.lang.Integer> unusedValueSchemaIds)
        Description copied from interface: Admin
        Deletes a store's values schema with ids `except` the ids passed in the argument inuseValueSchemaIds
        Specified by:
        deleteValueSchemas in interface Admin
      • updateStore

        public void updateStore​(java.lang.String clusterName,
                                java.lang.String storeName,
                                UpdateStoreQueryParams params)
        TODO: some of the logic is in the parent controller (VeniceParentHelixAdmin#updateStore) and some is in the child controller here. These need to be unified in the future.
        Specified by:
        updateStore in interface Admin
      • replicateUpdateStore

        public void replicateUpdateStore​(java.lang.String clusterName,
                                         java.lang.String storeName,
                                         UpdateStoreQueryParams params)
        This method is invoked in parent controllers for store migration.
      • mergeNewSettingsIntoOldHybridStoreConfig

        protected static HybridStoreConfig mergeNewSettingsIntoOldHybridStoreConfig​(Store oldStore,
                                                                                    java.util.Optional<java.lang.Long> hybridRewindSeconds,
                                                                                    java.util.Optional<java.lang.Long> hybridOffsetLagThreshold,
                                                                                    java.util.Optional<java.lang.Long> hybridTimeLagThreshold,
                                                                                    java.util.Optional<DataReplicationPolicy> hybridDataReplicationPolicy,
                                                                                    java.util.Optional<BufferReplayPolicy> bufferReplayPolicy)
        Used by both the VeniceHelixAdmin and the VeniceParentHelixAdmin
        Parameters:
        oldStore - Existing Store that is the source for updates. This object will not be modified by this method.
        hybridRewindSeconds - Optional is present if the returned object should include a new rewind time
        hybridOffsetLagThreshold - Optional is present if the returned object should include a new offset lag threshold
        Returns:
        null if oldStore has no hybrid configs and optionals are not present, otherwise a fully specified HybridStoreConfig
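        The Optional-based merge contract above can be illustrated with a simplified sketch. HybridSettings is a hypothetical two-field stand-in for HybridStoreConfig, and the UNSET fallback used when there is no old config is an assumption of this sketch; the real method takes a Store and more Optionals.

```java
import java.util.Optional;

public class HybridConfigMergeSketch {
    /** Hypothetical, immutable stand-in for a hybrid store config. */
    public static final class HybridSettings {
        public final long rewindSeconds;
        public final long offsetLagThreshold;

        public HybridSettings(long rewindSeconds, long offsetLagThreshold) {
            this.rewindSeconds = rewindSeconds;
            this.offsetLagThreshold = offsetLagThreshold;
        }
    }

    /** Assumed placeholder for a value that neither the old config nor the update provides. */
    public static final long UNSET = -1L;

    /**
     * Returns null when there is no old config and no new values; otherwise returns a
     * new object in which present Optionals override the old values. The old object is
     * never modified.
     */
    public static HybridSettings merge(
            HybridSettings old,
            Optional<Long> rewindSeconds,
            Optional<Long> offsetLagThreshold) {
        if (old == null && !rewindSeconds.isPresent() && !offsetLagThreshold.isPresent()) {
            return null;
        }
        long baseRewind = old != null ? old.rewindSeconds : UNSET;
        long baseLag = old != null ? old.offsetLagThreshold : UNSET;
        return new HybridSettings(
                rewindSeconds.orElse(baseRewind),
                offsetLagThreshold.orElse(baseLag));
    }

    public static void main(String[] args) {
        HybridSettings old = new HybridSettings(3600L, 1000L);
        HybridSettings merged = merge(old, Optional.of(7200L), Optional.<Long>empty());
        // Only the rewind time was supplied, so the lag threshold is carried over.
        System.out.println(merged.rewindSeconds + " " + merged.offsetLagThreshold);
    }
}
```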
      • storeMetadataUpdate

        public void storeMetadataUpdate​(java.lang.String clusterName,
                                        java.lang.String storeName,
                                        VeniceHelixAdmin.StoreMetadataOperation operation)
        Update the store metadata by applying provided operation.
        Parameters:
        clusterName - name of the cluster.
        storeName - name of the store to be updated.
        operation - the defined operation that updates the store.
      • containsHelixResource

        public boolean containsHelixResource​(java.lang.String clusterName,
                                             java.lang.String kafkaTopic)
        Description copied from interface: StoreCleaner
        The purpose of this function is to check whether the given resource exists in the Helix cluster.
        Specified by:
        containsHelixResource in interface StoreCleaner
        Parameters:
        clusterName - The Venice cluster that the resource belongs to.
        kafkaTopic - it's usually the store version name (version topic name).
        Returns:
      • deleteHelixResource

        public void deleteHelixResource​(java.lang.String clusterName,
                                        java.lang.String kafkaTopic)
        Description copied from interface: StoreCleaner
        The purpose of this function is to delete the given resource from the Helix cluster. Unlike StoreCleaner.deleteOneStoreVersion(String, String, int), this function does not check whether the store version is still a valid version inside the Venice backend; it sends the delete request to the Helix cluster directly. Perform sufficient sanity checks before calling this function.
        Specified by:
        deleteHelixResource in interface StoreCleaner
        Parameters:
        clusterName - The Venice cluster that the resource belongs to.
        kafkaTopic - It's usually the store version name (version topic name).
      • enableDisabledPartition

        public void enableDisabledPartition​(java.lang.String clusterName,
                                            java.lang.String kafkaTopic,
                                            boolean enableAll)
      • getKeySchema

        public SchemaEntry getKeySchema​(java.lang.String clusterName,
                                        java.lang.String storeName)
        Specified by:
        getKeySchema in interface Admin
        Returns:
        the key schema for the specified store.
      • getValueSchemas

        public java.util.Collection<SchemaEntry> getValueSchemas​(java.lang.String clusterName,
                                                                 java.lang.String storeName)
        Specified by:
        getValueSchemas in interface Admin
        Returns:
        the value schemas for the specified store.
      • getDerivedSchemas

        public java.util.Collection<DerivedSchemaEntry> getDerivedSchemas​(java.lang.String clusterName,
                                                                          java.lang.String storeName)
        Specified by:
        getDerivedSchemas in interface Admin
        Returns:
        the derived schemas for the specified store.
      • getValueSchemaId

        public int getValueSchemaId​(java.lang.String clusterName,
                                    java.lang.String storeName,
                                    java.lang.String valueSchemaStr)
        Specified by:
        getValueSchemaId in interface Admin
        Returns:
        the schema id for the specified store and value schema.
      • getDerivedSchemaId

        public GeneratedSchemaID getDerivedSchemaId​(java.lang.String clusterName,
                                                    java.lang.String storeName,
                                                    java.lang.String schemaStr)
        Specified by:
        getDerivedSchemaId in interface Admin
        Returns:
        the derived schema id for the specified store and derived schema.
      • getValueSchema

        public SchemaEntry getValueSchema​(java.lang.String clusterName,
                                          java.lang.String storeName,
                                          int id)
        Specified by:
        getValueSchema in interface Admin
        Returns:
        the value schema for the specified store and id.
      • addValueSchema

        public SchemaEntry addValueSchema​(java.lang.String clusterName,
                                          java.lang.String storeName,
                                          java.lang.String valueSchemaStr,
                                          int schemaId,
                                          DirectionalSchemaCompatibilityType compatibilityType)
        Add a new value schema for the given store with all specified properties and return a new SchemaEntry object containing the schema and its id.
        Specified by:
        addValueSchema in interface Admin
        Returns:
        a SchemaEntry object composed of a schema and its corresponding id.
      • addDerivedSchema

        public DerivedSchemaEntry addDerivedSchema​(java.lang.String clusterName,
                                                   java.lang.String storeName,
                                                   int valueSchemaId,
                                                   java.lang.String derivedSchemaStr)
        Add a new derived schema for the given store with all specified properties and return a new DerivedSchemaEntry object containing the schema and its id.
        Specified by:
        addDerivedSchema in interface Admin
        Returns:
        a DerivedSchemaEntry object composed of specified properties.
      • addDerivedSchema

        public DerivedSchemaEntry addDerivedSchema​(java.lang.String clusterName,
                                                   java.lang.String storeName,
                                                   int valueSchemaId,
                                                   int derivedSchemaId,
                                                   java.lang.String derivedSchemaStr)
        Add a new derived schema for the given store with all specified properties.
        Specified by:
        addDerivedSchema in interface Admin
        Returns:
        a DerivedSchemaEntry object composed of specified properties.
      • addSupersetSchema

        public SchemaEntry addSupersetSchema​(java.lang.String clusterName,
                                             java.lang.String storeName,
                                             java.lang.String valueSchema,
                                             int valueSchemaId,
                                             java.lang.String supersetSchemaStr,
                                             int supersetSchemaId)
        Add a new superset schema for the given store with all specified properties.

        Generate the superset schema from the current schema and the latest superset schema existing in the store (if there is none, the latest value schema is used instead). If the newly generated superset schema is unique, add it to the store and update the store's latestSuperSetValueSchemaId.

        Specified by:
        addSupersetSchema in interface Admin
      • getReplicationMetadataSchemas

        public java.util.Collection<RmdSchemaEntry> getReplicationMetadataSchemas​(java.lang.String clusterName,
                                                                                  java.lang.String storeName)
        Specified by:
        getReplicationMetadataSchemas in interface Admin
        Returns:
        a collection of ReplicationMetadataSchemaEntry objects for the given store and cluster.
      • addReplicationMetadataSchema

        public RmdSchemaEntry addReplicationMetadataSchema​(java.lang.String clusterName,
                                                           java.lang.String storeName,
                                                           int valueSchemaId,
                                                           int replicationMetadataVersionId,
                                                           java.lang.String replicationMetadataSchemaStr)
        Create a new ReplicationMetadataSchemaEntry object with the given properties and add it to the schema repository if it is not a duplicate.
        Specified by:
        addReplicationMetadataSchema in interface Admin
        Returns:
        ReplicationMetadataSchemaEntry object reference.
      • validateAndMaybeRetrySystemStoreAutoCreation

        public void validateAndMaybeRetrySystemStoreAutoCreation​(java.lang.String clusterName,
                                                                 java.lang.String storeName,
                                                                 VeniceSystemStoreType systemStoreType)
        Check the creation results of a user store's system store. If the system store's current version is in an error state, re-issue a new empty push and wait for the empty push to complete.
        Specified by:
        validateAndMaybeRetrySystemStoreAutoCreation in interface Admin
      • getStorageNodes

        public java.util.List<java.lang.String> getStorageNodes​(java.lang.String clusterName)
        Specified by:
        getStorageNodes in interface Admin
        Returns:
        a list of storage node instance names for a given cluster.
      • getDisabledPartitionStats

        public DisabledPartitionStats getDisabledPartitionStats​(java.lang.String clusterName)
      • getStorageNodesStatus

        public java.util.Map<java.lang.String,​java.lang.String> getStorageNodesStatus​(java.lang.String clusterName,
                                                                                            boolean enableReplica)
        Specified by:
        getStorageNodesStatus in interface Admin
        Returns:
        a map containing the storage node name and its connectivity status (InstanceStatus).
      • removeStorageNode

        public void removeStorageNode​(java.lang.String clusterName,
                                      java.lang.String instanceId)
        Remove one storage node from the given cluster.

        It removes the given helix nodeId from the allowlist in ZK and its associated resource in Helix.

        Specified by:
        removeStorageNode in interface Admin
      • stop

        public void stop​(java.lang.String clusterName)
        Description copied from interface: Admin
        Stop the helix controller for a single cluster.
        Specified by:
        stop in interface Admin
        See Also:
        Admin.stop(String)
      • getOffLinePushStatus

        public Admin.OfflinePushStatusInfo getOffLinePushStatus​(java.lang.String clusterName,
                                                                java.lang.String kafkaTopic)
        Description copied from interface: Admin
        Query the status of the offline push for the given Kafka topic. TODO: We currently use the Kafka topic to track the status, but in the future we should use a jobId instead of the Kafka topic. Right now each Kafka topic has only one offline job, but in the future one Kafka topic could be assigned multiple jobs, such as a data migration job.
        Specified by:
        getOffLinePushStatus in interface Admin
        Returns:
        the status of current offline push for the passed kafka topic
        See Also:
        Admin.getOffLinePushStatus(String, String)
      • getOffLinePushStatus

        public Admin.OfflinePushStatusInfo getOffLinePushStatus​(java.lang.String clusterName,
                                                                java.lang.String kafkaTopic,
                                                                java.util.Optional<java.lang.String> incrementalPushVersion,
                                                                java.lang.String region,
                                                                java.lang.String targetedRegions)
        Specified by:
        getOffLinePushStatus in interface Admin
      • getKafkaBootstrapServers

        public java.lang.String getKafkaBootstrapServers​(boolean isSSL)
        Description copied from interface: Admin
        Return the ssl or non-ssl bootstrap servers based on the given flag.
        Specified by:
        getKafkaBootstrapServers in interface Admin
        Returns:
        the Kafka bootstrap servers URL; if there are multiple servers, they are comma-separated.
        See Also:
        Admin.getKafkaBootstrapServers(boolean)
      • getRegionName

        public java.lang.String getRegionName()
        Description copied from interface: Admin
        Return the region name of this Admin
        Specified by:
        getRegionName in interface Admin
        Returns:
        the region name of this controller
      • getNativeReplicationSourceFabric

        public java.lang.String getNativeReplicationSourceFabric​(java.lang.String clusterName,
                                                                 Store store,
                                                                 java.util.Optional<java.lang.String> sourceGridFabric,
                                                                 java.util.Optional<java.lang.String> emergencySourceRegion,
                                                                 java.lang.String targetedRegions)
        Source fabric selection priority:
        1. Parent controller emergency source fabric config.
        2. VPJ plugin targeted region config; however, it will compute all selections based on the criteria below to select the source region.
        3. VPJ plugin source grid fabric config.
        4. Store level source fabric config.
        5. Cluster level source fabric config.
        Specified by:
        getNativeReplicationSourceFabric in interface Admin
        Returns:
        the selected source fabric for a given store.
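        The priority order above amounts to a first-present-wins chain. The sketch below is a simplified illustration under stated assumptions: it deliberately leaves out the targeted-region step (item 2), whose exact behavior is not spelled out here, and the storeLevelFabric and clusterLevelFabric parameters are hypothetical inputs standing in for values the real method reads from the store and cluster configs.

```java
import java.util.Optional;

public class SourceFabricSketch {
    /**
     * Picks the source fabric by priority: emergency config, then the VPJ source grid
     * fabric, then the store-level config, and finally the cluster-level default.
     * The targeted-region step from the documented list is intentionally omitted.
     */
    public static String selectSourceFabric(
            Optional<String> emergencySourceRegion,
            Optional<String> sourceGridFabric,
            Optional<String> storeLevelFabric,
            String clusterLevelFabric) {
        if (emergencySourceRegion.isPresent()) {
            return emergencySourceRegion.get();
        }
        if (sourceGridFabric.isPresent()) {
            return sourceGridFabric.get();
        }
        if (storeLevelFabric.isPresent()) {
            return storeLevelFabric.get();
        }
        return clusterLevelFabric;
    }

    public static void main(String[] args) {
        // No emergency override, so the VPJ source grid fabric wins over the lower levels.
        System.out.println(selectSourceFabric(
                Optional.<String>empty(),
                Optional.of("region-b"),
                Optional.of("region-c"),
                "region-d"));
    }
}
```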
      • isLeaderControllerFor

        public boolean isLeaderControllerFor​(java.lang.String clusterName)
        Description copied from interface: Admin
        Check whether this controller itself is the leader controller for a given cluster. Note that the controller can be either a parent controller or a child controller, since a cluster must have a leader child controller and a leader parent controller. The point is not to confuse the concept of leader-standby with the parent-child controller architecture.
        Specified by:
        isLeaderControllerFor in interface Admin
        See Also:
        Admin.isLeaderControllerFor(String)
      • calculateNumberOfPartitions

        public int calculateNumberOfPartitions​(java.lang.String clusterName,
                                               java.lang.String storeName)
        Calculate the number of partitions for the given store.
        Specified by:
        calculateNumberOfPartitions in interface Admin
      • getReplicationFactor

        public int getReplicationFactor​(java.lang.String clusterName,
                                        java.lang.String storeName)
        Specified by:
        getReplicationFactor in interface Admin
        Returns:
        the replication factor of the given store.
      • getReplicas

        public java.util.List<Replica> getReplicas​(java.lang.String clusterName,
                                                   java.lang.String kafkaTopic)
        Specified by:
        getReplicas in interface Admin
        Returns:
        a list of Replica created for the given resource.
      • isInstanceRemovable

        public NodeRemovableResult isInstanceRemovable​(java.lang.String clusterName,
                                                       java.lang.String helixNodeId,
                                                       java.util.List<java.lang.String> lockedNodes,
                                                       boolean isFromInstanceView)
        Description copied from interface: Admin
        Assuming all hosts identified by lockedNodes and their corresponding resources are unusable, determine whether the given instance can be removed from the given cluster. For example, if the only online replica alive in this cluster is hosted on the given instance, this instance should not be removed from the cluster; otherwise Venice will lose data. For detailed criteria, please refer to InstanceStatusDecider.
        Specified by:
        isInstanceRemovable in interface Admin
        Parameters:
        helixNodeId - nodeId of the Helix participant, in HOST_PORT format.
        lockedNodes - A list of helix nodeIds whose resources are assumed to be unusable (stopped).
        isFromInstanceView - If the value is true, only the partitions this instance holds are checked. E.g. if all replicas of a partition are in error, but this instance does not hold any replica of this partition, the partition is skipped in the check. If the value is false, all partitions of the resources this instance holds are checked.
        See Also:
        Admin.isInstanceRemovable(String, String, List, boolean)
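        The example in the description (do not remove the host of the last online replica) can be sketched as follows. This is a deliberately simplified model, not the criteria used by InstanceStatusDecider: the replica map is a hypothetical input, and "removable" here only means that every checked partition keeps at least one online replica on a node that is neither removed nor locked.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class InstanceRemovableSketch {
    /**
     * replicasByPartition maps partition name -> (nodeId -> whether that replica is online).
     * When isFromInstanceView is true, partitions without a replica on nodeId are skipped.
     */
    public static boolean isRemovable(
            Map<String, Map<String, Boolean>> replicasByPartition,
            String nodeId,
            Collection<String> lockedNodes,
            boolean isFromInstanceView) {
        for (Map<String, Boolean> replicas : replicasByPartition.values()) {
            if (isFromInstanceView && !replicas.containsKey(nodeId)) {
                continue; // this instance holds no replica of the partition
            }
            int survivingOnline = 0;
            for (Map.Entry<String, Boolean> replica : replicas.entrySet()) {
                String host = replica.getKey();
                boolean usable = !host.equals(nodeId) && !lockedNodes.contains(host);
                if (replica.getValue() && usable) {
                    survivingOnline++;
                }
            }
            if (survivingOnline == 0) {
                return false; // removing the node would leave no online replica
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Boolean>> replicas = new HashMap<>();
        Map<String, Boolean> p0 = new HashMap<>();
        p0.put("host1_1234", true);
        p0.put("host2_1234", true);
        replicas.put("store_v1_0", p0);
        replicas.put("store_v1_1", Collections.singletonMap("host1_1234", true));
        // host1 holds the only online replica of store_v1_1, so it is not removable.
        System.out.println(isRemovable(replicas, "host1_1234", Arrays.<String>asList(), true));
    }
}
```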
      • addInstanceToAllowlist

        public void addInstanceToAllowlist​(java.lang.String clusterName,
                                           java.lang.String helixNodeId)
        Add the given helix nodeId into the allowlist in ZK.
        Specified by:
        addInstanceToAllowlist in interface Admin
      • removeInstanceFromAllowList

        public void removeInstanceFromAllowList​(java.lang.String clusterName,
                                                java.lang.String helixNodeId)
        Remove the given helix nodeId from the allowlist in ZK.
        Specified by:
        removeInstanceFromAllowList in interface Admin
      • getAllowlist

        public java.util.Set<java.lang.String> getAllowlist​(java.lang.String clusterName)
        Specified by:
        getAllowlist in interface Admin
        Returns:
        a list of all helix nodeIds in the allowlist for the given cluster from ZK.
      • killOfflinePush

        public void killOfflinePush​(java.lang.String clusterName,
                                    java.lang.String kafkaTopic,
                                    boolean isForcedKill)
        Description copied from interface: Admin
        Kill an offline push if it ran into errors or the corresponding version is being retired.
        Specified by:
        killOfflinePush in interface Admin
        Parameters:
        isForcedKill - should be set to true when killing the push job for retiring the corresponding version.
        See Also:
        Admin.killOfflinePush(String, String, boolean)
      • deleteParticipantStoreKillMessage

        public void deleteParticipantStoreKillMessage​(java.lang.String clusterName,
                                                      java.lang.String kafkaTopic)
        Compose a ParticipantMessageKey message and execute a delete operation on the key to the cluster's participant store.
      • sendKillMessageToParticipantStore

        public void sendKillMessageToParticipantStore​(java.lang.String clusterName,
                                                      java.lang.String kafkaTopic)
      • getStorageNodesStatus

        public StorageNodeStatus getStorageNodesStatus​(java.lang.String clusterName,
                                                       java.lang.String instanceId)
        Description copied from interface: Admin
        Query and return the current status of the given storage node. The "storage node status" is composed of the "status" of all replicas in that storage node. "status" is the integer value of a Helix state:
        • DROPPED=1
        • ERROR=2
        • OFFLINE=3
        • BOOTSTRAP=4
        • ONLINE=5
        So this method returns a map in which the key is the replica name, composed of the resource name and partitionId, and the value is the "status" of that replica.
        Specified by:
        getStorageNodesStatus in interface Admin
        See Also:
        Admin.getStorageNodesStatus(String, boolean)
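        The replica-to-status map described above could be modeled roughly as below. The integer values come from the list in the description; the class name, the enum name, and the "_" separator used to build the replica name are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplicaStatusSketch {
  /** Integer values as listed in the method description. */
  public enum HelixStateValue {
    DROPPED(1), ERROR(2), OFFLINE(3), BOOTSTRAP(4), ONLINE(5);

    public final int value;

    HelixStateValue(int value) {
      this.value = value;
    }
  }

  /** Assumption: resource name and partitionId are joined with "_". */
  public static String replicaName(String resourceName, int partitionId) {
    return resourceName + "_" + partitionId;
  }

  /** Builds replica name -> status value; the array index is used as the partition id. */
  public static Map<String, Integer> statusMap(String resourceName, HelixStateValue[] statePerPartition) {
    Map<String, Integer> result = new LinkedHashMap<>();
    for (int partitionId = 0; partitionId < statePerPartition.length; partitionId++) {
      result.put(replicaName(resourceName, partitionId), statePerPartition[partitionId].value);
    }
    return result;
  }

  public static void main(String[] args) {
    HelixStateValue[] states = { HelixStateValue.ONLINE, HelixStateValue.BOOTSTRAP };
    System.out.println(statusMap("store_v1", states));
  }
}
```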
      • isStorageNodeNewerOrEqualTo

        public boolean isStorageNodeNewerOrEqualTo​(java.lang.String clusterName,
                                                   java.lang.String instanceId,
                                                   StorageNodeStatus oldStatus)
        Description copied from interface: Admin
        Compare the current storage node status with the given storage node status to check whether the current one is "Newer" or "Equal" to the given one. The comparison goes through each replica in this storage node; if all of their status values are greater than or equal to the corresponding values in the given storage node status, the current storage node status is considered "Newer" or "Equal" to the given one.
        Specified by:
        isStorageNodeNewerOrEqualTo in interface Admin
        See Also:
        Admin.isStorageNodeNewerOrEqualTo(String, String, StorageNodeStatus)
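        A minimal sketch of the comparison described above, using plain maps of replica name to status value. The description does not say how a replica that appears in only one of the two statuses is handled; this sketch iterates over the replicas of the old status and assumes a replica missing from the current status counts as not newer. The class and method names are hypothetical.

```java
import java.util.Map;

public class StatusComparisonSketch {
  /**
   * Returns true when every replica recorded in oldStatus has a current status value
   * greater than or equal to its old value.
   */
  public static boolean isNewerOrEqual(Map<String, Integer> currentStatus, Map<String, Integer> oldStatus) {
    for (Map.Entry<String, Integer> oldEntry : oldStatus.entrySet()) {
      Integer currentValue = currentStatus.get(oldEntry.getKey());
      // Assumption: a replica that is no longer reported is treated as a regression.
      if (currentValue == null || currentValue < oldEntry.getValue()) {
        return false;
      }
    }
    return true;
  }
}
```

        Because the status values grow from DROPPED=1 to ONLINE=5, a replica that moved from BOOTSTRAP to ONLINE passes the check, while one that fell from ONLINE to ERROR fails it.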
      • skipAdminMessage

        public void skipAdminMessage​(java.lang.String clusterName,
                                     long offset,
                                     boolean skipDIV)
        Description copied from interface: Admin
        The admin consumption task tries to deal with failures to process an admin message by retrying. If there is a message that cannot be processed for some reason, we will need to forcibly skip that message in order to unblock the task from consuming subsequent messages.
        Specified by:
        skipAdminMessage in interface Admin
        skipDIV - tries to skip only the DIV check for the blocking message.
        See Also:
        Admin.skipAdminMessage(String, long, boolean)
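        The retry-then-skip behavior described above can be illustrated with a toy consumption loop. This is only a sketch under stated assumptions: the real admin consumption task is not shown in this document, the attempt limit is invented so that the example terminates, and the skipDIV flag is not modeled. All names here are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.function.LongPredicate;

public class AdminSkipSketch {
  /** Returned when every offset was either processed or skipped. */
  public static final long NOT_BLOCKED = -1L;

  /**
   * Processes offsets in order. tryProcess returns true when a message is handled.
   * Returns the offset on which consumption is blocked, or NOT_BLOCKED.
   */
  public static long consume(
      List<Long> offsets,
      LongPredicate tryProcess,
      Set<Long> offsetsToSkip,
      int maxAttempts) {
    for (long offset : offsets) {
      if (offsetsToSkip.contains(offset)) {
        // An operator asked to skip this message, so later messages are unblocked.
        continue;
      }
      boolean processed = false;
      for (int attempt = 0; attempt < maxAttempts && !processed; attempt++) {
        processed = tryProcess.test(offset);
      }
      if (!processed) {
        // Subsequent messages are not consumed until this offset succeeds or is skipped.
        return offset;
      }
    }
    return NOT_BLOCKED;
  }
}
```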
      • getLastSucceededExecutionId

        public java.lang.Long getLastSucceededExecutionId​(java.lang.String clusterName,
                                                          java.lang.String storeName)
        Get the last succeeded execution id for a given store; if storeName is null, return the last succeeded execution id for the cluster.
        Parameters:
        clusterName -
        storeName -
        Returns:
        the last succeeded execution id or null if the cluster/store is invalid or the admin consumer service for the given cluster is not up and running yet.
      • getAdminTopicMetadata

        public java.util.Map<java.lang.String,​java.lang.Long> getAdminTopicMetadata​(java.lang.String clusterName,
                                                                                          java.util.Optional<java.lang.String> storeName)
        Specified by:
        getAdminTopicMetadata in interface Admin
        Returns:
        cluster-level execution id, offset and upstream offset. If store name is specified, it returns store-level execution id.
      • updateAdminTopicMetadata

        public void updateAdminTopicMetadata​(java.lang.String clusterName,
                                             long executionId,
                                             java.util.Optional<java.lang.String> storeName,
                                             java.util.Optional<java.lang.Long> offset,
                                             java.util.Optional<java.lang.Long> upstreamOffset)
        Update cluster-level execution id, offset and upstream offset. If store name is specified, it updates the store-level execution id.
        Specified by:
        updateAdminTopicMetadata in interface Admin
      • updateRoutersClusterConfig

        public void updateRoutersClusterConfig​(java.lang.String clusterName,
                                               java.util.Optional<java.lang.Boolean> isThrottlingEnable,
                                               java.util.Optional<java.lang.Boolean> isQuotaRebalancedEnable,
                                               java.util.Optional<java.lang.Boolean> isMaxCapacityProtectionEnabled,
                                               java.util.Optional<java.lang.Integer> expectedRouterCount)
        Description copied from interface: Admin
        Update the cluster-level configuration for all routers.
        Specified by:
        updateRoutersClusterConfig in interface Admin
        See Also:
        Admin.updateRoutersClusterConfig(String, Optional, Optional, Optional, Optional)
      • getAllStorePushStrategyForMigration

        public java.util.Map<java.lang.String,​java.lang.String> getAllStorePushStrategyForMigration()
        Unsupported operation in the child controller.
        Specified by:
        getAllStorePushStrategyForMigration in interface Admin
      • setStorePushStrategyForMigration

        public void setStorePushStrategyForMigration​(java.lang.String voldemortStoreName,
                                                     java.lang.String strategy)
        Unsupported operation in the child controller.
        Specified by:
        setStorePushStrategyForMigration in interface Admin
      • discoverCluster

        public Pair<java.lang.String,​java.lang.String> discoverCluster​(java.lang.String storeName)
        Description copied from interface: Admin
        Find the cluster which the given store belongs to. Return the pair of the cluster name and the d2 service associated with that cluster.
        Specified by:
        discoverCluster in interface Admin
        See Also:
        Admin.discoverCluster(String)
      • getServerD2Service

        public java.lang.String getServerD2Service​(java.lang.String clusterName)
        Description copied from interface: Admin
        Find the server d2 service associated with a given cluster name.
        Specified by:
        getServerD2Service in interface Admin
        See Also:
        Admin.getServerD2Service(String)
      • getVeniceWriterFactory

        public VeniceWriterFactory getVeniceWriterFactory()
        Specified by:
        getVeniceWriterFactory in interface Admin
        Returns:
        a VeniceWriterFactory object used by the Venice controller to create the venice writer.
      • stopMonitorOfflinePush

        public void stopMonitorOfflinePush​(java.lang.String clusterName,
                                           java.lang.String topic,
                                           boolean deletePushStatus,
                                           boolean isForcedDelete)
      • close

        public void close()
        Cause VeniceHelixAdmin and its associated services to stop executing.
        Specified by:
        close in interface Admin
        Specified by:
        close in interface java.lang.AutoCloseable
        Specified by:
        close in interface java.io.Closeable
      • isLeaderControllerOfControllerCluster

        public boolean isLeaderControllerOfControllerCluster()
        Detect whether the current node is the leader controller of the controller cluster. Use this function with care, since it talks to ZooKeeper directly on every call.
        Specified by:
        isLeaderControllerOfControllerCluster in interface Admin
        Returns:
      • setStoreConfigForMigration

        public void setStoreConfigForMigration​(java.lang.String storeName,
                                               java.lang.String srcClusterName,
                                               java.lang.String destClusterName)
        Update "migrationDestCluster" and "migrationSrcCluster" fields of the "/storeConfigs/storeName" znode.
        Parameters:
        storeName - name of the store.
        srcClusterName - name of the source cluster.
        destClusterName - name of the destination cluster.
      • updateAclForStore

        public void updateAclForStore​(java.lang.String clusterName,
                                      java.lang.String storeName,
                                      java.lang.String accessPermissions)
        Description copied from interface: Admin
        Provision a new set of ACLs for a Venice store and its associated Kafka topic.
        Specified by:
        updateAclForStore in interface Admin
        See Also:
        Admin.updateAclForStore(String, String, String)
      • getAclForStore

        public java.lang.String getAclForStore​(java.lang.String clusterName,
                                               java.lang.String storeName)
        Description copied from interface: Admin
        Fetch the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
        Specified by:
        getAclForStore in interface Admin
        Returns:
        The string representation of the accessPermissions. Returns an empty string if the store is not present.
        See Also:
        Admin.getAclForStore(String, String)
      • deleteAclForStore

        public void deleteAclForStore​(java.lang.String clusterName,
                                      java.lang.String storeName)
        Description copied from interface: Admin
        Delete the current set of ACLs provisioned for a Venice store and its associated Kafka topic.
        Specified by:
        deleteAclForStore in interface Admin
        See Also:
        Admin.deleteAclForStore(String, String)
      • getClusterStores

        public java.util.ArrayList<StoreInfo> getClusterStores​(java.lang.String clusterName)
        Description copied from interface: Admin
        Return all stores in a cluster.
        Specified by:
        getClusterStores in interface Admin
        Returns:
        a list of StoreInfo of all stores in the specified cluster.
      • listStorePushInfo

        public java.util.Map<java.lang.String,​RegionPushDetails> listStorePushInfo​(java.lang.String clusterName,
                                                                                         java.lang.String storeName,
                                                                                         boolean isPartitionDetailEnabled)
        Specified by:
        listStorePushInfo in interface Admin
      • getRegionPushDetails

        public RegionPushDetails getRegionPushDetails​(java.lang.String clusterName,
                                                      java.lang.String storeName,
                                                      boolean isPartitionDetailAdded)
        Specified by:
        getRegionPushDetails in interface Admin
        Returns:
        RegionPushDetails object containing the specified store's push status.
      • wipeCluster

        public void wipeCluster​(java.lang.String clusterName,
                                java.lang.String fabric,
                                java.util.Optional<java.lang.String> storeName,
                                java.util.Optional<java.lang.Integer> versionNum)
        Delete stores from the cluster including both store data and metadata.

        The API provides the flexibility to delete a single store or a single version. Cluster name and fabric are required parameters, but store name and version number are optional. If store name is empty, all stores in the cluster are deleted.

        Specified by:
        wipeCluster in interface Admin
        Parameters:
        clusterName - name of the Venice cluster.
        fabric - name of the fabric.
        storeName - name of the store to delete; if absent, all stores in the cluster are deleted.
        versionNum - number of the version to delete; if present, only the specified version is deleted.
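        How the two optional parameters narrow the deletion scope can be sketched as below. The description does not say what happens when a version number is given without a store name; rejecting that combination is an assumption of this sketch, and the class, enum, and method names are hypothetical.

```java
import java.util.Optional;

public class WipeScopeSketch {
  public enum Scope { ALL_STORES, SINGLE_STORE, SINGLE_VERSION }

  /** Maps the optional store name and version number to the scope of the wipe. */
  public static Scope resolve(Optional<String> storeName, Optional<Integer> versionNum) {
    if (!storeName.isPresent()) {
      if (versionNum.isPresent()) {
        // Assumption: a version number is meaningless without a store name.
        throw new IllegalArgumentException("versionNum requires storeName");
      }
      return Scope.ALL_STORES;
    }
    return versionNum.isPresent() ? Scope.SINGLE_VERSION : Scope.SINGLE_STORE;
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.empty(), Optional.empty()));
    System.out.println(resolve(Optional.of("myStore"), Optional.of(3)));
  }
}
```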
      • isParent

        public boolean isParent()
        Description copied from interface: Admin
        Check whether the controller works as a parent controller.
        Specified by:
        isParent in interface Admin
        Returns:
        true if it works as a parent controller. Otherwise, return false.
        See Also:
        Admin.isParent()
      • getChildDataCenterControllerD2Map

        public java.util.Map<java.lang.String,​java.lang.String> getChildDataCenterControllerD2Map​(java.lang.String clusterName)
        Description copied from interface: Admin
        Get the mapping from child datacenter to child controller d2 zk host.
        Specified by:
        getChildDataCenterControllerD2Map in interface Admin
        Returns:
        A map of child datacenter -> child controller d2 zk host
        See Also:
        Admin.getChildDataCenterControllerD2Map(String)
      • getClustersLeaderOf

        public java.util.List<java.lang.String> getClustersLeaderOf()
        Description copied from interface: Admin
        Get a list of clusters this controller is a leader of.
        Specified by:
        getClustersLeaderOf in interface Admin
        Returns:
        a list of clusters this controller is a leader of.
        See Also:
        Admin.getClustersLeaderOf()
      • getDefaultMaxRecordSizeBytes

        public int getDefaultMaxRecordSizeBytes()
        Specified by:
        getDefaultMaxRecordSizeBytes in interface Admin
        Returns:
        The default value of VeniceWriter.maxRecordSizeBytes which is provided to the VPJ and Consumer as a controller config to dynamically control the setting per cluster.
        See Also:
        Admin.getDefaultMaxRecordSizeBytes()
      • initiateDataRecovery

        public void initiateDataRecovery​(java.lang.String clusterName,
                                         java.lang.String storeName,
                                         int version,
                                         java.lang.String sourceFabric,
                                         java.lang.String destinationFabric,
                                         boolean copyAllVersionConfigs,
                                         java.util.Optional<Version> sourceFabricVersion)
        Description copied from interface: Admin
        Initiate data recovery for a store version given a source fabric.
        Specified by:
        initiateDataRecovery in interface Admin
        Parameters:
        clusterName - name of the cluster the store belongs to.
        storeName - name of the store.
        version - version number of the store.
        sourceFabric - to be used as the source for data recovery.
        copyAllVersionConfigs - a boolean to indicate whether all version configs should be copied from the source fabric or only the essential version configs and generate the rest based on destination fabric's Store configs.
        sourceFabricVersion - source fabric's Version configs used to configure the recovering version in the destination fabric.
        See Also:
        Admin.initiateDataRecovery(String, String, int, String, String, boolean, Optional)
      • prepareDataRecovery

        public void prepareDataRecovery​(java.lang.String clusterName,
                                        java.lang.String storeName,
                                        int version,
                                        java.lang.String sourceFabric,
                                        java.lang.String destinationFabric,
                                        java.util.Optional<java.lang.Integer> sourceAmplificationFactor)
        Description copied from interface: Admin
        Prepare for data recovery in the destination fabric. The store version of interest might have lingering states and resources in the destination fabric from previous failed attempts. Perform some basic checks to make sure the store version in the destination fabric is capable of performing data recovery, and clean up any lingering states and resources.
        Specified by:
        prepareDataRecovery in interface Admin
        See Also:
        Admin.prepareDataRecovery(String, String, int, String, String, Optional)
      • isStoreVersionReadyForDataRecovery

        public Pair<java.lang.Boolean,​java.lang.String> isStoreVersionReadyForDataRecovery​(java.lang.String clusterName,
                                                                                                 java.lang.String storeName,
                                                                                                 int version,
                                                                                                 java.lang.String sourceFabric,
                                                                                                 java.lang.String destinationFabric,
                                                                                                 java.util.Optional<java.lang.Integer> sourceAmplificationFactor)
        Description copied from interface: Admin
        Check if the store version's previous states and resources are cleaned up and ready to start data recovery.
        Specified by:
        isStoreVersionReadyForDataRecovery in interface Admin
        Returns:
        whether the store version is ready to start data recovery, and the reason if it is not ready.
        See Also:
        Admin.isStoreVersionReadyForDataRecovery(String, String, int, String, String, Optional)
      • getLargestUsedVersionFromStoreGraveyard

        public int getLargestUsedVersionFromStoreGraveyard​(java.lang.String clusterName,
                                                           java.lang.String storeName)
        Specified by:
        getLargestUsedVersionFromStoreGraveyard in interface Admin
        Returns:
        the largest used version number for the given store from store graveyard.
      • cleanupInstanceCustomizedStates

        public java.util.List<java.lang.String> cleanupInstanceCustomizedStates​(java.lang.String clusterName)
        Description copied from interface: Admin
        Scan through instance-level customized states and remove any lingering ZNodes that are no longer relevant. This operation shouldn't be needed under normal circumstances. It is intended to clean up ZNodes that failed to be deleted due to bugs and errors.
        Specified by:
        cleanupInstanceCustomizedStates in interface Admin
        Parameters:
        clusterName - to perform the cleanup.
        Returns:
        list of deleted ZNode paths.
      • removeStoreFromGraveyard

        public void removeStoreFromGraveyard​(java.lang.String clusterName,
                                             java.lang.String storeName)
        Specified by:
        removeStoreFromGraveyard in interface Admin
      • sendHeartbeatToSystemStore

        public void sendHeartbeatToSystemStore​(java.lang.String clusterName,
                                               java.lang.String storeName,
                                               long heartbeatTimeStamp)
        Description copied from interface: Admin
        Send a heartbeat timestamp to the targeted system store.
        Specified by:
        sendHeartbeatToSystemStore in interface Admin
      • getHeartbeatFromSystemStore

        public long getHeartbeatFromSystemStore​(java.lang.String clusterName,
                                                java.lang.String systemStoreName)
        Description copied from interface: Admin
        Read the latest heartbeat timestamp from the system store. If it fails to read from the system store, this method should return -1.
        Specified by:
        getHeartbeatFromSystemStore in interface Admin
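        The -1 convention described above means callers need to tell a failed read apart from a real timestamp. The sketch below shows one way to do that; the reader callback, the staleness check, and all names are hypothetical and not part of the Admin interface.

```java
import java.util.function.LongSupplier;

public class HeartbeatReadSketch {
  /** Sentinel from the method description: returned when the read fails. */
  public static final long READ_FAILED = -1L;

  /** Wraps a hypothetical system store read so that any failure becomes the sentinel. */
  public static long readHeartbeat(LongSupplier systemStoreReader) {
    try {
      return systemStoreReader.getAsLong();
    } catch (RuntimeException e) {
      return READ_FAILED;
    }
  }

  /** Assumption for illustration: a failed read is handled like a stale heartbeat. */
  public static boolean isStale(long heartbeatTimestamp, long nowMs, long maxAgeMs) {
    return heartbeatTimestamp == READ_FAILED || nowMs - heartbeatTimestamp > maxAgeMs;
  }

  public static void main(String[] args) {
    long heartbeat = readHeartbeat(() -> System.currentTimeMillis() - 1_000L);
    System.out.println(isStale(heartbeat, System.currentTimeMillis(), 60_000L));
  }
}
```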
      • getSslFactory

        public java.util.Optional<SSLFactory> getSslFactory()
      • isClusterWipeAllowed

        public boolean isClusterWipeAllowed​(java.lang.String clusterName)