Class TopicCleanupService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.venice.controller.kafka.TopicCleanupService
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
TopicCleanupServiceForParentController
The topic cleanup in Venice adopts the following strategy:
1. When the controller needs to clean up topics for retired versions, incomplete store pushes, or store deletion, it only
truncates the topics (lowers the topic retention) instead of deleting them right away.
2. The TopicCleanupService works as a single process that cleans up all the unused topics.
This way, most of the time (when there is no leadership handover), only one controller talks to Kafka to delete topics. This is
what Kafka expects, since concurrent topic deletion should be avoided.
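The two-phase strategy above (truncate first, delete later from a single process) can be illustrated with a minimal in-memory sketch. It is not Venice's implementation: the `Map` stands in for Kafka topic configs, and `TwoPhaseTopicCleanup`, `TRUNCATED_RETENTION_MS`, `truncate`, and `cleanupRound` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of "truncate now, delete later". */
final class TwoPhaseTopicCleanup {
  // Assumed threshold: a retention at or below this value marks a topic as truncated.
  static final long TRUNCATED_RETENTION_MS = 60_000L;

  // Stand-in for the broker: topic name -> configured retention in milliseconds.
  private final Map<String, Long> topicRetentions = new HashMap<>();

  void createTopic(String topic, long retentionMs) {
    topicRetentions.put(topic, retentionMs);
  }

  /** Phase 1 (any controller): lower the retention instead of deleting the topic. */
  void truncate(String topic) {
    topicRetentions.computeIfPresent(topic, (name, oldRetention) -> TRUNCATED_RETENTION_MS);
  }

  /** Phase 2 (the single cleanup process): delete every topic that has been truncated. */
  List<String> cleanupRound() {
    List<String> toDelete = new ArrayList<>();
    for (Map.Entry<String, Long> entry : topicRetentions.entrySet()) {
      if (entry.getValue() <= TRUNCATED_RETENTION_MS) {
        toDelete.add(entry.getKey());
      }
    }
    toDelete.forEach(topicRetentions::remove);
    return toDelete;
  }

  boolean exists(String topic) {
    return topicRetentions.containsKey(topic);
  }
}
```

Separating the cheap, idempotent truncation from the actual deletion is what lets many code paths request cleanup while only one process talks to Kafka about deletes.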
In theory, two controllers can still talk to Kafka to delete topics during a leadership handover, since
the previous leader controller could still be cleaning up topics when the new leader controller starts
processing.
If required, there are several ways to alleviate this potential concurrent Kafka topic deletion:
1. Check leadership every time before deleting a topic;
2. Register a callback to monitor leadership changes;
3. Use a global ZooKeeper lock.
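Option 1 could look roughly like the following hypothetical sketch, where `isLeader` and `deleteTopic` are assumed callbacks rather than Venice APIs. A check-then-delete sequence only narrows the race window; it does not close it entirely, because leadership can still change between the check and the delete call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of option 1: re-check leadership immediately before each topic deletion,
 * so a controller that lost leadership in the middle of a round stops deleting.
 */
final class LeaderCheckedDeleter {
  private final BooleanSupplier isLeader;     // assumed: cheap, up-to-date leadership probe
  private final Consumer<String> deleteTopic; // assumed: wraps the real Kafka delete call

  LeaderCheckedDeleter(BooleanSupplier isLeader, Consumer<String> deleteTopic) {
    this.isLeader = isLeader;
    this.deleteTopic = deleteTopic;
  }

  /** Deletes topics in order until leadership is lost; returns the topics actually deleted. */
  List<String> deleteAll(List<String> topics) {
    List<String> deleted = new ArrayList<>();
    for (String topic : topics) {
      if (!isLeader.getAsBoolean()) {
        break; // another controller is now responsible; leave the remaining topics to it
      }
      deleteTopic.accept(topic);
      deleted.add(topic);
    }
    return deleted;
  }
}
```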
Right now, TopicCleanupService is fully decoupled from Store, since only one process actively cleans up
topics, and the controller running this process may not be the leader controller of the cluster that owns
the store the topic to be deleted belongs to.
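Since the service is decoupled from Store, one way to attribute a topic to its store is to parse the topic name. A minimal sketch, assuming the naming convention `<store>_v<version>` for version topics and `<store>_rt` for real-time topics; `TopicNameParser` is a hypothetical helper, not a Venice class.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: classify a topic using only its name, with no access to store metadata.
 * Assumes "<store>_v<version>" for version topics and "<store>_rt" for real-time topics.
 */
final class TopicNameParser {
  enum Kind { VERSION, REAL_TIME }

  record ParsedTopic(String storeName, Kind kind, int version) {}

  private static final Pattern VERSION_TOPIC = Pattern.compile("(.+)_v(\\d+)");
  private static final Pattern REAL_TIME_TOPIC = Pattern.compile("(.+)_rt");

  /** Returns empty for topics that follow neither convention (for example, non-Venice topics). */
  static Optional<ParsedTopic> parse(String topic) {
    Matcher m = VERSION_TOPIC.matcher(topic);
    if (m.matches()) {
      return Optional.of(new ParsedTopic(m.group(1), Kind.VERSION, Integer.parseInt(m.group(2))));
    }
    m = REAL_TIME_TOPIC.matcher(topic);
    if (m.matches()) {
      return Optional.of(new ParsedTopic(m.group(1), Kind.REAL_TIME, -1)); // no version for RT
    }
    return Optional.empty();
  }
}
```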
Here is how TopicCleanupService cleans up deprecated topics (topics with a low retention policy):
1. This service only runs in the leader controller of the controller cluster, which means there should be only one
topic cleanup service running among all the Venice clusters (not strictly considering leadership handover);
2. This service runs in an infinite loop, which executes the following operations:
2.1 For every round, check whether the current controller is the leader controller of the controller cluster.
If yes, continue; otherwise, sleep for a pre-configured period and check again;
2.2 Collect all the topics and categorize them by store name;
2.3 Remove deprecated real-time topics right away;
2.4 For deprecated version topics, keep a pre-configured minimum number of unused topics to avoid an MM crash, and remove the others.
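The loop in step 2 can be sketched as below. This is a hypothetical, simplified model: topics are plain strings following the assumed `<store>_rt` and `<store>_v<N>` naming, every listed topic is assumed to be already deprecated, and "keep a minimum number of unused topics" is interpreted as keeping the newest N version topics per store. The callbacks stand in for the leadership check, topic listing, and deletion calls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical model of the cleanup loop (steps 2.1 to 2.4); all names are illustrative. */
final class CleanupLoopSketch {
  private final BooleanSupplier isLeader;                    // assumed leadership check
  private final Supplier<List<String>> listDeprecatedTopics; // assumed topic listing
  private final Consumer<String> deleteTopic;                // assumed deletion call
  private final int minUnusedVersionTopicsToKeep;
  private final long sleepIntervalMs;
  private volatile boolean running = true;

  CleanupLoopSketch(
      BooleanSupplier isLeader,
      Supplier<List<String>> listDeprecatedTopics,
      Consumer<String> deleteTopic,
      int minUnusedVersionTopicsToKeep,
      long sleepIntervalMs) {
    this.isLeader = isLeader;
    this.listDeprecatedTopics = listDeprecatedTopics;
    this.deleteTopic = deleteTopic;
    this.minUnusedVersionTopicsToKeep = minUnusedVersionTopicsToKeep;
    this.sleepIntervalMs = sleepIntervalMs;
  }

  /** Step 2: loop until stop() is called, sleeping between rounds. */
  void run() throws InterruptedException {
    while (running) {
      if (isLeader.getAsBoolean()) { // 2.1: only the leader does any work
        runOnce();
      }
      Thread.sleep(sleepIntervalMs);
    }
  }

  void stop() {
    running = false;
  }

  /** Steps 2.2 to 2.4 for a single round. */
  void runOnce() {
    // 2.2: categorize topics by store name.
    Map<String, List<String>> topicsByStore = new TreeMap<>();
    for (String topic : listDeprecatedTopics.get()) {
      topicsByStore.computeIfAbsent(storeName(topic), store -> new ArrayList<>()).add(topic);
    }
    for (List<String> topics : topicsByStore.values()) {
      List<String> versionTopics = new ArrayList<>();
      for (String topic : topics) {
        if (topic.endsWith("_rt")) {
          deleteTopic.accept(topic); // 2.3: deprecated real-time topics go right away
        } else {
          versionTopics.add(topic);
        }
      }
      // 2.4: keep the newest N version topics and delete the older ones.
      versionTopics.sort(Comparator.comparingInt(CleanupLoopSketch::version).reversed());
      for (int i = minUnusedVersionTopicsToKeep; i < versionTopics.size(); i++) {
        deleteTopic.accept(versionTopics.get(i));
      }
    }
  }

  private static String storeName(String topic) {
    return topic.substring(0, topic.lastIndexOf('_'));
  }

  private static int version(String topic) {
    return Integer.parseInt(topic.substring(topic.lastIndexOf("_v") + 2));
  }
}
```

Calling `runOnce()` directly makes a single round testable without the sleeping outer loop.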
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
protected final int delayFactor
protected final VeniceControllerMultiClusterConfig multiClusterConfigs
protected final long sleepIntervalBetweenTopicListFetchMs
static final Comparator<PubSubTopic> topicPriorityComparator
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
TopicCleanupService(Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, TopicCleanupServiceStats topicCleanupServiceStats, PubSubClientsFactory pubSubClientsFactory)
Method Summary
static List<PubSubTopic> extractVersionTopicsToCleanup(Admin admin, Map<PubSubTopic, Long> topicRetentions, int minNumberOfUnusedKafkaTopicsToPreserve, int delayFactor)
Filter Venice version topics so that the returned topics satisfy the following conditions: topic is truncated based on retention time.
static Map<String, Map<PubSubTopic, Long>> getAllVeniceStoreTopicsRetentions(Map<PubSubTopic, Long> topicsWithRetention)
boolean startInner()
void stopInner()
Field Details
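sleepIntervalBetweenTopicListFetchMs
protected final long sleepIntervalBetweenTopicListFetchMs
delayFactor
protected final int delayFactor
multiClusterConfigs
protected final VeniceControllerMultiClusterConfig multiClusterConfigs
topicPriorityComparator
static final Comparator<PubSubTopic> topicPriorityComparator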
Constructor Details
TopicCleanupService
public TopicCleanupService(Admin admin, VeniceControllerMultiClusterConfig multiClusterConfigs, PubSubTopicRepository pubSubTopicRepository, TopicCleanupServiceStats topicCleanupServiceStats, PubSubClientsFactory pubSubClientsFactory)
Method Details
startInner
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started; false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
- Throws:
Exception
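The asynchronous branch of this contract (return `false`, then set the state to `STARTED` once the background work completes) can be sketched with a hypothetical `MiniService` base class that mimics the lifecycle. It is not `AbstractVeniceService` itself, and this page does not say which branch `TopicCleanupService` takes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for the service lifecycle; not Venice's AbstractVeniceService. */
abstract class MiniService {
  enum ServiceState { NOT_STARTED, STARTING, STARTED }

  protected final AtomicReference<ServiceState> serviceState =
      new AtomicReference<>(ServiceState.NOT_STARTED);

  /** Same contract as startInner: true = fully started, false = still starting asynchronously. */
  protected abstract boolean startInner() throws Exception;

  final void start() throws Exception {
    serviceState.set(ServiceState.STARTING);
    if (startInner()) {
      serviceState.set(ServiceState.STARTED); // synchronous case: the base class marks it
    }
  }
}

/** Starts its work on a background thread and marks itself STARTED from that thread. */
final class AsyncStartingService extends MiniService {
  final CountDownLatch startedSignal = new CountDownLatch(1);

  @Override
  protected boolean startInner() {
    Thread worker = new Thread(() -> {
      // Asynchronous initialization would run here.
      serviceState.set(ServiceState.STARTED); // implementer's responsibility when returning false
      startedSignal.countDown();
    }, "async-start");
    worker.setDaemon(true);
    worker.start();
    return false; // still starting asynchronously
  }
}
```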
stopInner
- Specified by:
stopInner in class AbstractVeniceService
- Throws:
Exception
getAllVeniceStoreTopicsRetentions
public static Map<String, Map<PubSubTopic, Long>> getAllVeniceStoreTopicsRetentions(Map<PubSubTopic, Long> topicsWithRetention)
- Returns:
- a map object that maps from the store name to the Kafka topic name and its configured Kafka retention time.
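A minimal sketch of the grouping this method describes, using plain topic-name strings in place of `PubSubTopic`. The suffix-based store-name extraction and the choice to skip topics that do not look like Venice topics are assumptions for illustration; `StoreTopicGrouping` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical re-creation of the store -> (topic -> retention) grouping, using String topics. */
final class StoreTopicGrouping {
  /** Returns store name -> (topic name -> retention ms); unrecognized topics are skipped. */
  static Map<String, Map<String, Long>> groupByStore(Map<String, Long> topicsWithRetention) {
    Map<String, Map<String, Long>> result = new HashMap<>();
    topicsWithRetention.forEach((topic, retentionMs) -> {
      String store = storeNameOrNull(topic);
      if (store != null) {
        result.computeIfAbsent(store, s -> new HashMap<>()).put(topic, retentionMs);
      }
    });
    return result;
  }

  // Assumed naming: "<store>_v<N>" for version topics, "<store>_rt" for real-time topics.
  private static String storeNameOrNull(String topic) {
    if (topic.endsWith("_rt") && topic.length() > 3) {
      return topic.substring(0, topic.length() - "_rt".length());
    }
    int idx = topic.lastIndexOf("_v");
    if (idx > 0 && idx + 2 < topic.length()
        && topic.substring(idx + 2).chars().allMatch(Character::isDigit)) {
      return topic.substring(0, idx);
    }
    return null;
  }
}
```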
extractVersionTopicsToCleanup
public static List<PubSubTopic> extractVersionTopicsToCleanup(Admin admin, Map<PubSubTopic, Long> topicRetentions, int minNumberOfUnusedKafkaTopicsToPreserve, int delayFactor)
Filter Venice version topics so that the returned topics satisfy the following conditions:
- topic is truncated based on retention time;
- topic version is in the deletion range;
- current controller is the parent controller, or the Helix resource for this topic has already been removed in the child controller;
- topic is a real-time topic, or topic is a version topic and passes the delay countdown condition.
- Returns:
- a list that contains topics satisfying all the above conditions.
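The conditions above can be modeled by the hypothetical filter below for a single store's version topics. The truncation threshold, the `safeToDelete` predicate standing in for the parent-controller or Helix-resource check, and the reading of the delay countdown as "seen as a candidate in more than `delayFactor` calls" are all assumptions; the real-time-topic branch is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical filter for one store's version topics ("<store>_v<N>"); RT topics not handled. */
final class VersionTopicFilter {
  private final long truncatedRetentionThresholdMs; // assumed definition of "truncated"
  private final int minUnusedToPreserve;
  private final int delayFactor;
  private final Predicate<String> safeToDelete;     // stands in for the controller/Helix check
  private final Map<String, Integer> roundsSeen = new HashMap<>();

  VersionTopicFilter(
      long truncatedRetentionThresholdMs,
      int minUnusedToPreserve,
      int delayFactor,
      Predicate<String> safeToDelete) {
    this.truncatedRetentionThresholdMs = truncatedRetentionThresholdMs;
    this.minUnusedToPreserve = minUnusedToPreserve;
    this.delayFactor = delayFactor;
    this.safeToDelete = safeToDelete;
  }

  List<String> topicsToCleanup(Map<String, Long> versionTopicRetentions) {
    // Condition 1: only topics truncated by retention are candidates.
    List<String> truncated = new ArrayList<>();
    versionTopicRetentions.forEach((topic, retentionMs) -> {
      if (retentionMs <= truncatedRetentionThresholdMs) {
        truncated.add(topic);
      }
    });
    // Condition 2: the newest minUnusedToPreserve truncated topics are outside the deletion range.
    truncated.sort(Comparator.comparingInt(VersionTopicFilter::version).reversed());
    List<String> result = new ArrayList<>();
    for (int i = minUnusedToPreserve; i < truncated.size(); i++) {
      String topic = truncated.get(i);
      if (!safeToDelete.test(topic)) {
        continue; // condition 3 not met yet
      }
      // Condition 4 (assumed form): return the topic only after more than delayFactor sightings.
      int seen = roundsSeen.merge(topic, 1, Integer::sum);
      if (seen > delayFactor) {
        result.add(topic);
      }
    }
    return result;
  }

  private static int version(String topic) {
    return Integer.parseInt(topic.substring(topic.lastIndexOf("_v") + 2));
  }
}
```

With `delayFactor = 1`, an eligible topic is first returned on the second call, which gives in-flight consumers one extra round before the topic disappears.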