Class TopicManager

  • All Implemented Interfaces:
    java.io.Closeable, java.lang.AutoCloseable

    public class TopicManager
    extends java.lang.Object
    implements java.io.Closeable
    Topic manager is responsible for creating, deleting, and updating topics. It also provides APIs to query topic metadata. It is essentially a wrapper over PubSubAdminAdapter and PubSubConsumerAdapter with additional features such as caching, metrics, and retry. TODO: PubSubAdminAdapter still performs its own retries; these will eventually move here.
    • Constructor Detail

      • TopicManager

        public TopicManager​(java.lang.String pubSubClusterAddress,
                            TopicManagerContext context)
    • Method Detail

      • createTopic

        public void createTopic​(PubSubTopic topicName,
                                int numPartitions,
                                int replication,
                                boolean eternal)
        Create a topic and block until it is created. The default timeout is 30000 ms, after which this function throws a VeniceException.
      • createTopic

        public void createTopic​(PubSubTopic topicName,
                                int numPartitions,
                                int replication,
                                boolean eternal,
                                boolean logCompaction,
                                java.util.Optional<java.lang.Integer> minIsr)
      • createTopic

        public void createTopic​(PubSubTopic topicName,
                                int numPartitions,
                                int replication,
                                boolean eternal,
                                boolean logCompaction,
                                java.util.Optional<java.lang.Integer> minIsr,
                                boolean useFastPubSubOperationTimeout)
        Create a topic and block until it is created. The default timeout is 30000 ms, after which this function throws a VeniceException.
        Parameters:
        topicName - Name for the new topic
        numPartitions - number of partitions
        replication - replication factor
        eternal - if true, the topic will have "infinite" (~250 mil years) retention; if false, its retention will be set to PubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS by default
        logCompaction - whether to enable log compaction on the topic
        minIsr - if present, the specified min.isr is applied to this topic; if absent, the PubSub cluster defaults are used
        useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout is used; if true, a much shorter timeout is used to make topic creation non-blocking
      • createTopic

        public void createTopic​(PubSubTopic topicName,
                                int numPartitions,
                                int replication,
                                long retentionTimeMs,
                                boolean logCompaction,
                                java.util.Optional<java.lang.Integer> minIsr,
                                boolean useFastPubSubOperationTimeout)
        Create a topic and block until it is created. The default timeout is 30000 ms, after which this function throws a VeniceException.
        Parameters:
        topicName - Name for the new topic
        numPartitions - number of partitions
        replication - replication factor
        retentionTimeMs - Retention time, in ms, for the topic
        logCompaction - whether to enable log compaction on the topic
        minIsr - if present, the specified min.isr is applied to this topic; if absent, the PubSub cluster defaults are used
        useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout is used; if true, a much shorter timeout is used to make topic creation non-blocking
      • waitUntilTopicCreated

        protected void waitUntilTopicCreated​(PubSubTopic topicName,
                                             int partitionCount,
                                             long deadlineMs)
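
        The blocking behavior described for createTopic, together with the deadlineMs parameter here, can be pictured as a poll-until-deadline loop. The sketch below is illustrative only and is not the library's implementation: WaitUntilCreatedSketch, waitUntil, pollIntervalMs, and the BooleanSupplier readiness check are hypothetical names, deadlineMs is assumed to be an absolute epoch-millisecond timestamp, and IllegalStateException stands in for VeniceException.

```java
import java.util.function.BooleanSupplier;

final class WaitUntilCreatedSketch {
  /**
   * Polls topicIsReady until it returns true or the deadline passes.
   * deadlineMs is assumed to be an absolute epoch-millisecond timestamp.
   * IllegalStateException is a stand-in for VeniceException.
   */
  static void waitUntil(BooleanSupplier topicIsReady, long deadlineMs, long pollIntervalMs) {
    while (!topicIsReady.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadlineMs) {
        throw new IllegalStateException("Topic was not created before the deadline");
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for topic creation", e);
      }
    }
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Hypothetical readiness check: becomes true roughly 50 ms after start.
    BooleanSupplier ready = () -> System.currentTimeMillis() - start >= 50;
    waitUntil(ready, start + 30_000L, 10L);
    System.out.println("topic ready");
  }
}
```

        A caller would typically compute the deadline once, as the current time plus the timeout, so that repeated polls share one overall time budget.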
      • updateTopicCompactionPolicy

        public void updateTopicCompactionPolicy​(PubSubTopic topic,
                                                boolean expectedLogCompacted)
      • updateTopicCompactionPolicy

        public void updateTopicCompactionPolicy​(PubSubTopic topic,
                                                boolean expectedLogCompacted,
                                                long minLogCompactionLagMs,
                                                java.util.Optional<java.lang.Long> maxLogCompactionLagMs)
                                         throws PubSubTopicDoesNotExistException
        Update topic compaction policy.
        Parameters:
        topic - the topic whose compaction policy is updated
        expectedLogCompacted - whether log compaction should be enabled on the topic
        minLogCompactionLagMs - the min log compaction lag override. If this is specified and is a valid number (> 0), it overrides the default config
        Throws:
        PubSubTopicDoesNotExistException
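
        The override rule for minLogCompactionLagMs (a value > 0 replaces the default) can be written as a one-line selection. This is a minimal sketch under assumptions: CompactionLagSketch, effectiveMinCompactionLagMs, and the defaultMs argument are hypothetical names, and where the real default comes from is not shown in this page.

```java
final class CompactionLagSketch {
  /**
   * Returns the override when it is a valid number (> 0), otherwise the default.
   * defaultMs is a hypothetical stand-in for the configured default lag.
   */
  static long effectiveMinCompactionLagMs(long overrideMs, long defaultMs) {
    return overrideMs > 0 ? overrideMs : defaultMs;
  }

  public static void main(String[] args) {
    System.out.println(effectiveMinCompactionLagMs(60_000L, 0L));
    System.out.println(effectiveMinCompactionLagMs(-1L, 0L));
  }
}
```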
      • isTopicCompactionEnabled

        public boolean isTopicCompactionEnabled​(PubSubTopic topicName)
      • getTopicMinLogCompactionLagMs

        public long getTopicMinLogCompactionLagMs​(PubSubTopic topicName)
      • getTopicMaxLogCompactionLagMs

        public java.util.Optional<java.lang.Long> getTopicMaxLogCompactionLagMs​(PubSubTopic topicName)
      • getAllTopicRetentions

        public java.util.Map<PubSubTopic,​java.lang.Long> getAllTopicRetentions()
        Get retention time for all topics in the PubSub cluster.
        Returns:
        a map of topic name to retention time in ms.
      • isTopicTruncated

        public boolean isTopicTruncated​(PubSubTopic topicName,
                                        long truncatedTopicMaxRetentionMs)
        Check whether the topic is absent or truncated.
        Parameters:
        topicName - the topic to check
        truncatedTopicMaxRetentionMs - the truncated threshold, in ms, that the topic's retention time is compared against
        Returns:
        true if the topic does not exist, or if it exists but its retention time is below the truncated threshold; false if the topic exists and its retention time is above the truncated threshold
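
        The return contract above can be summarized in a small decision function. This is only a sketch of the documented contract, not the actual implementation: TruncationCheckSketch is a hypothetical name, an empty OptionalLong models a topic that does not exist, and treating a retention exactly equal to the threshold as truncated is an assumption, since the description only covers "below" and "above".

```java
import java.util.OptionalLong;

final class TruncationCheckSketch {
  /**
   * retentionMs is empty when the topic does not exist (a modelling choice for this sketch).
   * Assumption: a retention equal to the threshold counts as truncated.
   */
  static boolean isTopicTruncated(OptionalLong retentionMs, long truncatedTopicMaxRetentionMs) {
    if (!retentionMs.isPresent()) {
      return true; // an absent topic is reported as truncated
    }
    return retentionMs.getAsLong() <= truncatedTopicMaxRetentionMs;
  }

  public static void main(String[] args) {
    long thresholdMs = 1_000L;
    System.out.println(isTopicTruncated(OptionalLong.empty(), thresholdMs));
    System.out.println(isTopicTruncated(OptionalLong.of(500L), thresholdMs));
    System.out.println(isTopicTruncated(OptionalLong.of(86_400_000L), thresholdMs));
  }
}
```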
      • isRetentionBelowTruncatedThreshold

        public boolean isRetentionBelowTruncatedThreshold​(long retention,
                                                          long truncatedTopicMaxRetentionMs)
      • getTopicConfig

        public PubSubTopicConfiguration getTopicConfig​(PubSubTopic pubSubTopic)
        This operation is relatively expensive, since it pulls the configs for all topics.
      • ensureTopicIsDeletedAndBlock

        public void ensureTopicIsDeletedAndBlock​(PubSubTopic pubSubTopic)
        Delete a topic and block until it is deleted or the operation times out.
        Parameters:
        pubSubTopic - topic to delete
      • ensureTopicIsDeletedAndBlockWithRetry

        public void ensureTopicIsDeletedAndBlockWithRetry​(PubSubTopic pubSubTopic)
        Delete a topic with retry and block until it is deleted or the operation times out.
        Parameters:
        pubSubTopic - topic to delete
      • listTopics

        public java.util.Set<PubSubTopic> listTopics()
      • containsTopicWithExpectationAndRetry

        public boolean containsTopicWithExpectationAndRetry​(PubSubTopic topic,
                                                            int maxAttempts,
                                                            boolean expectedResult,
                                                            java.time.Duration initialBackoff,
                                                            java.time.Duration maxBackoff,
                                                            java.time.Duration maxDuration)
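
        This method has no description, but its parameters suggest a bounded retry with exponential backoff. The following is a hedged sketch of that general technique rather than the library's behavior: ContainsTopicRetrySketch and the BooleanSupplier existence check are hypothetical, and both the doubling backoff and returning the last observed value are assumptions.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class ContainsTopicRetrySketch {
  /**
   * Re-checks containsTopic until it matches expectedResult, maxAttempts is used up,
   * or maxDuration has elapsed. Returns the last observed value (an assumption).
   */
  static boolean containsWithExpectationAndRetry(
      BooleanSupplier containsTopic,
      int maxAttempts,
      boolean expectedResult,
      Duration initialBackoff,
      Duration maxBackoff,
      Duration maxDuration) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    long deadlineNanos = System.nanoTime() + maxDuration.toNanos();
    long maxBackoffMs = maxBackoff.toMillis();
    long backoffMs = Math.min(initialBackoff.toMillis(), maxBackoffMs);
    boolean observed = !expectedResult;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      observed = containsTopic.getAsBoolean();
      if (observed == expectedResult) {
        return observed;
      }
      // Stop when out of attempts or out of time.
      if (attempt == maxAttempts || System.nanoTime() - deadlineNanos >= 0) {
        break;
      }
      try {
        Thread.sleep(backoffMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      // Double the wait, capped at maxBackoff.
      backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
    }
    return observed;
  }

  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    // Hypothetical check: the topic "appears" about 30 ms after start.
    boolean found = containsWithExpectationAndRetry(
        () -> System.currentTimeMillis() - start >= 30,
        10, true, Duration.ofMillis(5), Duration.ofMillis(40), Duration.ofSeconds(2));
    System.out.println(found);
  }
}
```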
      • containsTopicAndAllPartitionsAreOnline

        public boolean containsTopicAndAllPartitionsAreOnline​(PubSubTopic topic)
      • containsTopicAndAllPartitionsAreOnline

        public boolean containsTopicAndAllPartitionsAreOnline​(PubSubTopic topic,
                                                              java.lang.Integer expectedPartitionCount)
        This is an extensive check to mitigate an edge case where a topic is not yet created on all brokers.
        Returns:
        true if the topic exists and all its partitions have at least one in-sync replica; false if the topic does not exist at all, or if it exists but isn't completely available
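
        The availability rule above can be sketched as a check over per-partition metadata. Everything in this example is illustrative: PartitionsOnlineSketch and its nested PartitionInfo are hypothetical stand-ins (not PubSubTopicPartitionInfo), a null list models a missing topic, and comparing the partition count against expectedPartitionCount when it is non-null is an assumption about that parameter.

```java
import java.util.Arrays;
import java.util.List;

final class PartitionsOnlineSketch {
  /** Hypothetical per-partition metadata used only for this sketch. */
  static final class PartitionInfo {
    final int partition;
    final int inSyncReplicaCount;

    PartitionInfo(int partition, int inSyncReplicaCount) {
      this.partition = partition;
      this.inSyncReplicaCount = inSyncReplicaCount;
    }
  }

  /** partitions == null models a topic that does not exist. */
  static boolean allPartitionsOnline(List<PartitionInfo> partitions, Integer expectedPartitionCount) {
    if (partitions == null || partitions.isEmpty()) {
      return false;
    }
    // Assumption: a non-null expected count must match the observed partition count.
    if (expectedPartitionCount != null && partitions.size() != expectedPartitionCount) {
      return false;
    }
    for (PartitionInfo p : partitions) {
      if (p.inSyncReplicaCount < 1) {
        return false; // this partition has no in-sync replica yet
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<PartitionInfo> healthy = Arrays.asList(new PartitionInfo(0, 3), new PartitionInfo(1, 2));
    List<PartitionInfo> partial = Arrays.asList(new PartitionInfo(0, 3), new PartitionInfo(1, 0));
    System.out.println(allPartitionsOnline(healthy, 2));
    System.out.println(allPartitionsOnline(partial, 2));
  }
}
```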
      • getTopicPartitionInfo

        public java.util.List<PubSubTopicPartitionInfo> getTopicPartitionInfo​(PubSubTopic pubSubTopic)
        Get information about all partitions for a given topic.
        Parameters:
        pubSubTopic - the topic to get partition info for
        Returns:
        a list of PubSubTopicPartitionInfo for the given topic
      • getTopicLatestOffsets

        public it.unimi.dsi.fastutil.ints.Int2LongMap getTopicLatestOffsets​(PubSubTopic pubSubTopic)
        Get the latest offsets for all partitions of a given topic.
        Parameters:
        pubSubTopic - the topic to get latest offsets for
        Returns:
        a Map of partition to the latest offset, or an empty map if there's any problem getting the offsets
      • getPartitionCount

        public int getPartitionCount​(PubSubTopic pubSubTopic)
        Get partition count for a given topic.
        Parameters:
        pubSubTopic - the topic to get partition count for
        Returns:
        the number of partitions for the given topic
        Throws:
        PubSubTopicDoesNotExistException - if the topic does not exist
      • containsTopic

        public boolean containsTopic​(PubSubTopic pubSubTopic)
      • containsTopicCached

        public boolean containsTopicCached​(PubSubTopic pubSubTopic)
      • containsTopicWithRetries

        public boolean containsTopicWithRetries​(PubSubTopic pubSubTopic,
                                                int retries)
      • getLatestOffsetWithRetries

        public long getLatestOffsetWithRetries​(PubSubTopicPartition pubSubTopicPartition,
                                               int retries)
      • getLatestOffsetCached

        public long getLatestOffsetCached​(PubSubTopic pubSubTopic,
                                          int partitionId)
      • getLatestOffsetCachedNonBlocking

        public long getLatestOffsetCachedNonBlocking​(PubSubTopic pubSubTopic,
                                                     int partitionId)
      • getProducerTimestampOfLastDataMessageWithRetries

        public long getProducerTimestampOfLastDataMessageWithRetries​(PubSubTopicPartition pubSubTopicPartition,
                                                                     int retries)
      • getProducerTimestampOfLastDataMessageCached

        public long getProducerTimestampOfLastDataMessageCached​(PubSubTopicPartition pubSubTopicPartition)
      • getOffsetByTime

        public long getOffsetByTime​(PubSubTopicPartition pubSubTopicPartition,
                                    long timestamp)
        Get the offset for a single partition at a specific timestamp.
      • invalidateCache

        public java.util.concurrent.CompletableFuture<java.lang.Void> invalidateCache​(PubSubTopic pubSubTopic)
        Invalidate the cache for the given topic and all its partitions.
        Parameters:
        pubSubTopic - the topic to invalidate
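
        Invalidating a topic together with all its partitions is simple when cached values are grouped by topic. The sketch below shows one way such a cache could be organized; it is not the cache used by this class. OffsetCacheSketch and its methods are hypothetical, topics are plain strings here, and running the removal asynchronously is an assumption suggested only by the CompletableFuture return type.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class OffsetCacheSketch {
  // topic name -> (partition id -> cached latest offset)
  private final Map<String, Map<Integer, Long>> latestOffsets = new ConcurrentHashMap<>();

  void put(String topic, int partition, long offset) {
    latestOffsets.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(partition, offset);
  }

  /** Returns the cached offset, or null when nothing is cached. */
  Long get(String topic, int partition) {
    Map<Integer, Long> byPartition = latestOffsets.get(topic);
    return byPartition == null ? null : byPartition.get(partition);
  }

  /** Drops every cached partition entry for the topic in one removal. */
  CompletableFuture<Void> invalidate(String topic) {
    return CompletableFuture.runAsync(() -> latestOffsets.remove(topic));
  }

  public static void main(String[] args) {
    OffsetCacheSketch cache = new OffsetCacheSketch();
    cache.put("store_v1", 0, 42L);
    cache.put("store_v1", 1, 17L);
    cache.invalidate("store_v1").join(); // wait for the invalidation to finish
    System.out.println(cache.get("store_v1", 0));
  }
}
```

        Callers that need the invalidation to be visible before continuing would wait on the returned future, as main does with join().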
      • prefetchAndCacheLatestOffset

        public void prefetchAndCacheLatestOffset​(PubSubTopicPartition pubSubTopicPartition)
        Prefetch and cache the latest offset for the given topic-partition.
        Parameters:
        pubSubTopicPartition - the topic-partition to prefetch and cache the latest offset for
      • getPubSubClusterAddress

        public java.lang.String getPubSubClusterAddress()
      • close

        public void close()
        Specified by:
        close in interface java.lang.AutoCloseable
        Specified by:
        close in interface java.io.Closeable
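
        Because the class implements java.io.Closeable, an instance can be managed with try-with-resources. The sketch below demonstrates the pattern with a stand-in type so that it runs on its own: CloseableManagerSketch, FakeManager, and the topic name "orders_v1" are hypothetical and say nothing about what the real close() releases.

```java
import java.io.Closeable;

final class CloseableManagerSketch {
  /** Hypothetical stand-in for a manager that owns closeable clients. */
  static final class FakeManager implements Closeable {
    boolean closed = false;

    boolean containsTopic(String name) {
      if (closed) {
        throw new IllegalStateException("manager is closed");
      }
      return "orders_v1".equals(name);
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  /** Uses the manager and guarantees close() runs, even if the check throws. */
  static boolean checkAndClose(FakeManager manager, String topic) {
    try (FakeManager m = manager) {
      return m.containsTopic(topic);
    }
  }

  public static void main(String[] args) {
    FakeManager manager = new FakeManager();
    System.out.println(checkAndClose(manager, "orders_v1"));
    System.out.println(manager.closed);
  }
}
```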
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object