Class TopicCleanupService
- java.lang.Object
  - com.linkedin.venice.service.AbstractVeniceService
    - com.linkedin.venice.controller.kafka.TopicCleanupService
- All Implemented Interfaces: java.io.Closeable, java.lang.AutoCloseable
- Direct Known Subclasses: TopicCleanupServiceForParentController
public class TopicCleanupService extends AbstractVeniceService
The topic cleanup in Venice adopts the following strategy:
1. When the controller needs to clean up topics for retired versions, uncompleted store pushes, or store deletion, it only truncates the topics (lowers topic retention) instead of deleting them right away.
2. The TopicCleanupService works as a single process to clean up all the unused topics. This way, most of the time (when there is no leadership handover), only one controller talks to Kafka to delete topics, which is what Kafka expects in order to avoid concurrent topic deletion.

In theory, two controllers can still talk to Kafka to delete topics during a leadership handover, since the previous leader controller could still be cleaning up topics when the new leader controller starts processing. If required, there are several ways to alleviate this potential concurrent Kafka topic deletion:
1. Do a leader controller check every time before deleting a topic;
2. Register a callback to monitor leadership changes;
3. Use a global Zookeeper lock.

Right now, TopicCleanupService is fully decoupled from Store, since there is only one process actively running to clean up topics, and the controller running this process may not be the leader controller of the cluster that owns the store to which the topic to be deleted belongs.

Here is how TopicCleanupService works to clean up deprecated topics (topics with a low retention policy):
1. This service only runs in the leader controller of the controller cluster, which means there should be only one topic cleanup service running among all the Venice clusters (not strictly true during leader handover);
2. This service runs in an infinite loop, which executes the following operations:
2.1 In every round, check whether the current controller is the leader controller of the controller cluster. If yes, continue; otherwise, sleep for a pre-configured period and check again;
2.2 Collect all the topics and categorize them by store name;
2.3 Remove deprecated real-time topics right away;
2.4 For deprecated version topics, keep a pre-configured minimum number of unused topics to avoid an MM crash, and remove the others.
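The leader-gated loop described above can be sketched as follows. This is a minimal illustration, not the Venice implementation: the class CleanupLoopSketch, the TopicCleaner interface, and the runRounds method are hypothetical names, and the loop runs a fixed number of rounds so that it terminates, whereas the real service loops until it is stopped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of a leader-gated cleanup loop; all names here are assumptions. */
class CleanupLoopSketch {
  /** Hypothetical stand-in for the logic that collects and deletes deprecated topics. */
  interface TopicCleaner {
    void cleanupDeprecatedTopics();
  }

  /**
   * Runs a bounded number of rounds. In each round the leadership check comes first;
   * non-leaders skip the cleanup. Every round ends with a sleep of sleepMs.
   * Returns a log of what each round did.
   */
  static List<String> runRounds(int rounds, BooleanSupplier isLeader, TopicCleaner cleaner, long sleepMs) {
    List<String> log = new ArrayList<>();
    for (int i = 0; i < rounds; i++) {
      if (isLeader.getAsBoolean()) {
        cleaner.cleanupDeprecatedTopics();
        log.add("cleanup");
      } else {
        log.add("skip");
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop looping, as a stoppable service would.
        Thread.currentThread().interrupt();
        break;
      }
    }
    return log;
  }

  public static void main(String[] args) {
    boolean[] leadership = {false, true, true}; // leadership acquired after round one
    int[] round = {0};
    List<String> log = runRounds(3, () -> leadership[round[0]++], () -> { }, 1L);
    System.out.println(log); // prints [skip, cleanup, cleanup]
  }
}
```

Checking leadership at the start of every round narrows, but does not close, the handover window mentioned above: a controller that loses leadership mid-round can still finish the deletions it already started.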
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
-
Field Summary
Fields (Modifier and Type, Field):
protected int delayFactor
protected VeniceControllerMultiClusterConfig multiClusterConfigs
protected long sleepIntervalBetweenTopicListFetchMs
static java.util.Comparator<PubSubTopic> topicPriorityComparator
-
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
-
Constructor Summary
Constructors:
TopicCleanupService(Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, TopicCleanupServiceStats topicCleanupServiceStats, PubSubClientsFactory pubSubClientsFactory)
-
Method Summary
Methods (Modifier and Type, Method, Description):
static java.util.List<PubSubTopic> extractVersionTopicsToCleanup(Admin admin, java.util.Map<PubSubTopic,java.lang.Long> topicRetentions, int minNumberOfUnusedKafkaTopicsToPreserve, int delayFactor)
Filters Venice version topics so that the returned topics satisfy the following conditions: topic is truncated based on retention time.
static java.util.Map<java.lang.String,java.util.Map<PubSubTopic,java.lang.Long>> getAllVeniceStoreTopicsRetentions(java.util.Map<PubSubTopic,java.lang.Long> topicsWithRetention)
boolean startInner()
void stopInner()
-
-
-
Field Detail
-
sleepIntervalBetweenTopicListFetchMs
protected final long sleepIntervalBetweenTopicListFetchMs
-
delayFactor
protected final int delayFactor
-
multiClusterConfigs
protected final VeniceControllerMultiClusterConfig multiClusterConfigs
-
topicPriorityComparator
public static final java.util.Comparator<PubSubTopic> topicPriorityComparator
-
-
Constructor Detail
-
TopicCleanupService
public TopicCleanupService(Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, TopicCleanupServiceStats topicCleanupServiceStats, PubSubClientsFactory pubSubClientsFactory)
-
-
Method Detail
-
startInner
public boolean startInner() throws java.lang.Exception
- Specified by: startInner in class AbstractVeniceService
- Returns: true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
- Throws: java.lang.Exception
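The return-value contract of startInner can be illustrated with a small sketch. Everything below is hypothetical: SketchService is a simplified stand-in rather than the real AbstractVeniceService, the State enum has fewer values than the real ServiceState may have, and the sketch's startInner omits the checked exception for brevity. It only shows the difference between returning true (the base class marks the service started) and returning false (the implementer marks it started once the asynchronous work completes).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the startInner contract; not the Venice base class. */
class AsyncStartSketch {
  enum State { STOPPED, STARTING, STARTED }

  /** Simplified, assumed base class that interprets the boolean returned by startInner. */
  abstract static class SketchService {
    protected final AtomicReference<State> state = new AtomicReference<>(State.STOPPED);

    final void start() {
      state.set(State.STARTING);
      if (startInner()) {
        // Synchronous start: the base class can mark the service as started.
        state.set(State.STARTED);
      }
      // Otherwise the subclass is responsible for setting STARTED later.
    }

    State state() {
      return state.get();
    }

    abstract boolean startInner();
  }

  /** Fully started by the time startInner returns. */
  static class SyncService extends SketchService {
    @Override
    boolean startInner() {
      return true;
    }
  }

  /** Still starting when startInner returns; sets STARTED when warmUp completes. */
  static class AsyncService extends SketchService {
    final CompletableFuture<Void> warmUp = new CompletableFuture<>();

    @Override
    boolean startInner() {
      warmUp.thenRun(() -> state.set(State.STARTED));
      return false;
    }
  }

  public static void main(String[] args) {
    AsyncService service = new AsyncService();
    service.start();
    System.out.println(service.state()); // prints STARTING
    service.warmUp.complete(null);
    System.out.println(service.state()); // prints STARTED
  }
}
```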
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by: stopInner in class AbstractVeniceService
- Throws: java.lang.Exception
-
getAllVeniceStoreTopicsRetentions
public static java.util.Map<java.lang.String,java.util.Map<PubSubTopic,java.lang.Long>> getAllVeniceStoreTopicsRetentions(java.util.Map<PubSubTopic,java.lang.Long> topicsWithRetention)
- Returns: a map from each store name to that store's Kafka topics, with each topic mapped to its configured Kafka retention time.
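The shape of the returned map can be sketched with plain strings in place of PubSubTopic. This is an illustration under stated assumptions, not the actual method: the class TopicGroupingSketch and its methods are hypothetical, and the sketch assumes topic names follow the pattern storeName_v<number> for version topics and storeName_rt for real-time topics. Topics that match neither pattern are skipped.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: group topic retentions by store name using assumed naming rules. */
class TopicGroupingSketch {
  /** Extracts the store name from "<store>_rt" or "<store>_v<digits>"; empty if neither matches. */
  static Optional<String> storeNameOf(String topic) {
    if (topic.length() > 3 && topic.endsWith("_rt")) {
      return Optional.of(topic.substring(0, topic.length() - 3));
    }
    int idx = topic.lastIndexOf("_v");
    if (idx > 0
        && idx + 2 < topic.length()
        && topic.substring(idx + 2).chars().allMatch(Character::isDigit)) {
      return Optional.of(topic.substring(0, idx));
    }
    return Optional.empty();
  }

  /** Builds store name -> (topic name -> retention in ms). */
  static Map<String, Map<String, Long>> groupByStore(Map<String, Long> topicsWithRetention) {
    Map<String, Map<String, Long>> result = new HashMap<>();
    for (Map.Entry<String, Long> entry : topicsWithRetention.entrySet()) {
      storeNameOf(entry.getKey()).ifPresent(store ->
          result.computeIfAbsent(store, k -> new HashMap<>()).put(entry.getKey(), entry.getValue()));
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> input = new HashMap<>();
    input.put("storeA_v1", 1000L);
    input.put("storeA_rt", 2000L);
    input.put("storeB_v7", 3000L);
    input.put("unrelated", 4000L); // not a store topic under the assumed naming; skipped
    System.out.println(groupByStore(input));
  }
}
```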
-
extractVersionTopicsToCleanup
public static java.util.List<PubSubTopic> extractVersionTopicsToCleanup(Admin admin, java.util.Map<PubSubTopic,java.lang.Long> topicRetentions, int minNumberOfUnusedKafkaTopicsToPreserve, int delayFactor)
Filters Venice version topics so that the returned topics satisfy the following conditions:
- topic is truncated based on retention time.
- topic version is in the deletion range.
- current controller is the parent controller, or the Helix resource for this topic is already removed in the child controller.
- topic is a real-time topic, or topic is a version topic and passes the delay countdown condition.
- Returns: a list that contains topics satisfying all the above conditions.
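The first two conditions above can be sketched for a single store as follows. This is a simplified illustration and not the actual filter: the class VersionTopicFilterSketch, the truncatedThresholdMs parameter, and the interpretation of "deletion range" as every version at or below (highest version minus the number of topics to preserve) are all assumptions. The Helix resource check and the delay countdown are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of selecting version topics to delete for one store. */
class VersionTopicFilterSketch {
  /**
   * Returns, in ascending order, the versions whose topics are both truncated
   * (retention at or below truncatedThresholdMs, an assumed criterion) and old enough
   * that at least minUnusedToPreserve of the highest versions are kept.
   */
  static List<Integer> versionsToCleanup(
      Map<Integer, Long> retentionMsByVersion, long truncatedThresholdMs, int minUnusedToPreserve) {
    if (retentionMsByVersion.isEmpty()) {
      return Collections.emptyList();
    }
    int maxVersion = Collections.max(retentionMsByVersion.keySet());
    int boundary = maxVersion - minUnusedToPreserve; // assumed meaning of "deletion range"
    List<Integer> result = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : retentionMsByVersion.entrySet()) {
      boolean truncated = entry.getValue() <= truncatedThresholdMs;
      boolean inDeletionRange = entry.getKey() <= boundary;
      if (truncated && inDeletionRange) {
        result.add(entry.getKey());
      }
    }
    Collections.sort(result);
    return result;
  }

  public static void main(String[] args) {
    // Versions 1 to 3 are truncated; version 4 still has a long retention.
    Map<Integer, Long> retentions = Map.of(1, 1000L, 2, 1000L, 3, 1000L, 4, Long.MAX_VALUE);
    System.out.println(versionsToCleanup(retentions, 5000L, 2)); // prints [1, 2]
  }
}
```

In this sketch version 3 is truncated but survives, because it falls within the two highest versions that are preserved.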
-
-