Class VeniceViewWriter

java.lang.Object
com.linkedin.venice.views.VeniceView
com.linkedin.davinci.store.view.VeniceViewWriter
Direct Known Subclasses:
ChangeCaptureViewWriter, MaterializedViewWriter

public abstract class VeniceViewWriter extends VeniceView
This class is kept separate from the VeniceView class so as not to leak concepts that currently reside exclusively in the server into other components. An instance is created per LeaderFollowerStoreIngestionTask per view, and its processRecord methods are invoked by the ActiveActiveStoreIngestionTask as records are consumed from Kafka and after merge-conflict resolution has run. It is invoked prior to committing to local persistent storage or producing to the local VT, in order to preserve data integrity across various failure scenarios. This implies that processRecord may be called repeatedly on the same data, and each view implementation should protect itself against repeat calls causing problems.
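The repeat-call caveat above can be illustrated with a small, self-contained sketch. `DedupingViewWriter` is a hypothetical stand-in (not part of the Venice API): it remembers the last value it produced per key, so a redelivered (key, value) pair becomes a no-op.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a view writer; all names here are
// illustrative and not part of the real VeniceViewWriter contract.
class DedupingViewWriter {
  // Last value hash produced per key, so a repeat processRecord call
  // with the same data can be detected and skipped.
  private final Map<String, Integer> lastProducedHash = new ConcurrentHashMap<>();
  private int produceCount = 0; // stand-in for "records produced to the view topic"

  CompletableFuture<Void> processRecord(ByteBuffer newValue, byte[] key) {
    String keyStr = new String(key, StandardCharsets.UTF_8);
    int valueHash = newValue.hashCode(); // ByteBuffer.hashCode is content-based
    Integer previous = lastProducedHash.put(keyStr, valueHash);
    if (previous != null && previous == valueHash) {
      // Repeat delivery of the same data after a failure/retry: skip it.
      return CompletableFuture.completedFuture(null);
    }
    produceCount++;
    return CompletableFuture.completedFuture(null);
  }

  int getProduceCount() {
    return produceCount;
  }
}
```

A real implementation would typically rely on offsets or checkpoints rather than value hashes, but the idempotency requirement is the same.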
  • Field Details

    • version

      protected final Version version
    • versionNumber

      protected final int versionNumber
    • isNearlineProducerCompressionEnabled

      protected Optional<Boolean> isNearlineProducerCompressionEnabled
    • nearlineProducerCountPerWriter

      protected Optional<Integer> nearlineProducerCountPerWriter
  • Constructor Details

  • Method Details

    • processRecord

      public abstract CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, ByteBuffer oldValue, byte[] key, int newValueSchemaId, int oldValueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
      To be called as a given ingestion task consumes each record. This is called prior to writing to a VT or to persistent storage.
      Parameters:
      newValue - the incoming, fully specified value which hasn't yet been committed to Venice
      oldValue - the previous value which has already been locally committed to Venice for the given key
      key - the key of the record that designates newValue and oldValue
      newValueSchemaId - the schemaId of the incoming record
      oldValueSchemaId - the schemaId of the old record
      replicationMetadataRecord - the replication metadata (RMD) associated with the incoming record
    • processRecord

      public abstract CompletableFuture<PubSubProduceResult> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId)
      To be called as a given ingestion task consumes each record. This is called prior to writing to a VT or to persistent storage.
      Parameters:
      newValue - the incoming, fully specified value which hasn't yet been committed to Venice
      key - the key of the record that designates newValue
      newValueSchemaId - the schemaId of the incoming record
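One plausible way a concrete writer could relate the two overloads is to treat the value-only variant as the "no previously committed value" case. This is an assumption for illustration only; the types and the sentinel values (`null` old value, `-1` old schema id) are simplified stand-ins, not the real Venice signatures.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the simpler processRecord overload delegates to the
// full one, modeling "no old value" with null and a -1 schema id sentinel.
abstract class SketchViewWriter {
  abstract CompletableFuture<Void> processRecord(
      ByteBuffer newValue,
      ByteBuffer oldValue,
      byte[] key,
      int newValueSchemaId,
      int oldValueSchemaId);

  CompletableFuture<Void> processRecord(ByteBuffer newValue, byte[] key, int newValueSchemaId) {
    // No previously committed value for this key: pass null / -1 sentinels.
    return processRecord(newValue, null, key, newValueSchemaId, -1);
  }
}
```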
    • processControlMessage

      public void processControlMessage(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
      Called when the server encounters a control message. There is (today) no strict ordering guarantee on whether the rest of the server has fully altered its state in response to an incoming control message by the time a given view sees it. Different view types may be interested in different control messages and may act on them differently; the corresponding view writer should implement this method accordingly.
      Parameters:
      kafkaKey - the corresponding kafka key of this control message
      kafkaMessageEnvelope - the corresponding kafka message envelope of this control message
      controlMessage - the control message we're processing
      partition - the partition this control message was delivered to
      partitionConsumptionState - the PartitionConsumptionState of the consuming node
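The note that different view types act on different control messages can be sketched as follows. This is a simplified, hypothetical stand-in: the enum and the "broadcast to the view topic" action are illustrative, not the real Venice types (though version-swap handling is the kind of thing a change-capture view cares about).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in: a view writer that reacts to only one
// control message type and ignores the rest.
class VersionSwapAwareWriter {
  enum ControlMessageType { START_OF_SEGMENT, END_OF_SEGMENT, VERSION_SWAP }

  private final List<String> actions = new ArrayList<>();

  void processControlMessage(ControlMessageType type, int partition) {
    // Only VERSION_SWAP is relevant to this (illustrative) view type.
    if (type == ControlMessageType.VERSION_SWAP) {
      actions.add("propagate version swap to view topic, partition " + partition);
    }
    // All other control messages fall through with no view-side effect.
  }

  List<String> getActions() {
    return actions;
  }
}
```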
    • setProducerOptimizations

      protected VeniceWriterOptions.Builder setProducerOptimizations(VeniceWriterOptions.Builder configBuilder)
      A store could have many views, so to reduce the impact on write throughput we check for and enable producer optimizations that can be configured at the store level. Changing the producer optimization configs requires the ingestion task to be re-initialized, meaning either a new version push or a server restart after the store-level config change; this is by design.
      Parameters:
      configBuilder - to be configured with the producer optimizations
      Returns:
      the configBuilder, with any store-level producer optimizations applied
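The pattern here, applying a builder override only when a store-level `Optional` config is present, can be sketched with stand-in types. `WriterOptionsBuilder` and its setters below are hypothetical simplifications of `VeniceWriterOptions.Builder`, not the real API.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for VeniceWriterOptions.Builder.
class WriterOptionsBuilder {
  Boolean producerCompressionEnabled; // null means "keep the writer default"
  Integer producerCount;

  WriterOptionsBuilder setProducerCompressionEnabled(boolean enabled) {
    this.producerCompressionEnabled = enabled;
    return this;
  }

  WriterOptionsBuilder setProducerCount(int count) {
    this.producerCount = count;
    return this;
  }
}

class ProducerOptimizer {
  // Store-level overrides, mirroring the Optional fields documented above;
  // an empty Optional leaves the builder's default untouched.
  Optional<Boolean> isNearlineProducerCompressionEnabled = Optional.of(false);
  Optional<Integer> nearlineProducerCountPerWriter = Optional.empty();

  WriterOptionsBuilder setProducerOptimizations(WriterOptionsBuilder configBuilder) {
    isNearlineProducerCompressionEnabled.ifPresent(configBuilder::setProducerCompressionEnabled);
    nearlineProducerCountPerWriter.ifPresent(configBuilder::setProducerCount);
    return configBuilder;
  }
}
```

`Optional.ifPresent` keeps the "only override when explicitly configured" semantics in one place, which is why a per-store config maps naturally onto it.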