Class TopicManager

java.lang.Object
com.linkedin.venice.pubsub.manager.TopicManager
All Implemented Interfaces:
Closeable, AutoCloseable

public class TopicManager extends Object implements Closeable
TopicManager is responsible for creating, deleting, and updating topics. It also provides APIs to query topic metadata. It is essentially a wrapper over PubSubAdminAdapter and PubSubConsumerAdapter with additional features such as caching, metrics, and retries. TODO: PubSubAdminAdapter still performs its own retries; these will eventually be moved here.
  • Constructor Details

  • Method Details

    • createTopic

      public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal)
      Create a topic and block until it is created, with a default timeout of 30000 ms, after which this function throws a VeniceException.
    • createTopic

      public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr)
    • createTopic

      public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
      Create a topic and block until it is created, with a default timeout of 30000 ms, after which this function throws a VeniceException.
      Parameters:
      topicName - Name for the new topic
      numPartitions - number of partitions
      replication - replication factor
      eternal - if true, the topic will have "infinite" (~250 mil years) retention; if false, its retention will be set to PubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS by default
      logCompaction - whether to enable log compaction on the topic
      minIsr - if present, the specified min.isr is applied to this topic; if absent, PubSub cluster defaults are used
      useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout is used; if true, a much shorter timeout is used to make topic creation non-blocking
    • createTopic

      public void createTopic(PubSubTopic topicName, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
      Create a topic and block until it is created, with a default timeout of 30000 ms, after which this function throws a VeniceException.
      Parameters:
      topicName - Name for the new topic
      numPartitions - number of partitions
      replication - replication factor
      retentionTimeMs - Retention time, in ms, for the topic
      logCompaction - whether to enable log compaction on the topic
      minIsr - if present, the specified min.isr is applied to this topic; if absent, PubSub cluster defaults are used
      useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout is used; if true, a much shorter timeout is used to make topic creation non-blocking
    • waitUntilTopicCreated

      protected void waitUntilTopicCreated(PubSubTopic topicName, int partitionCount, long deadlineMs)
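
The block-until-created behavior described for createTopic can be pictured as a poll-until-deadline loop. The following is only a minimal sketch under stated assumptions, not the Venice implementation: TopicCreationWaiter, its BooleanSupplier existence check, the poll interval, and the use of IllegalStateException in place of VeniceException are all hypothetical stand-ins, and deadlineMs is assumed to be an absolute wall-clock time in epoch milliseconds.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in; not part of TopicManager. */
final class TopicCreationWaiter {
  private TopicCreationWaiter() {
  }

  /**
   * Polls topicExists until it returns true or the deadline passes.
   * deadlineMs is assumed to be an absolute epoch timestamp in milliseconds.
   */
  static void waitUntil(BooleanSupplier topicExists, long deadlineMs, long pollIntervalMs) {
    while (!topicExists.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadlineMs) {
        // The real method is documented to throw VeniceException; this sketch uses a JDK exception.
        throw new IllegalStateException("Topic was not created before the deadline");
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop waiting.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for topic creation", e);
      }
    }
  }
}
```

A caller would compute the deadline once, for example System.currentTimeMillis() + 30000, and pass the same value to every wait so that retries share one time budget.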
    • updateTopicRetention

      public boolean updateTopicRetention(PubSubTopic topicName, long retentionInMS) throws PubSubTopicDoesNotExistException
      Update retention for the given topic. If the topic doesn't exist, this operation throws PubSubTopicDoesNotExistException.
      Parameters:
      topicName - the topic whose retention should be updated
      retentionInMS - the new retention time, in ms
      Returns:
      true if the retention time config of the input topic gets updated; false if nothing gets updated
      Throws:
      PubSubTopicDoesNotExistException
    • updateTopicRetention

      public boolean updateTopicRetention(PubSubTopic topicName, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration) throws PubSubTopicDoesNotExistException
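      Update retention for the given topic, using the supplied PubSubTopicConfiguration.
      Parameters:
      topicName - the topic whose retention should be updated
      expectedRetentionInMs - the retention time, in ms, that the topic is expected to have after the call
      pubSubTopicConfiguration - the topic configuration to check and update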
      Returns:
      true if the retention time gets updated; false if no update is needed.
      Throws:
      PubSubTopicDoesNotExistException
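
The boolean return contract of updateTopicRetention (true when the retention changes, false when no update is needed, an exception when the topic is missing) can be illustrated with a compare-then-write sketch. Everything here is hypothetical: RetentionUpdater keeps retention values in an in-memory map instead of talking to a PubSub cluster, topics are plain strings, and IllegalArgumentException stands in for PubSubTopicDoesNotExistException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in; not part of TopicManager. */
final class RetentionUpdater {
  private final Map<String, Long> retentionByTopic = new ConcurrentHashMap<>();

  /** Registers a topic with an initial retention so the sketch has something to update. */
  void createTopic(String topic, long retentionMs) {
    retentionByTopic.put(topic, retentionMs);
  }

  /** Returns true if the stored retention changed, false if it already matched. */
  boolean updateRetention(String topic, long expectedRetentionMs) {
    Long current = retentionByTopic.get(topic);
    if (current == null) {
      // Stand-in for PubSubTopicDoesNotExistException.
      throw new IllegalArgumentException("Topic does not exist: " + topic);
    }
    if (current.longValue() == expectedRetentionMs) {
      return false; // nothing to update
    }
    retentionByTopic.put(topic, expectedRetentionMs);
    return true;
  }

  long getRetention(String topic) {
    Long current = retentionByTopic.get(topic);
    if (current == null) {
      throw new IllegalArgumentException("Topic does not exist: " + topic);
    }
    return current;
  }
}
```

Returning false for a no-op lets callers skip follow-up work, such as cache invalidation, when nothing actually changed.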
    • updateTopicRetentionWithRetries

      public boolean updateTopicRetentionWithRetries(PubSubTopic topicName, long expectedRetentionInMs)
    • updateTopicCompactionPolicy

      public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted)
    • updateTopicCompactionPolicy

      public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted, long minLogCompactionLagMs, Optional<Long> maxLogCompactionLagMs) throws PubSubTopicDoesNotExistException
      Update topic compaction policy.
      Parameters:
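      topic - the topic whose compaction policy should be updated
      expectedLogCompacted - whether log compaction should be enabled on the topic
      minLogCompactionLagMs - the overriding min log compaction lag. If this is specified and is a valid number (> 0), it overrides the default config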
      Throws:
      PubSubTopicDoesNotExistException
    • isTopicCompactionEnabled

      public boolean isTopicCompactionEnabled(PubSubTopic topicName)
    • getTopicMinLogCompactionLagMs

      public long getTopicMinLogCompactionLagMs(PubSubTopic topicName)
    • getTopicMaxLogCompactionLagMs

      public Optional<Long> getTopicMaxLogCompactionLagMs(PubSubTopic topicName)
    • updateTopicMinInSyncReplica

      public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR) throws PubSubTopicDoesNotExistException
      Throws:
      PubSubTopicDoesNotExistException
    • getAllTopicRetentions

      public Map<PubSubTopic,Long> getAllTopicRetentions()
      Get retention time for all topics in the pubsub cluster.
      Returns:
      a map of topic name to retention time in MS.
    • getTopicRetention

      public long getTopicRetention(PubSubTopic topicName) throws PubSubTopicDoesNotExistException
      Return topic retention time in MS.
      Throws:
      PubSubTopicDoesNotExistException
    • getTopicRetention

      public static long getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration)
    • isTopicTruncated

      public boolean isTopicTruncated(PubSubTopic topicName, long truncatedTopicMaxRetentionMs)
      Check whether the topic is absent or truncated.
      Parameters:
      topicName - the topic to check
      truncatedTopicMaxRetentionMs - the retention threshold, in ms, below which a topic is considered truncated
      Returns:
      true if the topic does not exist, or if it exists but its retention time is below the truncated threshold; false if the topic exists and its retention time is above the truncated threshold
    • isRetentionBelowTruncatedThreshold

      public boolean isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs)
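
The relationship between isTopicTruncated and isRetentionBelowTruncatedThreshold can be sketched as "absent, or retention under the threshold". This is an illustration only: TruncationCheck is a hypothetical class, an empty OptionalLong stands in for a topic that does not exist, and treating a retention exactly equal to the threshold as truncated is an assumption, since the description above does not say how the boundary is handled.

```java
import java.util.OptionalLong;

/** Hypothetical stand-in; not part of TopicManager. */
final class TruncationCheck {
  private TruncationCheck() {
  }

  /** Assumption: a retention equal to the threshold counts as truncated. */
  static boolean isRetentionBelowThreshold(long retentionMs, long truncatedTopicMaxRetentionMs) {
    return retentionMs <= truncatedTopicMaxRetentionMs;
  }

  /** An empty retention models a topic that does not exist. */
  static boolean isTopicTruncated(OptionalLong retentionMs, long truncatedTopicMaxRetentionMs) {
    if (!retentionMs.isPresent()) {
      return true; // absent topics are reported as truncated
    }
    return isRetentionBelowThreshold(retentionMs.getAsLong(), truncatedTopicMaxRetentionMs);
  }
}
```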
    • getTopicConfig

      public PubSubTopicConfiguration getTopicConfig(PubSubTopic pubSubTopic)
      This operation is a little heavy, since it will pull the configs for all the topics.
    • getTopicConfigWithRetry

      public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic topicName)
    • getCachedTopicConfig

      public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic topicName)
      Still heavy, but can be called repeatedly to amortize that cost.
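
The amortization mentioned for getCachedTopicConfig can be pictured as a load-once cache in front of an expensive lookup. The sketch below is an assumption-laden illustration: CachedConfigLookup is hypothetical, it uses a ConcurrentHashMap where TopicManager accepts a Caffeine cache (see setTopicConfigCache), it has no expiry, and the loader function stands in for the heavy config fetch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in; not part of TopicManager. */
final class CachedConfigLookup<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();
  private final Function<K, V> loader;
  private final AtomicInteger loads = new AtomicInteger();

  CachedConfigLookup(Function<K, V> loader) {
    this.loader = loader;
  }

  /** Runs the expensive loader only on a cache miss. */
  V get(K key) {
    return cache.computeIfAbsent(key, k -> {
      loads.incrementAndGet();
      return loader.apply(k);
    });
  }

  /** Drops the cached value so the next get() reloads it. */
  void invalidate(K key) {
    cache.remove(key);
  }

  /** Number of times the loader actually ran. */
  int loadCount() {
    return loads.get();
  }
}
```

The first call for a key still pays the full cost, which matches the "still heavy" note; only repeated calls benefit.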
    • getSomeTopicConfigs

      public Map<PubSubTopic,PubSubTopicConfiguration> getSomeTopicConfigs(Set<PubSubTopic> topicNames)
    • ensureTopicIsDeletedAndBlock

      public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic)
      Delete a topic and block until it is deleted or operation times out.
      Parameters:
      pubSubTopic - topic to delete
    • ensureTopicIsDeletedAndBlockWithRetry

      public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic)
      Delete a topic with retry and block until it is deleted or operation times out.
      Parameters:
      pubSubTopic - topic to delete
    • listTopics

      public Set<PubSubTopic> listTopics()
    • containsTopicWithExpectationAndRetry

      public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult)
    • containsTopicWithExpectationAndRetry

      public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult, Duration initialBackoff, Duration maxBackoff, Duration maxDuration)
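
The parameter list of this overload (maxAttempts, initialBackoff, maxBackoff, maxDuration) suggests a bounded retry with exponential backoff. The sketch below shows one way such a loop could work; it is not taken from the Venice source. ExpectationRetry is hypothetical, the BooleanSupplier stands in for a topic-existence check, and both the doubling backoff and the choice to return the last observed value are assumptions.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in; not part of TopicManager. */
final class ExpectationRetry {
  private ExpectationRetry() {
  }

  /**
   * Re-runs check until it equals expected, attempts run out, or maxDuration elapses.
   * Returns the last observed value of check.
   */
  static boolean checkWithRetry(
      BooleanSupplier check,
      boolean expected,
      int maxAttempts,
      Duration initialBackoff,
      Duration maxBackoff,
      Duration maxDuration) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    if (initialBackoff.compareTo(maxBackoff) > 0) {
      throw new IllegalArgumentException("initialBackoff must not exceed maxBackoff");
    }
    long deadlineNanos = System.nanoTime() + maxDuration.toNanos();
    long backoffMs = initialBackoff.toMillis();
    boolean observed = !expected;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      observed = check.getAsBoolean();
      if (observed == expected) {
        return observed;
      }
      if (attempt == maxAttempts || System.nanoTime() - deadlineNanos >= 0) {
        break; // out of attempts or out of time
      }
      try {
        Thread.sleep(backoffMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      // Double the wait, capped at maxBackoff.
      backoffMs = Math.min(backoffMs * 2, maxBackoff.toMillis());
    }
    return observed;
  }
}
```

Passing expected = false makes the same loop usable for waiting until a topic disappears, for example after a delete.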
    • containsTopicAndAllPartitionsAreOnline

      public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic)
    • containsTopicAndAllPartitionsAreOnline

      public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic, Integer expectedPartitionCount)
      This is an extensive check to mitigate an edge-case where a topic is not yet created in all the brokers.
      Returns:
      true if the topic exists and all its partitions have at least one in-sync replica; false if the topic does not exist at all, or if it exists but isn't completely available
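
The availability check described above (topic present, every partition backed by at least one in-sync replica) can be sketched as a simple scan over per-partition replica counts. This is a hypothetical illustration: PartitionAvailability is not a Venice class, the list of in-sync replica counts stands in for real partition metadata, and comparing the partition count against a nullable expectedPartitionCount is an assumption about how that parameter is used.

```java
import java.util.List;

/** Hypothetical stand-in; not part of TopicManager. */
final class PartitionAvailability {
  private PartitionAvailability() {
  }

  /**
   * inSyncReplicaCounts holds one entry per partition; null or empty models a missing topic.
   * expectedPartitionCount may be null, in which case the partition count is not checked.
   */
  static boolean allPartitionsOnline(List<Integer> inSyncReplicaCounts, Integer expectedPartitionCount) {
    if (inSyncReplicaCounts == null || inSyncReplicaCounts.isEmpty()) {
      return false; // topic does not exist at all
    }
    if (expectedPartitionCount != null && inSyncReplicaCounts.size() != expectedPartitionCount.intValue()) {
      return false; // topic is only partially created
    }
    for (int inSyncReplicas : inSyncReplicaCounts) {
      if (inSyncReplicas < 1) {
        return false; // this partition has no in-sync replica
      }
    }
    return true;
  }
}
```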
    • setTopicConfigCache

      public void setTopicConfigCache(com.github.benmanes.caffeine.cache.Cache<PubSubTopic,PubSubTopicConfiguration> topicConfigCache)
    • getTopicPartitionInfo

      public List<PubSubTopicPartitionInfo> getTopicPartitionInfo(PubSubTopic pubSubTopic)
      Get information about all partitions for a given topic.
      Parameters:
      pubSubTopic - the topic to get partition info for
      Returns:
      a list of PubSubTopicPartitionInfo for the given topic
    • getTopicLatestOffsets

      public it.unimi.dsi.fastutil.ints.Int2LongMap getTopicLatestOffsets(PubSubTopic pubSubTopic)
      Get the latest offsets for all partitions of a given topic.
      Parameters:
      pubSubTopic - the topic to get latest offsets for
      Returns:
      a Map of partition to the latest offset, or an empty map if there's any problem getting the offsets
    • getPartitionCount

      public int getPartitionCount(PubSubTopic pubSubTopic)
      Get partition count for a given topic.
      Parameters:
      pubSubTopic - the topic to get partition count for
      Returns:
      the number of partitions for the given topic
      Throws:
      PubSubTopicDoesNotExistException - if the topic does not exist
    • containsTopic

      public boolean containsTopic(PubSubTopic pubSubTopic)
    • containsTopicCached

      public boolean containsTopicCached(PubSubTopic pubSubTopic)
    • containsTopicWithRetries

      public boolean containsTopicWithRetries(PubSubTopic pubSubTopic, int retries)
    • getLatestOffsetWithRetries

      public long getLatestOffsetWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
    • getLatestOffsetCached

      public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId)
    • getLatestOffsetCachedNonBlocking

      public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId)
    • getProducerTimestampOfLastDataMessageWithRetries

      public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
    • getProducerTimestampOfLastDataMessageCached

      public long getProducerTimestampOfLastDataMessageCached(PubSubTopicPartition pubSubTopicPartition)
    • getOffsetByTime

      public long getOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
      Get offsets for only one partition with a specific timestamp.
    • invalidateCache

      public CompletableFuture<Void> invalidateCache(PubSubTopic pubSubTopic)
      Invalidate the cache for the given topic and all its partitions.
      Parameters:
      pubSubTopic - the topic to invalidate
    • prefetchAndCacheLatestOffset

      public void prefetchAndCacheLatestOffset(PubSubTopicPartition pubSubTopicPartition)
      Prefetch and cache the latest offset for the given topic-partition.
      Parameters:
      pubSubTopicPartition - the topic-partition to prefetch and cache the latest offset for
    • getPubSubClusterAddress

      public String getPubSubClusterAddress()
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • toString

      public String toString()
      Overrides:
      toString in class Object