Class IsolatedIngestionServer

  • All Implemented Interfaces: java.lang.AutoCloseable

    public class IsolatedIngestionServer
    extends AbstractVeniceService
    This class is the server-side service for isolated ingestion. It is a Netty-based server that listens for all requests sent from IsolatedIngestionBackend in the main process and spawns an IsolatedIngestionServerHandler to handle each request.

    The general workflow is as follows:
    (1) When the server instance is created in the child process, it remains idle until it receives an initialization request from the main process, which passes in all the configs needed to initialize the ingestion components.
    (2) Once initialization completes, it starts listening for ingestion commands sent from the main process.
    (3) When the ingestion notifier in the child process is notified, it uses its MainIngestionRequestClient to relay the status back to MainIngestionMonitorService in the main process. MainIngestionMonitorService then dispatches the status report to all registered notifiers in the main process.
    -- For COMPLETED status, the server stops ingestion and shuts down the corresponding storage so that the main process can re-subscribe to it for serving.
    -- For ERROR status, the server also stops ingestion and shuts down the storage, and forwards the ERROR status for the main process to handle.

    The server does not persist any ingestion status to disk. When the child process crashes, MainIngestionMonitorService is responsible for respawning a new instance and resuming all ongoing ingestion tasks for fault tolerance.
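    The workflow above can be sketched as a small state machine. This is only an illustration, not the actual implementation: the class IsolatedIngestionLifecycleSketch, its methods, and the action strings are hypothetical, and the ordering of "stop ingestion, close storage, then report" for terminal statuses is an assumption inferred from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the child-process lifecycle; none of these names exist in Venice.
public class IsolatedIngestionLifecycleSketch {
  public enum Status { STARTED, COMPLETED, ERROR }

  private boolean initiated = false;
  // Records what the sketch would do, in order, so the flow can be inspected.
  private final List<String> actions = new ArrayList<>();

  // Step (1): the server stays idle until the main process sends the init request.
  public void initialize() {
    initiated = true;
    actions.add("INITIALIZED");
  }

  // Step (2): ingestion commands are accepted only after initialization.
  public boolean handleCommand(String topic, int partition) {
    if (!initiated) {
      return false;
    }
    actions.add("INGEST " + topic + "-" + partition);
    return true;
  }

  // Step (3): terminal statuses stop ingestion and close storage (assumed order),
  // and every status is relayed to the main process.
  public void onStatus(String topic, int partition, Status status) {
    String resource = topic + "-" + partition;
    if (status == Status.COMPLETED || status == Status.ERROR) {
      actions.add("STOP_INGESTION " + resource);
      actions.add("CLOSE_STORAGE " + resource);
    }
    actions.add("REPORT " + status + " " + resource);
  }

  public List<String> actions() {
    return new ArrayList<>(actions);
  }
}
```

    In this sketch nothing is written to disk, which mirrors the statement that recovery after a crash is driven by the main process rather than by state kept in the child.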
    • Constructor Detail

      • IsolatedIngestionServer

        public IsolatedIngestionServer​(java.lang.String configPath)
    • Method Detail

      • isInitiated

        public boolean isInitiated()
      • getMetricsRepository

        public io.tehuti.metrics.MetricsRepository getMetricsRepository()
      • getStopConsumptionTimeoutInSeconds

        public int getStopConsumptionTimeoutInSeconds()
      • getMetricsMap

        public java.util.Map<java.lang.String,​java.lang.Double> getMetricsMap()
      • updateHeartbeatTime

        public void updateHeartbeatTime()
      • cleanupTopicPartitionState

        public void cleanupTopicPartitionState​(java.lang.String topicName,
                                               int partitionId)
      • cleanupTopicState

        public void cleanupTopicState​(java.lang.String topicName)
      • reportIngestionStatus

        public void reportIngestionStatus​(IngestionTaskReport report)
        Uses an executor to report status asynchronously; reporting synchronously may cause a deadlock between the main process and the child process.

        One past example of such a deadlock: when VersionBackend tries to unsubscribe a topic partition, it holds the VersionBackend instance lock and sends a blocking call to the isolated ingestion service, which calls KafkaStoreIngestionService.stopConsumptionAndWait(VeniceStoreVersionConfig, int, int, int, boolean). That method waits up to 30 seconds to drain internal messages for the partition. For an SOP message, it calls VeniceNotifier.started(String, int), which in the ingestion isolation case sends a blocking call to the main process to report progress. The logic inside the Da Vinci Client ingestion notifier's started() calls tryStartHeartbeat() in VersionBackend, which also needs the VersionBackend instance lock. All of these calls are therefore stuck until the timeout expires, which leads to the unexpected behavior of draining to a closed RocksDB storage.

        The status reporting executor is single-threaded by design, to preserve the reporting order inside the child process. For time-consuming actions such as stopConsumptionAndWait, a separate multi-threaded executor was introduced to improve performance.
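        The single-threaded reporting design described above can be illustrated with a minimal sketch. The class OrderedStatusReporter and its in-memory "delivered" list are hypothetical stand-ins for the real blocking call to the main process; only the java.util.concurrent calls are real APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: asynchronous, order-preserving status reporting.
public class OrderedStatusReporter {
  // One worker thread, so reports are delivered in submission order.
  private final ExecutorService statusReportingExecutor = Executors.newSingleThreadExecutor();
  // Stand-in for the blocking request to the main process (assumption).
  private final List<String> delivered = Collections.synchronizedList(new ArrayList<>());

  // Returns immediately; the caller never blocks on delivery while holding its own locks.
  public void report(String status) {
    statusReportingExecutor.execute(() -> delivered.add(status));
  }

  // Waits for queued reports to finish, then returns what was delivered.
  public List<String> drainAndGet(long timeoutSeconds) {
    statusReportingExecutor.shutdown();
    try {
      if (!statusReportingExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Reports were not delivered within the timeout");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while draining reports", e);
    }
    return new ArrayList<>(delivered);
  }
}
```

        Because report() only enqueues work, a caller that holds a lock is not blocked waiting on the other process, and the single worker thread keeps reports in the order they were submitted.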
      • setResourceToBeUnsubscribed

        public void setResourceToBeUnsubscribed​(java.lang.String topicName,
                                                int partition)
      • setResourceToBeSubscribed

        public void setResourceToBeSubscribed​(java.lang.String topicName,
                                              int partition)
      • isResourceSubscribed

        public boolean isResourceSubscribed​(java.lang.String topicName,
                                            int partition)
      • maybeInitializeResourceHostingMetadata

        public void maybeInitializeResourceHostingMetadata​(java.lang.String topicName,
                                                           int partition)
      • main

        public static void main​(java.lang.String[] args)
                         throws java.lang.Exception