Class TopicManager
- java.lang.Object
-
- com.linkedin.venice.pubsub.manager.TopicManager
-
- All Implemented Interfaces:
java.io.Closeable
,java.lang.AutoCloseable
public class TopicManager extends java.lang.Object implements java.io.Closeable
Topic manager is responsible for creating, deleting, and updating topics. It also provides APIs to query topic metadata. It is essentially a wrapper overPubSubAdminAdapter
andPubSubConsumerAdapter
with additional features such as caching, metrics, and retry. TODO: We still have retries in thePubSubAdminAdapter
, we will eventually move them here.
-
-
Constructor Summary
Constructors Constructor Description TopicManager(java.lang.String pubSubClusterAddress, TopicManagerContext context)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description void
close()
boolean
containsTopic(PubSubTopic pubSubTopic)
boolean
containsTopicAndAllPartitionsAreOnline(PubSubTopic topic)
boolean
containsTopicAndAllPartitionsAreOnline(PubSubTopic topic, java.lang.Integer expectedPartitionCount)
This is an extensive check to mitigate an edge-case where a topic is not yet created in all the brokers.boolean
containsTopicCached(PubSubTopic pubSubTopic)
boolean
containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult)
See Java doc ofPubSubAdminAdapter.containsTopicWithExpectationAndRetry(com.linkedin.venice.pubsub.api.PubSubTopic, int, boolean)
which provides exactly the same semantics.boolean
containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult, java.time.Duration initialBackoff, java.time.Duration maxBackoff, java.time.Duration maxDuration)
boolean
containsTopicWithRetries(PubSubTopic pubSubTopic, int retries)
void
createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.void
createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr)
void
createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.void
createTopic(PubSubTopic topicName, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.void
ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic)
Delete a topic and block until it is deleted or operation times out.void
ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic)
Delete a topic with retry and block until it is deleted or operation times out.java.util.Map<PubSubTopic,java.lang.Long>
getAllTopicRetentions()
Get retention time for all topics in the pubsub cluster.PubSubTopicConfiguration
getCachedTopicConfig(PubSubTopic topicName)
Still heavy, but can be called repeatedly to amortize that cost.long
getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId)
long
getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId)
long
getLatestOffsetWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
long
getOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
Get offsets for only one partition with a specific timestamp.int
getPartitionCount(PubSubTopic pubSubTopic)
Get partition count for a given topic.long
getProducerTimestampOfLastDataMessageCached(PubSubTopicPartition pubSubTopicPartition)
long
getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
java.lang.String
getPubSubClusterAddress()
java.util.Map<PubSubTopic,PubSubTopicConfiguration>
getSomeTopicConfigs(java.util.Set<PubSubTopic> topicNames)
PubSubTopicConfiguration
getTopicConfig(PubSubTopic pubSubTopic)
This operation is a little heavy, since it will pull the configs for all the topics.PubSubTopicConfiguration
getTopicConfigWithRetry(PubSubTopic topicName)
it.unimi.dsi.fastutil.ints.Int2LongMap
getTopicLatestOffsets(PubSubTopic pubSubTopic)
Get the latest offsets for all partitions of a given topic.java.util.Optional<java.lang.Long>
getTopicMaxLogCompactionLagMs(PubSubTopic topicName)
long
getTopicMinLogCompactionLagMs(PubSubTopic topicName)
java.util.List<PubSubTopicPartitionInfo>
getTopicPartitionInfo(PubSubTopic pubSubTopic)
Get information about all partitions for a given topic.long
getTopicRetention(PubSubTopic topicName)
Return topic retention time in MS.static long
getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration)
java.util.concurrent.CompletableFuture<java.lang.Void>
invalidateCache(PubSubTopic pubSubTopic)
Invalidate the cache for the given topic and all its partitions.boolean
isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs)
boolean
isTopicCompactionEnabled(PubSubTopic topicName)
boolean
isTopicTruncated(PubSubTopic topicName, long truncatedTopicMaxRetentionMs)
Check whether topic is absent or truncatedjava.util.Set<PubSubTopic>
listTopics()
void
prefetchAndCacheLatestOffset(PubSubTopicPartition pubSubTopicPartition)
Prefetch and cache the latest offset for the given topic-partition.void
setTopicConfigCache(com.github.benmanes.caffeine.cache.Cache<PubSubTopic,PubSubTopicConfiguration> topicConfigCache)
java.lang.String
toString()
void
updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted)
void
updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted, long minLogCompactionLagMs, java.util.Optional<java.lang.Long> maxLogCompactionLagMs)
Update topic compaction policy.boolean
updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR)
boolean
updateTopicRetention(PubSubTopic topicName, long retentionInMS)
Update retention for the given topic.boolean
updateTopicRetention(PubSubTopic topicName, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration)
Update retention for the given topic given aProperties
.protected void
waitUntilTopicCreated(PubSubTopic topicName, int partitionCount, long deadlineMs)
-
-
-
Constructor Detail
-
TopicManager
public TopicManager(java.lang.String pubSubClusterAddress, TopicManagerContext context)
-
-
Method Detail
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr)
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, boolean eternal, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.- Parameters:
topicName
- Name for the new topicnumPartitions
- number of partitionsreplication
- replication factoreternal
- if true, the topic will have "infinite" (~250 mil years) retention if false, its retention will be set toPubSubConstants.DEFAULT_TOPIC_RETENTION_POLICY_MS
by defaultlogCompaction
- whether to enable log compaction on the topicminIsr
- if present, will apply the specified min.isr to this topic, if absent, PubSub cluster defaults will be useduseFastPubSubOperationTimeout
- if false, normal PubSub operation timeout will be used, if true, a much shorter timeout will be used to make topic creation non-blocking.
-
createTopic
public void createTopic(PubSubTopic topicName, int numPartitions, int replication, long retentionTimeMs, boolean logCompaction, java.util.Optional<java.lang.Integer> minIsr, boolean useFastPubSubOperationTimeout)
Create a topic, and block until the topic is created, with a default timeout of 30000, after which this function will throw a VeniceException.- Parameters:
topicName
- Name for the new topicnumPartitions
- number of partitionsreplication
- replication factorretentionTimeMs
- Retention time, in ms, for the topiclogCompaction
- whether to enable log compaction on the topicminIsr
- if present, will apply the specified min.isr to this topic, if absent, PubSub cluster defaults will be useduseFastPubSubOperationTimeout
- if false, normal PubSub operation timeout will be used, if true, a much shorter timeout will be used to make topic creation non-blocking.
-
waitUntilTopicCreated
protected void waitUntilTopicCreated(PubSubTopic topicName, int partitionCount, long deadlineMs)
-
updateTopicRetention
public boolean updateTopicRetention(PubSubTopic topicName, long retentionInMS) throws PubSubTopicDoesNotExistException
Update retention for the given topic. If the topic doesn't exist, this operation will throwPubSubTopicDoesNotExistException
- Parameters:
topicName
-retentionInMS
-- Returns:
- true if the retention time config of the input topic gets updated; return false if nothing gets updated
- Throws:
PubSubTopicDoesNotExistException
-
updateTopicRetention
public boolean updateTopicRetention(PubSubTopic topicName, long expectedRetentionInMs, PubSubTopicConfiguration pubSubTopicConfiguration) throws PubSubTopicDoesNotExistException
Update retention for the given topic given aProperties
.- Parameters:
topicName
-expectedRetentionInMs
-pubSubTopicConfiguration
-- Returns:
- true if the retention time gets updated; false if no update is needed.
- Throws:
PubSubTopicDoesNotExistException
-
updateTopicCompactionPolicy
public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted)
-
updateTopicCompactionPolicy
public void updateTopicCompactionPolicy(PubSubTopic topic, boolean expectedLogCompacted, long minLogCompactionLagMs, java.util.Optional<java.lang.Long> maxLogCompactionLagMs) throws PubSubTopicDoesNotExistException
Update topic compaction policy.- Parameters:
topic
-expectedLogCompacted
-minLogCompactionLagMs
- the overrode min log compaction lag. If this is specified and a valid number (> 0), it will override the default config- Throws:
PubSubTopicDoesNotExistException
-
isTopicCompactionEnabled
public boolean isTopicCompactionEnabled(PubSubTopic topicName)
-
getTopicMinLogCompactionLagMs
public long getTopicMinLogCompactionLagMs(PubSubTopic topicName)
-
getTopicMaxLogCompactionLagMs
public java.util.Optional<java.lang.Long> getTopicMaxLogCompactionLagMs(PubSubTopic topicName)
-
updateTopicMinInSyncReplica
public boolean updateTopicMinInSyncReplica(PubSubTopic topicName, int minISR) throws PubSubTopicDoesNotExistException
- Throws:
PubSubTopicDoesNotExistException
-
getAllTopicRetentions
public java.util.Map<PubSubTopic,java.lang.Long> getAllTopicRetentions()
Get retention time for all topics in the pubsub cluster.- Returns:
- a map of topic name to retention time in MS.
-
getTopicRetention
public long getTopicRetention(PubSubTopic topicName) throws PubSubTopicDoesNotExistException
Return topic retention time in MS.- Throws:
PubSubTopicDoesNotExistException
-
getTopicRetention
public static long getTopicRetention(PubSubTopicConfiguration pubSubTopicConfiguration)
-
isTopicTruncated
public boolean isTopicTruncated(PubSubTopic topicName, long truncatedTopicMaxRetentionMs)
Check whether topic is absent or truncated- Parameters:
topicName
-truncatedTopicMaxRetentionMs
-- Returns:
- true if the topic does not exist or if it exists but its retention time is below truncated threshold false if the topic exists and its retention time is above truncated threshold
-
isRetentionBelowTruncatedThreshold
public boolean isRetentionBelowTruncatedThreshold(long retention, long truncatedTopicMaxRetentionMs)
-
getTopicConfig
public PubSubTopicConfiguration getTopicConfig(PubSubTopic pubSubTopic)
This operation is a little heavy, since it will pull the configs for all the topics.
-
getTopicConfigWithRetry
public PubSubTopicConfiguration getTopicConfigWithRetry(PubSubTopic topicName)
-
getCachedTopicConfig
public PubSubTopicConfiguration getCachedTopicConfig(PubSubTopic topicName)
Still heavy, but can be called repeatedly to amortize that cost.
-
getSomeTopicConfigs
public java.util.Map<PubSubTopic,PubSubTopicConfiguration> getSomeTopicConfigs(java.util.Set<PubSubTopic> topicNames)
-
ensureTopicIsDeletedAndBlock
public void ensureTopicIsDeletedAndBlock(PubSubTopic pubSubTopic)
Delete a topic and block until it is deleted or operation times out.- Parameters:
pubSubTopic
- topic to delete
-
ensureTopicIsDeletedAndBlockWithRetry
public void ensureTopicIsDeletedAndBlockWithRetry(PubSubTopic pubSubTopic)
Delete a topic with retry and block until it is deleted or operation times out.- Parameters:
pubSubTopic
-
-
listTopics
public java.util.Set<PubSubTopic> listTopics()
-
containsTopicWithExpectationAndRetry
public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult)
See Java doc ofPubSubAdminAdapter.containsTopicWithExpectationAndRetry(com.linkedin.venice.pubsub.api.PubSubTopic, int, boolean)
which provides exactly the same semantics.
-
containsTopicWithExpectationAndRetry
public boolean containsTopicWithExpectationAndRetry(PubSubTopic topic, int maxAttempts, boolean expectedResult, java.time.Duration initialBackoff, java.time.Duration maxBackoff, java.time.Duration maxDuration)
-
containsTopicAndAllPartitionsAreOnline
public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic)
-
containsTopicAndAllPartitionsAreOnline
public boolean containsTopicAndAllPartitionsAreOnline(PubSubTopic topic, java.lang.Integer expectedPartitionCount)
This is an extensive check to mitigate an edge-case where a topic is not yet created in all the brokers.- Returns:
- true if the topic exists and all its partitions have at least one in-sync replica false if the topic does not exist at all or if it exists but isn't completely available
-
setTopicConfigCache
public void setTopicConfigCache(com.github.benmanes.caffeine.cache.Cache<PubSubTopic,PubSubTopicConfiguration> topicConfigCache)
-
getTopicPartitionInfo
public java.util.List<PubSubTopicPartitionInfo> getTopicPartitionInfo(PubSubTopic pubSubTopic)
Get information about all partitions for a given topic.- Parameters:
pubSubTopic
- the topic to get partition info for- Returns:
- a list of
PubSubTopicPartitionInfo
for the given topic
-
getTopicLatestOffsets
public it.unimi.dsi.fastutil.ints.Int2LongMap getTopicLatestOffsets(PubSubTopic pubSubTopic)
Get the latest offsets for all partitions of a given topic.- Parameters:
pubSubTopic
- the topic to get latest offsets for- Returns:
- a Map of partition to the latest offset, or an empty map if there's any problem getting the offsets
-
getPartitionCount
public int getPartitionCount(PubSubTopic pubSubTopic)
Get partition count for a given topic.- Parameters:
pubSubTopic
- the topic to get partition count for- Returns:
- the number of partitions for the given topic
- Throws:
PubSubTopicDoesNotExistException
- if the topic does not exist
-
containsTopic
public boolean containsTopic(PubSubTopic pubSubTopic)
-
containsTopicCached
public boolean containsTopicCached(PubSubTopic pubSubTopic)
-
containsTopicWithRetries
public boolean containsTopicWithRetries(PubSubTopic pubSubTopic, int retries)
-
getLatestOffsetWithRetries
public long getLatestOffsetWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
-
getLatestOffsetCached
public long getLatestOffsetCached(PubSubTopic pubSubTopic, int partitionId)
-
getLatestOffsetCachedNonBlocking
public long getLatestOffsetCachedNonBlocking(PubSubTopic pubSubTopic, int partitionId)
-
getProducerTimestampOfLastDataMessageWithRetries
public long getProducerTimestampOfLastDataMessageWithRetries(PubSubTopicPartition pubSubTopicPartition, int retries)
-
getProducerTimestampOfLastDataMessageCached
public long getProducerTimestampOfLastDataMessageCached(PubSubTopicPartition pubSubTopicPartition)
-
getOffsetByTime
public long getOffsetByTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
Get offsets for only one partition with a specific timestamp.
-
invalidateCache
public java.util.concurrent.CompletableFuture<java.lang.Void> invalidateCache(PubSubTopic pubSubTopic)
Invalidate the cache for the given topic and all its partitions.- Parameters:
pubSubTopic
- the topic to invalidate
-
prefetchAndCacheLatestOffset
public void prefetchAndCacheLatestOffset(PubSubTopicPartition pubSubTopicPartition)
Prefetch and cache the latest offset for the given topic-partition.- Parameters:
pubSubTopicPartition
- the topic-partition to prefetch and cache the latest offset for
-
getPubSubClusterAddress
public java.lang.String getPubSubClusterAddress()
-
close
public void close()
- Specified by:
close
in interfacejava.lang.AutoCloseable
- Specified by:
close
in interfacejava.io.Closeable
-
toString
public java.lang.String toString()
- Overrides:
toString
in classjava.lang.Object
-
-