Class IsolatedIngestionUtils
- java.lang.Object
-
- com.linkedin.davinci.ingestion.utils.IsolatedIngestionUtils
-
public class IsolatedIngestionUtils extends java.lang.Object
This class contains methods used for communication between ingestion client and server.
-
-
Field Summary
Fields Modifier and Type Field Description static java.lang.String
FORKED_PROCESS_METADATA_FILENAME
static java.lang.String
INGESTION_ISOLATION_CONFIG_PREFIX
static java.lang.String
ISOLATED_INGESTION_CONFIG_FILENAME
static java.lang.String
ISOLATED_INGESTION_KAFKA_CLUSTER_MAP_FILENAME
static java.lang.String
PID
-
Constructor Summary
Constructors Constructor Description IsolatedIngestionUtils()
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static java.lang.String
buildAndSaveConfigsForForkedIngestionProcess(VeniceConfigLoader configLoader)
static io.netty.handler.codec.http.HttpResponse
buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, byte[] content)
static io.netty.handler.codec.http.HttpResponse
buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, io.netty.buffer.ByteBuf contentBuf)
static io.netty.handler.codec.http.HttpResponse
buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, java.lang.String msg)
static IngestionTaskReport
createIngestionTaskReport()
static IngestionTaskReport
createIngestionTaskReport(IngestionReportType ingestionReportType, java.lang.String kafkaTopic, int partitionId, long offset, java.lang.String message)
static IngestionTaskReport
createIngestionTaskReport(IngestionReportType ingestionReportType, java.lang.String kafkaTopic, int partitionId, java.lang.String message)
static IngestionTaskReport
createIngestionTaskReport(java.lang.String kafkaTopic, int partitionId)
static <T extends org.apache.avro.specific.SpecificRecordBase>
TdeserializeIngestionActionRequest(IngestionAction action, byte[] content)
static <T extends org.apache.avro.specific.SpecificRecordBase>
TdeserializeIngestionActionResponse(IngestionAction action, byte[] content)
static StoreVersionState
deserializeStoreVersionState(java.lang.String topicName, byte[] content)
static void
destroyIsolatedIngestionProcess(java.lang.Process isolatedIngestionServiceProcess)
static void
destroyIsolatedIngestionProcessByPid(long pid)
Kill isolated ingestion process by provided PID.static void
destroyLingeringIsolatedIngestionProcess(VeniceConfigLoader configLoader)
This method takes two steps to kill any lingering forked ingestion process previously created by the same user.static java.lang.String
executeShellCommand(java.lang.String command)
static java.util.Optional<IsolatedIngestionServerAclHandler>
getAclHandler(VeniceConfigLoader configLoader)
static IngestionTaskCommand
getDummyCommand()
static byte[]
getDummyContent()
static IngestionAction
getIngestionActionFromRequest(io.netty.handler.codec.http.HttpRequest req)
static java.util.Optional<java.lang.Integer>
getLingeringIngestionProcessId(int port)
This method returns lingering forked ingestion process PID if it exists.static java.util.Optional<SSLFactory>
getSSLFactory(VeniceConfigLoader configLoader)
static java.util.Optional<SSLFactory>
getSSLFactoryForIngestion(VeniceConfigLoader configLoader)
Create SSLFactory for D2Client in ClientConfig, which will be used by different ingestion components.static boolean
isolatedIngestionServerAclEnabled(VeniceConfigLoader configLoader)
static boolean
isolatedIngestionServerSslEnabled(VeniceConfigLoader configLoader)
static java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
loadForkedIngestionKafkaClusterMapConfig(java.lang.String configBasePath)
static VeniceProperties
loadVenicePropertiesFromFile(java.lang.String configPath)
static VeniceProperties
loadVenicePropertiesFromFile(java.lang.String basePath, java.lang.String fileName)
static byte[]
readHttpRequestContent(io.netty.handler.codec.http.FullHttpRequest response)
static void
releaseTargetPortBinding(int port)
releaseTargetPortBinding aims to release the target port by killing lingering ingestion process bound to the port, which is created from previous deployment and was not killed due to unexpected failures.static void
saveForkedIngestionKafkaClusterMapConfig(VeniceConfigLoader configLoader)
static void
saveForkedIngestionProcessMetadata(VeniceConfigLoader configLoader, ForkedJavaProcess forkedJavaProcess)
static <T extends org.apache.avro.specific.SpecificRecordBase>
byte[]serializeIngestionActionRequest(IngestionAction action, T param)
static <T extends org.apache.avro.specific.SpecificRecordBase>
byte[]serializeIngestionActionResponse(IngestionAction action, T param)
static byte[]
serializeStoreVersionState(java.lang.String topicName, StoreVersionState storeVersionState)
static boolean
sslEnabled(VeniceConfigLoader configLoader)
static void
waitPortBinding(int port, int maxAttempt)
waitPortBinding is used to test server port binding in other process.
-
-
-
Field Detail
-
INGESTION_ISOLATION_CONFIG_PREFIX
public static final java.lang.String INGESTION_ISOLATION_CONFIG_PREFIX
- See Also:
- Constant Field Values
-
ISOLATED_INGESTION_CONFIG_FILENAME
public static final java.lang.String ISOLATED_INGESTION_CONFIG_FILENAME
- See Also:
- Constant Field Values
-
ISOLATED_INGESTION_KAFKA_CLUSTER_MAP_FILENAME
public static final java.lang.String ISOLATED_INGESTION_KAFKA_CLUSTER_MAP_FILENAME
- See Also:
- Constant Field Values
-
FORKED_PROCESS_METADATA_FILENAME
public static final java.lang.String FORKED_PROCESS_METADATA_FILENAME
- See Also:
- Constant Field Values
-
PID
public static final java.lang.String PID
- See Also:
- Constant Field Values
-
-
Method Detail
-
serializeIngestionActionRequest
public static <T extends org.apache.avro.specific.SpecificRecordBase> byte[] serializeIngestionActionRequest(IngestionAction action, T param)
-
deserializeIngestionActionRequest
public static <T extends org.apache.avro.specific.SpecificRecordBase> T deserializeIngestionActionRequest(IngestionAction action, byte[] content)
-
serializeIngestionActionResponse
public static <T extends org.apache.avro.specific.SpecificRecordBase> byte[] serializeIngestionActionResponse(IngestionAction action, T param)
-
deserializeIngestionActionResponse
public static <T extends org.apache.avro.specific.SpecificRecordBase> T deserializeIngestionActionResponse(IngestionAction action, byte[] content)
-
getDummyCommand
public static IngestionTaskCommand getDummyCommand()
-
getDummyContent
public static byte[] getDummyContent()
-
serializeStoreVersionState
public static byte[] serializeStoreVersionState(java.lang.String topicName, StoreVersionState storeVersionState)
-
deserializeStoreVersionState
public static StoreVersionState deserializeStoreVersionState(java.lang.String topicName, byte[] content)
-
buildHttpResponse
public static io.netty.handler.codec.http.HttpResponse buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, java.lang.String msg)
-
buildHttpResponse
public static io.netty.handler.codec.http.HttpResponse buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, byte[] content)
-
buildHttpResponse
public static io.netty.handler.codec.http.HttpResponse buildHttpResponse(io.netty.handler.codec.http.HttpResponseStatus status, io.netty.buffer.ByteBuf contentBuf)
-
readHttpRequestContent
public static byte[] readHttpRequestContent(io.netty.handler.codec.http.FullHttpRequest response)
-
getIngestionActionFromRequest
public static IngestionAction getIngestionActionFromRequest(io.netty.handler.codec.http.HttpRequest req)
-
waitPortBinding
public static void waitPortBinding(int port, int maxAttempt) throws java.lang.Exception
waitPortBinding is used to test server port binding in other process. Since we cannot control the connection setup on other process, we can only test by trying to establish a connection to the target port.- Parameters:
port
- Target port to test connection.maxAttempt
- Max number of connection retries before it announces fail to connect.- Throws:
java.lang.Exception
-
releaseTargetPortBinding
public static void releaseTargetPortBinding(int port)
releaseTargetPortBinding aims to release the target port by killing lingering ingestion process bound to the port, which is created from previous deployment and was not killed due to unexpected failures.
-
getLingeringIngestionProcessId
public static java.util.Optional<java.lang.Integer> getLingeringIngestionProcessId(int port)
This method returns lingering forked ingestion process PID if it exists. Since the lsof command will return all processes associated with the port number(including main process), we will need to iterate all PIDs and filter out ingestion process by name.
-
executeShellCommand
public static java.lang.String executeShellCommand(java.lang.String command)
-
createIngestionTaskReport
public static IngestionTaskReport createIngestionTaskReport(IngestionReportType ingestionReportType, java.lang.String kafkaTopic, int partitionId, long offset, java.lang.String message)
-
createIngestionTaskReport
public static IngestionTaskReport createIngestionTaskReport(java.lang.String kafkaTopic, int partitionId)
-
destroyIsolatedIngestionProcess
public static void destroyIsolatedIngestionProcess(java.lang.Process isolatedIngestionServiceProcess)
-
destroyIsolatedIngestionProcessByPid
public static void destroyIsolatedIngestionProcessByPid(long pid)
Kill isolated ingestion process by provided PID.
-
destroyLingeringIsolatedIngestionProcess
public static void destroyLingeringIsolatedIngestionProcess(VeniceConfigLoader configLoader)
This method takes two steps to kill any lingering forked ingestion process previously created by the same user. It first tries to locate the PID of the lingering forked ingestion process stored in the metadata file. It then uses lsof command to locate isolated ingestion process binding to the same port and kill it.
-
createIngestionTaskReport
public static IngestionTaskReport createIngestionTaskReport()
-
createIngestionTaskReport
public static IngestionTaskReport createIngestionTaskReport(IngestionReportType ingestionReportType, java.lang.String kafkaTopic, int partitionId, java.lang.String message)
-
buildAndSaveConfigsForForkedIngestionProcess
public static java.lang.String buildAndSaveConfigsForForkedIngestionProcess(VeniceConfigLoader configLoader)
-
saveForkedIngestionKafkaClusterMapConfig
public static void saveForkedIngestionKafkaClusterMapConfig(VeniceConfigLoader configLoader)
-
loadForkedIngestionKafkaClusterMapConfig
public static java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>> loadForkedIngestionKafkaClusterMapConfig(java.lang.String configBasePath)
-
saveForkedIngestionProcessMetadata
public static void saveForkedIngestionProcessMetadata(VeniceConfigLoader configLoader, ForkedJavaProcess forkedJavaProcess)
-
loadVenicePropertiesFromFile
public static VeniceProperties loadVenicePropertiesFromFile(java.lang.String configPath) throws java.io.FileNotFoundException
- Throws:
java.io.FileNotFoundException
-
loadVenicePropertiesFromFile
public static VeniceProperties loadVenicePropertiesFromFile(java.lang.String basePath, java.lang.String fileName) throws java.io.FileNotFoundException
- Throws:
java.io.FileNotFoundException
-
getSSLFactory
public static java.util.Optional<SSLFactory> getSSLFactory(VeniceConfigLoader configLoader)
-
getSSLFactoryForIngestion
public static java.util.Optional<SSLFactory> getSSLFactoryForIngestion(VeniceConfigLoader configLoader)
Create SSLFactory for D2Client in ClientConfig, which will be used by different ingestion components.
-
getAclHandler
public static java.util.Optional<IsolatedIngestionServerAclHandler> getAclHandler(VeniceConfigLoader configLoader)
-
isolatedIngestionServerSslEnabled
public static boolean isolatedIngestionServerSslEnabled(VeniceConfigLoader configLoader)
-
isolatedIngestionServerAclEnabled
public static boolean isolatedIngestionServerAclEnabled(VeniceConfigLoader configLoader)
-
sslEnabled
public static boolean sslEnabled(VeniceConfigLoader configLoader)
-
-