Class IsolatedIngestionServer

java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.ingestion.isolated.IsolatedIngestionServer
All Implemented Interfaces:
Closeable, AutoCloseable

public class IsolatedIngestionServer extends AbstractVeniceService
This class is the server service of the isolated ingestion. It is a Netty based server that listens to all the requests sent from IsolatedIngestionBackend in main process and spawns IsolatedIngestionServerHandler to handle the request. The general workflow goes as follows: (1) When server instance is created in child process, it will remain idle until it receives initialization request from main process, which pass in all the configs needed to initialize all ingestion components. (2) Once initialization completes, it starts listening to ingestion command sent from main process. (3) When ingestion notifier in child process is notified, it will use its MainIngestionRequestClient to relay status back to MainIngestionMonitorService in main process. MainIngestionMonitorService will further dispatch status reporting to all registered notifiers in main process. -- For COMPLETED status, it will stop ingestion and shutdown corresponding storage so main process can re-subscribe it for serving purpose. -- For ERROR status, it will also stop ingestion and shutdown storage, and it will also forward the ERROR status for main process to handle. The server will not persist any ingestion status to the disk. When the child process crashes, MainIngestionMonitorService will be responsible for respawning a new instance and resuming all ongoing ingestion tasks for fault tolerance purpose.
  • Constructor Details

  • Method Details

    • startInner

      public boolean startInner()
      Specified by:
      startInner in class AbstractVeniceService
      Returns:
      true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
    • stopInner

      public void stopInner() throws Exception
      Specified by:
      stopInner in class AbstractVeniceService
      Throws:
      Exception
    • isInitiated

      public boolean isInitiated()
    • getStorageService

      public StorageService getStorageService()
    • getConfigLoader

      public VeniceConfigLoader getConfigLoader()
    • getStoreIngestionService

      public KafkaStoreIngestionService getStoreIngestionService()
    • getStorageMetadataService

      public StorageMetadataService getStorageMetadataService()
    • getStoreRepository

      public ReadOnlyStoreRepository getStoreRepository()
    • getMetricsRepository

      public io.tehuti.metrics.MetricsRepository getMetricsRepository()
    • getIngestionBackend

      public DefaultIngestionBackend getIngestionBackend()
    • getPartitionStateSerializer

      public InternalAvroSpecificSerializer<PartitionState> getPartitionStateSerializer()
    • getStopConsumptionTimeoutInSeconds

      public int getStopConsumptionTimeoutInSeconds()
    • getMetricsMap

      public Map<String,Double> getMetricsMap()
    • updateHeartbeatTime

      public void updateHeartbeatTime()
    • cleanupTopicPartitionState

      public void cleanupTopicPartitionState(String topicName, int partitionId)
    • cleanupTopicState

      public void cleanupTopicState(String topicName)
    • reportIngestionStatus

      public void reportIngestionStatus(IngestionTaskReport report)
      Use executor to execute status reporting in async fashion, otherwise it may cause deadlock between main process and child process. One previous example of the deadlock situation could happen when VersionBackend is trying to unsubscribe a topic partition, it will hold VersionBackend instance lock, and send a blocking call to isolated ingestion service to call KafkaStoreIngestionService.stopConsumptionAndWait(VeniceStoreVersionConfig, int, int, int, boolean), inside which it will wait up to 30 seconds to drain internal messages for the partition. For SOP message, it will call VeniceNotifier.started(String, int) and in ingestion isolation case it will send a blocking call to main process to report progress. The logic inside Da Vinci Client ingestion notifier's started() will call tryStartHeartbeat() in VersionBackend which will also need the VersionBackend instance lock. Thus all of them get stuck until timeout, which leads to unexpected behavior of draining to closed RocksDB storage. This status reporting executor is designed to be single thread to respect the reporting order inside child process. For time-consuming action like stopConsumptionAndWait, we introduced an extra multi-thread executors to improve the performance.
    • getRedundantExceptionFilter

      public RedundantExceptionFilter getRedundantExceptionFilter()
    • setResourceToBeUnsubscribed

      public void setResourceToBeUnsubscribed(String topicName, int partition)
    • setResourceToBeSubscribed

      public void setResourceToBeSubscribed(String topicName, int partition)
    • isResourceSubscribed

      public boolean isResourceSubscribed(String topicName, int partition)
    • maybeInitializeResourceHostingMetadata

      public void maybeInitializeResourceHostingMetadata(String topicName, int partition)
    • main

      public static void main(String[] args) throws Exception
      Throws:
      Exception