Class IsolatedIngestionServer
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServer
- All Implemented Interfaces:
Closeable
,AutoCloseable
This class is the server service of the isolated ingestion. It is a Netty-based server that listens to all requests sent from IsolatedIngestionBackend in the main process and spawns an IsolatedIngestionServerHandler to handle each request.
The general workflow is as follows:
(1) When the server instance is created in the child process, it remains idle until it receives an initialization request from the main process, which passes in all the configs needed to initialize the ingestion components.
(2) Once initialization completes, it starts listening for ingestion commands sent from the main process.
(3) When the ingestion notifier in the child process is notified, it uses its MainIngestionRequestClient to relay the status back to MainIngestionMonitorService in the main process. MainIngestionMonitorService then dispatches the status report to all registered notifiers in the main process.
-- For COMPLETED status, it stops ingestion and shuts down the corresponding storage so that the main process can re-subscribe to it for serving.
-- For ERROR status, it also stops ingestion and shuts down the storage, and forwards the ERROR status for the main process to handle.
The server does not persist any ingestion status to disk. When the child process crashes, MainIngestionMonitorService is responsible for respawning a new instance and resuming all ongoing ingestion tasks for fault tolerance.
-
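The workflow above can be pictured with a small in-memory model. This is only a sketch under stated assumptions: the class IsolatedWorkflowSketch, its methods, and the "topic-partition" key format are hypothetical and are not part of the Venice API. It also assumes that every status is relayed to the main process, while only COMPLETED and ERROR stop the local ingestion.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of the workflow; every name here is illustrative, not Venice API. */
public class IsolatedWorkflowSketch {
  public enum Status { STARTED, COMPLETED, ERROR }

  // Step (1): the server stays idle until the main process initializes it.
  private boolean initiated = false;
  private final Set<String> activeIngestions = new HashSet<>();
  // Stand-in for the statuses relayed back to the main process.
  private final List<String> relayedToMain = new ArrayList<>();

  public void initialize() {
    initiated = true;
  }

  // Step (2): ingestion commands are only accepted after initialization.
  public boolean startIngestion(String topic, int partition) {
    if (!initiated) {
      return false;
    }
    activeIngestions.add(topic + "-" + partition);
    return true;
  }

  // Step (3): terminal statuses stop the local ingestion; the status is then relayed.
  public void onNotified(String topic, int partition, Status status) {
    String key = topic + "-" + partition;
    if (status == Status.COMPLETED || status == Status.ERROR) {
      activeIngestions.remove(key);
    }
    relayedToMain.add(key + ":" + status);
  }

  public boolean isIngesting(String topic, int partition) {
    return activeIngestions.contains(topic + "-" + partition);
  }

  public List<String> relayedToMain() {
    return new ArrayList<>(relayedToMain);
  }
}
```

The model keeps all state in memory only, mirroring the statement that the server does not persist ingestion status to disk.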
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
-
Method Summary
Modifier and Type    Method    Description
void cleanupTopicPartitionState(String topicName, int partitionId)
void cleanupTopicState(String topicName)
io.tehuti.metrics.MetricsRepository getMetricsRepository()
int getStopConsumptionTimeoutInSeconds()
boolean isInitiated()
boolean isResourceSubscribed(String topicName, int partition)
static void main
void maybeInitializeResourceHostingMetadata(String topicName, int partition)
void reportIngestionStatus
Use executor to execute status reporting in async fashion, otherwise it may cause deadlock between main process and child process.
void setResourceToBeSubscribed(String topicName, int partition)
void setResourceToBeUnsubscribed(String topicName, int partition)
boolean startInner()
void stopInner
void updateHeartbeatTime()
-
Constructor Details
-
IsolatedIngestionServer
- Throws:
FileNotFoundException
-
-
Method Details
-
startInner
public boolean startInner()
- Specified by:
startInner
in class AbstractVeniceService
- Returns:
- true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
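The return contract above can be illustrated with a reduced stand-in for the base class. This is a sketch only: MiniService, SyncService, AsyncService, and the three-value ServiceState enum are hypothetical and much simpler than the real AbstractVeniceService, whose internals are not shown on this page.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for the startInner() contract; not the Venice classes. */
public class AsyncStartSketch {
  public enum ServiceState { STOPPED, STARTING, STARTED }

  public abstract static class MiniService {
    protected final AtomicReference<ServiceState> serviceState =
        new AtomicReference<>(ServiceState.STOPPED);

    /** Returns true if fully started, false if startup continues asynchronously. */
    public abstract boolean startInner();

    public final void start() {
      serviceState.set(ServiceState.STARTING);
      if (startInner()) {
        // Synchronous case: the base class marks the service as started.
        serviceState.set(ServiceState.STARTED);
      }
    }

    public ServiceState state() {
      return serviceState.get();
    }
  }

  /** Finishes all startup work inside startInner(). */
  public static class SyncService extends MiniService {
    @Override
    public boolean startInner() {
      return true;
    }
  }

  /** Defers part of startup; it must flip the state itself once that work is done. */
  public static class AsyncService extends MiniService {
    public final CompletableFuture<Void> warmUp = new CompletableFuture<>();

    @Override
    public boolean startInner() {
      // The implementer, not the base class, sets STARTED when the async work completes.
      warmUp.thenRun(() -> serviceState.set(ServiceState.STARTED));
      return false;
    }
  }
}
```

In this sketch, completing warmUp is what moves AsyncService from STARTING to STARTED; a caller that only checks the return value of startInner() would otherwise see the service as still starting.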
-
stopInner
- Specified by:
stopInner
in class AbstractVeniceService
- Throws:
Exception
-
isInitiated
public boolean isInitiated()
-
getStorageService
-
getConfigLoader
-
getStoreIngestionService
-
getStorageMetadataService
-
getStoreRepository
-
getMetricsRepository
public io.tehuti.metrics.MetricsRepository getMetricsRepository()
-
getIngestionBackend
-
getPartitionStateSerializer
-
getStopConsumptionTimeoutInSeconds
public int getStopConsumptionTimeoutInSeconds()
-
getMetricsMap
-
updateHeartbeatTime
public void updateHeartbeatTime()
-
cleanupTopicPartitionState
-
cleanupTopicState
-
reportIngestionStatus
Uses an executor to report status asynchronously; otherwise it may cause a deadlock between the main process and the child process. One previous example of the deadlock: when VersionBackend tries to unsubscribe a topic partition, it holds the VersionBackend instance lock and sends a blocking call to the isolated ingestion service to call KafkaStoreIngestionService.stopConsumptionAndWait(VeniceStoreVersionConfig, int, int, int, boolean), inside which it waits up to 30 seconds to drain internal messages for the partition. For an SOP message, it calls VeniceNotifier.started(String, int), which in the ingestion isolation case sends a blocking call to the main process to report progress. The logic inside the Da Vinci Client ingestion notifier's started() calls tryStartHeartbeat() in VersionBackend, which also needs the VersionBackend instance lock. Thus all of them are stuck until the timeout, which leads to the unexpected behavior of draining to a closed RocksDB storage. This status reporting executor is designed to be single-threaded to preserve the reporting order inside the child process. For time-consuming actions like stopConsumptionAndWait, an extra multi-threaded executor was introduced to improve performance.
-
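The single-threaded reporting pattern described for reportIngestionStatus can be sketched with the standard java.util.concurrent API. This is a minimal illustration, not the Venice implementation: OrderedStatusReporter, reportAsync, and the in-memory delivered list (which stands in for the blocking call to the main process) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of ordered asynchronous status reporting; not the Venice code. */
public class OrderedStatusReporter {
  // One worker thread: reports are delivered in exactly the order they were submitted.
  private final ExecutorService statusReportingExecutor = Executors.newSingleThreadExecutor();
  // Stand-in for the blocking call that would deliver a report to the main process.
  private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

  /** Returns immediately, so the caller never blocks on delivery while holding a lock. */
  public void reportAsync(String topic, int partition, String status) {
    statusReportingExecutor.execute(() -> delivered.add(topic + "-" + partition + ":" + status));
  }

  /** Stops accepting reports, waits for queued ones, and returns what was delivered. */
  public List<String> drainAndGetDelivered() throws InterruptedException {
    statusReportingExecutor.shutdown();
    if (!statusReportingExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Status reports were not delivered in time");
    }
    return new ArrayList<>(delivered);
  }
}
```

Because the notifier thread only enqueues the report, it can return without waiting for the main process, which is the property that breaks the lock cycle described above. A pool with more than one thread would keep the non-blocking behavior but lose the ordering guarantee.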
getRedundantExceptionFilter
-
setResourceToBeUnsubscribed
-
setResourceToBeSubscribed
-
isResourceSubscribed
-
maybeInitializeResourceHostingMetadata
-
main
- Throws:
Exception
-