Class TopicManager
java.lang.Object
com.linkedin.venice.pubsub.manager.TopicManager
All Implemented Interfaces:
Closeable, AutoCloseable
Topic manager is responsible for creating, deleting, and updating topics. It also provides APIs to query topic metadata.
It is essentially a wrapper over PubSubAdminAdapter and PubSubConsumerAdapter with additional features such as caching, metrics, and retry.
TODO: We still have retries in the PubSubAdminAdapter; we will eventually move them here.
-
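To make the wrapper idea concrete, here is a minimal sketch of how a cache and bounded retries might be layered over an admin adapter. TopicAdmin and CachingTopicWrapper are hypothetical stand-ins written for this illustration, not Venice classes, and the retry count of 3 used by the cached lookup is an arbitrary assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an admin adapter; this is not the PubSubAdminAdapter API. */
interface TopicAdmin {
  boolean containsTopic(String topic);
}

/** Illustrative wrapper that adds a cache and bounded retries around a TopicAdmin. */
final class CachingTopicWrapper {
  private final TopicAdmin admin;
  private final Map<String, Boolean> existenceCache = new ConcurrentHashMap<>();

  CachingTopicWrapper(TopicAdmin admin) {
    this.admin = admin;
  }

  /** Calls the adapter, retrying when it throws, up to maxAttempts calls in total. */
  boolean containsTopicWithRetries(String topic, int maxAttempts) {
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return admin.containsTopic(topic);
      } catch (RuntimeException e) {
        last = e; // remember the failure and try again
      }
    }
    throw new IllegalStateException("Gave up after " + maxAttempts + " attempts", last);
  }

  /** Serves repeated lookups from the cache; only a cache miss reaches the adapter. */
  boolean containsTopicCached(String topic) {
    return existenceCache.computeIfAbsent(topic, t -> containsTopicWithRetries(t, 3));
  }

  /** Drops the cached answer so that the next lookup queries the adapter again. */
  void invalidateCache(String topic) {
    existenceCache.remove(topic);
  }
}
```

In this sketch a cached answer can go stale after a topic is created or deleted, which is why an explicit invalidation hook sits next to the cached lookup.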
Constructor Summary
-
Method Summary
Modifier and Type, Method, Description
void close()
boolean containsTopic(PubSubTopic pubSubTopic)
boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic, Integer expectedPartitionCount)
- This is an extensive check to mitigate an edge-case where a topic is not yet created in all the brokers.
boolean containsTopicCached(PubSubTopic pubSubTopic)
boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult)
- See Javadoc of PubSubAdminAdapter.containsTopicWithExpectationAndRetry(com.linkedin.venice.pubsub.api.PubSubTopic, int, boolean), which provides exactly the same semantics.
boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult, Duration initialBackoff, Duration maxBackoff, Duration maxDuration)
boolean containsTopicWithRetries(PubSubTopic pubSubTopic, int retries)
void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal)
- Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr)
void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
- Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
void createTopic(PubSubTopic topicName, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
- Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic)
- Delete a topic and block until it is deleted or operation times out.
void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic)
- Delete a topic with retry and block until it is deleted or operation times out.
getAllTopicRetentions()
- Get retention time for all topics in the pubsub cluster.
getCachedTopicConfig(PubSubTopic topicName)
- Still heavy, but can be called repeatedly to amortize that cost.
long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId)
long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId)
long getLatestOffsetWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
long getOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
- Get offsets for only one partition with a specific timestamp.
int getPartitionCount(PubSubTopic pubSubTopic)
- Get partition count for a given topic.
long getProducerTimestampOfLastDataMessageCached(PubSubTopicPartition pubSubTopicPartition)
long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
getSomeTopicConfigs(Set<PubSubTopic> topicNames)
getTopicConfig(PubSubTopic pubSubTopic)
- This operation is a little heavy, since it will pull the configs for all the topics.
getTopicConfigWithRetry(PubSubTopic topicName)
it.unimi.dsi.fastutil.ints.Int2LongMap getTopicLatestOffsets(PubSubTopic pubSubTopic)
- Get the latest offsets for all partitions of a given topic.
getTopicMaxLogCompactionLagMs(PubSubTopic topicName)
long getTopicMinLogCompactionLagMs(PubSubTopic topicName)
getTopicPartitionInfo(PubSubTopic pubSubTopic)
- Get information about all partitions for a given topic.
long getTopicRetention(PubSubTopic topicName)
- Return topic retention time in MS.
static long getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration)
invalidateCache(PubSubTopic pubSubTopic)
- Invalidate the cache for the given topic and all its partitions.
boolean isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs)
boolean isTopicCompactionEnabled(PubSubTopic topicName)
boolean isTopicTruncated(PubSubTopic topicName, long truncatedTopicMaxRetentionMs)
- Check whether topic is absent or truncated.
void prefetchAndCacheLatestOffset(PubSubTopicPartition pubSubTopicPartition)
- Prefetch and cache the latest offset for the given topic-partition.
void setTopicConfigCache(com.github.benmanes.caffeine.cache.Cache<PubSubTopic, PubSubTopicConfiguration> topicConfigCache)
toString()
void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted)
void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted, long minLogCompactionLagMs, Optional<Long> maxLogCompactionLagMs)
- Update topic compaction policy.
boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
boolean updateTopicRetention(PubSubTopic topicName, long retentionInMS)
- Update retention for the given topic.
boolean updateTopicRetention(PubSubTopic topicName, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration)
- Update retention for the given topic given a Properties.
boolean updateTopicRetentionWithRetries(PubSubTopic topicName, long expectedRetentionInMs)
protected void waitUntilTopicCreated(PubSubTopic topicName, int partitionCount, long deadlineMs)
-
Constructor Details
-
TopicManager
-
-
Method Details
-
createTopic
Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr)
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
Parameters:
topicName - name for the new topic
numPartitions - number of partitions
replication - replication factor
eternal - if true, the topic will have "infinite" (~250 mil years) retention; if false, its retention will be set to PubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS by default
logCompaction - whether to enable log compaction on the topic
minIsr - if present, will apply the specified min.isr to this topic; if absent, PubSub cluster defaults will be used
useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout will be used; if true, a much shorter timeout will be used to make topic creation non-blocking
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, Optional<Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000 ms, after which this function will throw a VeniceException.
Parameters:
topicName - name for the new topic
numPartitions - number of partitions
replication - replication factor
retentionTimeMs - retention time, in ms, for the topic
logCompaction - whether to enable log compaction on the topic
minIsr - if present, will apply the specified min.isr to this topic; if absent, PubSub cluster defaults will be used
useFastPubSubOperationTimeout - if false, the normal PubSub operation timeout will be used; if true, a much shorter timeout will be used to make topic creation non-blocking
-
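The createTopic overloads describe two behaviours that a short sketch may help illustrate: choosing a retention from the eternal flag, and blocking until the topic is visible or a timeout elapses. Everything below is an assumption made for illustration: CreateTopicSketch, both constants and their values, the polling approach, and the use of IllegalStateException in place of VeniceException are not taken from the Venice source.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only; the names and values here are assumptions, not Venice code. */
final class CreateTopicSketch {
  // Stand-in for an "infinite" retention, assumed for this sketch.
  static final long ETERNAL_RETENTION_MS = Long.MAX_VALUE;
  // Hypothetical default; the real value is PubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS.
  static final long ASSUMED_DEFAULT_RETENTION_MS = 5L * 24 * 60 * 60 * 1000;

  /** Maps the eternal flag to a retention value in milliseconds. */
  static long retentionFor(boolean eternal) {
    return eternal ? ETERNAL_RETENTION_MS : ASSUMED_DEFAULT_RETENTION_MS;
  }

  /** Polls topicExists until it returns true, or throws once timeoutMs has elapsed. */
  static void waitUntilCreated(BooleanSupplier topicExists, long timeoutMs, long pollIntervalMs) {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (!topicExists.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        throw new IllegalStateException("Topic not created within " + timeoutMs + " ms");
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for callers
        throw new IllegalStateException("Interrupted while waiting for topic creation", e);
      }
    }
  }
}
```

A caller would pass a supplier that checks topic existence, for example one that delegates to a containsTopic-style lookup, together with the operation timeout.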
waitUntilTopicCreated
-
updateTopicRetention
public boolean updateTopicRetention(PubSubTopic topicName, long retentionInMS) throws PubSubTopicDoesNotExistException
Update retention for the given topic. If the topic doesn't exist, this operation will throw PubSubTopicDoesNotExistException.
Parameters:
topicName -
retentionInMS -
Returns:
true if the retention time config of the input topic gets updated; false if nothing gets updated
Throws:
PubSubTopicDoesNotExistException
-
updateTopicRetention
public boolean updateTopicRetention(PubSubTopic topicName, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration) throws PubSubTopicDoesNotExistException
Update retention for the given topic given a Properties.
Parameters:
topicName -
expectedRetentionInMs -
pubSubTopicConfiguration -
Returns:
true if the retention time gets updated; false if no update is needed.
Throws:
PubSubTopicDoesNotExistException
-
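The return contract of updateTopicRetention (true when the value changed, false when no update was needed, an exception when the topic is missing) can be sketched with an in-memory map. RetentionUpdateSketch is a hypothetical class for illustration, and IllegalArgumentException stands in for PubSubTopicDoesNotExistException.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative in-memory model of the update contract; not Venice code. */
final class RetentionUpdateSketch {
  private final Map<String, Long> retentionMsByTopic = new HashMap<>();

  /** Registers a topic with an initial retention so that it can be updated later. */
  void addTopic(String topic, long retentionMs) {
    retentionMsByTopic.put(topic, retentionMs);
  }

  /** Returns true if the stored retention changed, false if it already matched. */
  boolean updateTopicRetention(String topic, long retentionMs) {
    Long current = retentionMsByTopic.get(topic);
    if (current == null) {
      // Stand-in for PubSubTopicDoesNotExistException.
      throw new IllegalArgumentException("Topic does not exist: " + topic);
    }
    if (current.longValue() == retentionMs) {
      return false; // nothing to update
    }
    retentionMsByTopic.put(topic, retentionMs);
    return true;
  }
}
```

Returning false for a no-op lets callers skip follow-up work, such as logging or cache invalidation, when the configuration already matches.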
updateTopicRetentionWithRetries
-
updateTopicCompactionPolicy
-
updateTopicCompactionPolicy
public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted, long minLogCompactionLagMs, Optional<Long> maxLogCompactionLagMs) throws PubSubTopicDoesNotExistException
Update topic compaction policy.
Parameters:
topic -
expectedLogCompacted -
minLogCompactionLagMs - the overridden min log compaction lag. If this is specified and a valid number (> 0), it will override the default config
Throws:
PubSubTopicDoesNotExistException
-
isTopicCompactionEnabled
-
getTopicMinLogCompactionLagMs
-
getTopicMaxLogCompactionLagMs
-
updateTopicMinInSyncReplica
public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR) throws PubSubTopicDoesNotExistException
Throws:
PubSubTopicDoesNotExistException
-
getAllTopicRetentions
Get retention time for all topics in the pubsub cluster.
Returns:
a map of topic name to retention time in MS.
-
getTopicRetention
Return topic retention time in MS.
Throws:
PubSubTopicDoesNotExistException
-
getTopicRetention
-
isTopicTruncated
Check whether topic is absent or truncated.
Parameters:
topicName -
truncatedTopicMaxRetentionMs -
Returns:
true if the topic does not exist or if it exists but its retention time is below the truncated threshold; false if the topic exists and its retention time is above the truncated threshold
-
isRetentionBelowTruncatedThreshold
public boolean isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs)
-
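The truncation check can be sketched as follows. TruncationCheckSketch is hypothetical, an empty OptionalLong models a topic that does not exist, and treating a retention exactly equal to the threshold as "below" is an assumption, because the description above only covers the strictly-below and strictly-above cases.

```java
import java.util.OptionalLong;

/** Illustrative only; the boundary handling is an assumption, not confirmed Venice behaviour. */
final class TruncationCheckSketch {
  /** Assumed inclusive comparison: a retention equal to the threshold counts as below it. */
  static boolean isRetentionBelowTruncatedThreshold(long retentionMs, long truncatedTopicMaxRetentionMs) {
    return retentionMs <= truncatedTopicMaxRetentionMs;
  }

  /** An absent topic (empty retention) is reported as truncated, matching the description above. */
  static boolean isTopicTruncated(OptionalLong retentionMs, long truncatedTopicMaxRetentionMs) {
    if (!retentionMs.isPresent()) {
      return true;
    }
    return isRetentionBelowTruncatedThreshold(retentionMs.getAsLong(), truncatedTopicMaxRetentionMs);
  }
}
```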
getTopicConfig
This operation is a little heavy, since it will pull the configs for all the topics.
-
getTopicConfigWithRetry
-
getCachedTopicConfig
Still heavy, but can be called repeatedly to amortize that cost.
-
getSomeTopicConfigs
-
ensureTopicIsDeletedAndBlock
Delete a topic and block until it is deleted or operation times out.
Parameters:
pubSubTopic - topic to delete
-
ensureTopicIsDeletedAndBlockWithRetry
Delete a topic with retry and block until it is deleted or operation times out.
Parameters:
pubSubTopic -
-
listTopics
-
containsTopicWithExpectationAndRetry
public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult)
See Javadoc of PubSubAdminAdapter.containsTopicWithExpectationAndRetry(com.linkedin.venice.pubsub.api.PubSubTopic, int, boolean), which provides exactly the same semantics.
-
containsTopicWithExpectationAndRetry
public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult, Duration initialBackoff, Duration maxBackoff, Duration maxDuration)
-
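The six-argument overload suggests a retry loop bounded by an attempt count, a growing backoff, and an overall duration. The following is a sketch of one plausible reading, not the Venice implementation: doubling the backoff up to maxBackoff, returning the last observed result when the expectation is never met, and the ExpectationRetrySketch class itself are all assumptions.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Illustrative retry-with-expectation loop; the semantics are assumed, not taken from Venice. */
final class ExpectationRetrySketch {
  static boolean containsTopicWithExpectationAndRetry(
      BooleanSupplier containsTopic,
      int maxAttempts,
      boolean expectedResult,
      Duration initialBackoff,
      Duration maxBackoff,
      Duration maxDuration) {
    long deadlineNanos = System.nanoTime() + maxDuration.toNanos();
    long backoffMs = initialBackoff.toMillis();
    boolean actual = !expectedResult;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      actual = containsTopic.getAsBoolean();
      if (actual == expectedResult) {
        return actual; // expectation met, stop retrying
      }
      if (attempt == maxAttempts || System.nanoTime() - deadlineNanos >= 0) {
        break; // out of attempts or out of time
      }
      sleepQuietly(backoffMs);
      backoffMs = Math.min(backoffMs * 2, maxBackoff.toMillis()); // capped exponential backoff
    }
    return actual; // last observed result, which differs from the expectation
  }

  private static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted during backoff", e);
    }
  }
}
```

Retrying toward an expected result is useful when topic metadata takes time to propagate: a caller that has just created a topic expects true, and one that has just deleted a topic expects false.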
containsTopicAndAllPartitionsAreOnline
-
containsTopicAndAllPartitionsAreOnline
public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic, Integer expectedPartitionCount)
This is an extensive check to mitigate an edge-case where a topic is not yet created in all the brokers.
Returns:
true if the topic exists and all its partitions have at least one in-sync replica; false if the topic does not exist at all or if it exists but isn't completely available
-
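The availability rule above can be sketched over a plain list of in-sync replica counts, one entry per partition. PartitionOnlineCheckSketch is hypothetical, a null or empty list models a missing topic, and treating a null expectedPartitionCount as "skip the partition-count comparison" is an assumption suggested only by the boxed Integer parameter type.

```java
import java.util.List;

/** Illustrative only; the input model and null handling are assumptions for this sketch. */
final class PartitionOnlineCheckSketch {
  /**
   * @param inSyncReplicaCounts one entry per partition, or null if the topic does not exist
   * @param expectedPartitionCount expected number of partitions, or null to skip that comparison
   */
  static boolean allPartitionsOnline(List<Integer> inSyncReplicaCounts, Integer expectedPartitionCount) {
    if (inSyncReplicaCounts == null || inSyncReplicaCounts.isEmpty()) {
      return false; // the topic does not exist at all
    }
    if (expectedPartitionCount != null && inSyncReplicaCounts.size() != expectedPartitionCount) {
      return false; // not every partition is visible yet
    }
    for (int inSyncReplicas : inSyncReplicaCounts) {
      if (inSyncReplicas < 1) {
        return false; // a partition has no in-sync replica
      }
    }
    return true;
  }
}
```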
setTopicConfigCache
public void setTopicConfigCache(com.github.benmanes.caffeine.cache.Cache<PubSubTopic, PubSubTopicConfiguration> topicConfigCache)
-
getTopicPartitionInfo
Get information about all partitions for a given topic.
Parameters:
pubSubTopic - the topic to get partition info for
Returns:
a list of PubSubTopicPartitionInfo for the given topic
-
getTopicLatestOffsets
Get the latest offsets for all partitions of a given topic.
Parameters:
pubSubTopic - the topic to get latest offsets for
Returns:
a Map of partition to the latest offset, or an empty map if there's any problem getting the offsets
-
getPartitionCount
Get partition count for a given topic.
Parameters:
pubSubTopic - the topic to get partition count for
Returns:
the number of partitions for the given topic
Throws:
PubSubTopicDoesNotExistException - if the topic does not exist
-
containsTopic
-
containsTopicCached
-
containsTopicWithRetries
-
getLatestOffsetWithRetries
-
getLatestOffsetCached
-
getLatestOffsetCachedNonBlocking
-
getProducerTimestampOfLastDataMessageWithRetries
public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
-
getProducerTimestampOfLastDataMessageCached
-
getOffsetByTime
Get offsets for only one partition with a specific timestamp.
-
invalidateCache
Invalidate the cache for the given topic and all its partitions.
Parameters:
pubSubTopic - the topic to invalidate
-
prefetchAndCacheLatestOffset
Prefetch and cache the latest offset for the given topic-partition.
Parameters:
pubSubTopicPartition - the topic-partition to prefetch and cache the latest offset for
-
getPubSubClusterAddress
-
close
public void close()
Specified by:
close in interface AutoCloseable
Specified by:
close in interface Closeable
-
toString
-