Class IsolatedIngestionServer
- java.lang.Object
-
- com.linkedin.venice.service.AbstractVeniceService
-
- com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServer
-
- All Implemented Interfaces:
java.io.Closeable
,java.lang.AutoCloseable
public class IsolatedIngestionServer extends AbstractVeniceService
This class is the server service of the isolated ingestion. It is a Netty based server that listens to all the requests sent fromIsolatedIngestionBackend
in main process and spawnsIsolatedIngestionServerHandler
to handle the request. The general workflow goes as follows: (1) When server instance is created in child process, it will remain idle until it receives initialization request from main process, which pass in all the configs needed to initialize all ingestion components. (2) Once initialization completes, it starts listening to ingestion command sent from main process. (3) When ingestion notifier in child process is notified, it will use itsMainIngestionRequestClient
to relay status back toMainIngestionMonitorService
in main process.MainIngestionMonitorService
will further dispatch status reporting to all registered notifiers in main process. -- For COMPLETED status, it will stop ingestion and shutdown corresponding storage so main process can re-subscribe it for serving purpose. -- For ERROR status, it will also stop ingestion and shutdown storage, and it will also forward the ERROR status for main process to handle. The server will not persist any ingestion status to the disk. When the child process crashes,MainIngestionMonitorService
will be responsible for respawning a new instance and resuming all ongoing ingestion tasks for fault tolerance purpose.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
-
Field Summary
-
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
-
Constructor Summary
Constructors Constructor Description IsolatedIngestionServer(java.lang.String configPath)
-
Method Summary
-
-
-
Method Detail
-
startInner
public boolean startInner()
- Specified by:
startInner
in classAbstractVeniceService
- Returns:
- true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceState
toAbstractVeniceService.ServiceState.STARTED
upon completion of the async work).
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner
in classAbstractVeniceService
- Throws:
java.lang.Exception
-
isInitiated
public boolean isInitiated()
-
getStorageService
public StorageService getStorageService()
-
getConfigLoader
public VeniceConfigLoader getConfigLoader()
-
getStoreIngestionService
public KafkaStoreIngestionService getStoreIngestionService()
-
getStorageMetadataService
public StorageMetadataService getStorageMetadataService()
-
getStoreRepository
public ReadOnlyStoreRepository getStoreRepository()
-
getMetricsRepository
public io.tehuti.metrics.MetricsRepository getMetricsRepository()
-
getIngestionBackend
public DefaultIngestionBackend getIngestionBackend()
-
getPartitionStateSerializer
public InternalAvroSpecificSerializer<PartitionState> getPartitionStateSerializer()
-
getStopConsumptionTimeoutInSeconds
public int getStopConsumptionTimeoutInSeconds()
-
getMetricsMap
public java.util.Map<java.lang.String,java.lang.Double> getMetricsMap()
-
updateHeartbeatTime
public void updateHeartbeatTime()
-
cleanupTopicPartitionState
public void cleanupTopicPartitionState(java.lang.String topicName, int partitionId)
-
cleanupTopicState
public void cleanupTopicState(java.lang.String topicName)
-
reportIngestionStatus
public void reportIngestionStatus(IngestionTaskReport report)
Use executor to execute status reporting in async fashion, otherwise it may cause deadlock between main process and child process. One previous example of the deadlock situation could happen when VersionBackend is trying to unsubscribe a topic partition, it will hold VersionBackend instance lock, and send a blocking call to isolated ingestion service to callKafkaStoreIngestionService.stopConsumptionAndWait(VeniceStoreVersionConfig, int, int, int, boolean)
, inside which it will wait up to 30 seconds to drain internal messages for the partition. For SOP message, it will callVeniceNotifier.started(String, int)
and in ingestion isolation case it will send a blocking call to main process to report progress. The logic inside Da Vinci Client ingestion notifier's started() will call tryStartHeartbeat() in VersionBackend which will also need the VersionBackend instance lock. Thus all of them get stuck until timeout, which leads to unexpected behavior of draining to closed RocksDB storage. This status reporting executor is designed to be single thread to respect the reporting order inside child process. For time-consuming action like stopConsumptionAndWait, we introduced an extra multi-thread executors to improve the performance.
-
getRedundantExceptionFilter
public RedundantExceptionFilter getRedundantExceptionFilter()
-
setResourceToBeUnsubscribed
public void setResourceToBeUnsubscribed(java.lang.String topicName, int partition)
-
setResourceToBeSubscribed
public void setResourceToBeSubscribed(java.lang.String topicName, int partition)
-
isResourceSubscribed
public boolean isResourceSubscribed(java.lang.String topicName, int partition)
-
maybeInitializeResourceHostingMetadata
public void maybeInitializeResourceHostingMetadata(java.lang.String topicName, int partition)
-
main
public static void main(java.lang.String[] args) throws java.lang.Exception
- Throws:
java.lang.Exception
-
-