Package com.linkedin.davinci.store
Class AbstractStorageEngine<Partition extends AbstractStoragePartition>
java.lang.Object
com.linkedin.davinci.store.AbstractStorageEngine<Partition>
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
BlackHoleStorageEngine, InMemoryStorageEngine, RocksDBStorageEngine, VeniceStoreCacheStorageEngine
public abstract class AbstractStorageEngine<Partition extends AbstractStoragePartition>
extends Object
implements Closeable
An abstract base storage class that is responsible for data persistence. It takes on all the usual responsibilities of a Store implementation.
There are several proposals for the storage-partition model:
1. One storage engine for all stores
   1.1 One store uses one database, i.e. all partitions of the store are in one database.
   1.2 One store uses multiple databases, i.e. one partition per database.
2. Each store handled by one storage engine
   2.1 All partitions of the store are handled in one database (current Voldemort implementation)
   2.2 One partition per database (suggested by Sudha)
3. Each partition handled by one storage engine (the original proposal before today’s discussion; very high overhead)
The point of having one storage engine (environment) or one database per partition is to simplify rebalancing, partition migration, and host swaps.
The team agreed to adopt (2.2) as the default storage-partition model for now and to run performance tests to check that it performs well.
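Model (2.2) can be pictured with a small in-memory sketch, assuming a `TreeMap` as a stand-in for each partition's database (a real engine such as RocksDBStorageEngine manages on-disk databases instead). `PerPartitionEngineSketch` and its methods are hypothetical and only mirror the idea, not Venice's code: each partition owns an independent database, so dropping one partition never touches another.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of model (2.2): one engine per store, one "database" per partition.
// A TreeMap stands in for a real per-partition database; this is not Venice's implementation.
class PerPartitionEngineSketch {
  private final String storeName;
  // partitionId -> that partition's private database
  private final Map<Integer, TreeMap<String, String>> databases = new HashMap<>();

  PerPartitionEngineSketch(String storeName) {
    this.storeName = storeName;
  }

  void addPartition(int partitionId) {
    databases.putIfAbsent(partitionId, new TreeMap<>());
  }

  void put(int partitionId, String key, String value) {
    database(partitionId).put(key, value);
  }

  String get(int partitionId, String key) {
    return database(partitionId).get(key);
  }

  // Dropping a partition discards its whole database; other partitions are untouched.
  void dropPartition(int partitionId) {
    databases.remove(partitionId);
  }

  boolean containsPartition(int partitionId) {
    return databases.containsKey(partitionId);
  }

  private TreeMap<String, String> database(int partitionId) {
    TreeMap<String, String> db = databases.get(partitionId);
    if (db == null) {
      throw new IllegalStateException(storeName + ": partition " + partitionId + " does not exist");
    }
    return db;
  }
}
```

Under model (2.1), by contrast, the same drop would have to delete one partition's keys from a database shared with the other partitions.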
-
Nested Class Summary
Modifier and Type | Class | Description
static enum | AbstractStorageEngine.StoragePartitionAdjustmentTrigger |
-
Field Summary
Modifier and Type | Field | Description
static final int | METADATA_PARTITION_ID |
-
Constructor Summary
Constructor | Description
AbstractStorageEngine(String storeVersionName, InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer) |
-
Method Summary
Modifier and Type | Method | Description
void | addStoragePartition(int partitionId) |
void | addStoragePartition(StoragePartitionConfig storagePartitionConfig) |
void | adjustStoragePartition(int partitionId, AbstractStorageEngine.StoragePartitionAdjustmentTrigger mode, StoragePartitionConfig partitionConfig) | Adjust the opened storage partition according to the provided storagePartitionConfig.
void | beginBatchWrite(StoragePartitionConfig storagePartitionConfig, Map<String, String> checkpointedInfo, Optional<Supplier<byte[]>> checksumSupplier) | Many storage engines support efficient methods for performing a large number of writes (puts/deletes) against the data source.
boolean | checkDatabaseIntegrity(int partitionId, Map<String, String> checkpointedInfo, StoragePartitionConfig storagePartitionConfig) | Checks whether the current state of the database is valid at the start of ingestion.
void | clearPartitionOffset(int partitionId) | Clear the offset associated with the partitionId in the metadata partition.
void | clearStoreVersionState() | Clear the store version state in the metadata partition.
void | close() |
void | closeMetadataPartition() |
void | closePartition(int partitionId) |
boolean | containsPartition(int partitionId) | Returns whether a given partition exists within this storage engine.
void | createSnapshot(StoragePartitionConfig storagePartitionConfig) | Create a snapshot for the given partition.
abstract Partition | createStoragePartition(StoragePartitionConfig partitionConfig) |
void | delete(int partitionId, byte[] key) |
void | deleteWithReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) |
void | drop() | Drop the whole store.
void | dropMetadataPartition() |
void | dropPartition(int partitionId) | Removes a partition from the current store.
void | endBatchWrite(StoragePartitionConfig storagePartitionConfig) |
byte[] | get(int partitionId, byte[] key) |
ByteBuffer | get(int partitionId, byte[] key, ByteBuffer valueToBePopulated) |
byte[] | get(int partitionId, ByteBuffer keyBuffer) |
void | getByKeyPrefix(int partitionId, byte[] partialKey, BytesStreamingCallback bytesStreamingCallback) |
long | getCachedRMDSizeInBytes() |
long | getCachedStoreSizeInBytes() |
| getCompressionStrategy() |
| getIterator(int partitionId) |
| getMetadataPartition() |
long | getNumberOfPartitions() | A function which behaves like Map.size(), in the sense that it ignores empty (null) slots in the list.
| getPartitionIds() | Get all partition ids which are assigned to the current node.
| getPartitionOffset(int partitionId) | Retrieve the offset associated with the partitionId from the metadata partition.
| getPartitionOrThrow(int partitionId) |
long | getPartitionSizeInBytes(int partitionId) |
| getPersistedPartitionIds() |
byte[] | getReplicationMetadata(int partitionId, ByteBuffer key) |
long | getRMDSizeInBytes() |
| getRWLockForPartitionOrThrow(int partitionId) | Public for testing purposes.
abstract long | getStoreSizeInBytes() |
| getStoreVersionName() |
| getStoreVersionState() | Retrieve the store version state from the metadata partition.
abstract PersistenceType | getType() |
boolean | hasMemorySpaceLeft() |
boolean | isChunked() |
boolean | isClosed() |
static boolean | isMetadataPartition(int partitionId) |
void | put(int partitionId, byte[] key, byte[] value) |
void | put(int partitionId, byte[] key, ByteBuffer value) |
<K,V> void | put(int partitionId, K key, V value) |
void | putPartitionOffset(int partitionId, OffsetRecord offsetRecord) | Put the offset associated with the partitionId into the metadata partition.
void | putReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) |
void | putStoreVersionState(StoreVersionState versionState) | Put the store version state into the metadata partition.
void | putWithReplicationMetadata(int partitionId, byte[] key, ByteBuffer value, byte[] replicationMetadata) |
void | reopenStoragePartition(int partitionId) | Reopen the underlying database.
protected final void | restoreStoragePartitions() |
protected void | restoreStoragePartitions(boolean restoreMetadataPartition, boolean restoreDataPartitions) | Load the existing storage partitions.
void | suppressLogs(boolean suppressLogs) |
| sync(int partitionId) |
String | toString() |
void | updateStoreVersionStateCache(StoreVersionState versionState) | Used in ingestion isolation mode to update the storage engine's cache in sync with the updates to the state in MainIngestionStorageMetadataService.
-
Field Details
-
METADATA_PARTITION_ID
public static final int METADATA_PARTITION_ID
- See Also:
Constant Field Values
-
-
Constructor Details
-
AbstractStorageEngine
public AbstractStorageEngine(String storeVersionName, InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer)
-
-
Method Details
-
getRWLockForPartitionOrThrow
This method is public for testing purposes. -
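The name getRWLockForPartitionOrThrow suggests one read-write lock per hosted partition, acquired around reads and writes, and an exception when the partition is unknown. A minimal sketch under that assumption, using `java.util.concurrent.locks.ReentrantReadWriteLock`; `PartitionLockRegistry`, its methods and the exception type are hypothetical, and the real return type is not shown on this page.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch: one ReentrantReadWriteLock per hosted partition.
class PartitionLockRegistry {
  private final Map<Integer, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

  void register(int partitionId) {
    locks.putIfAbsent(partitionId, new ReentrantReadWriteLock());
  }

  void unregister(int partitionId) {
    locks.remove(partitionId);
  }

  // Mirrors the "OrThrow" naming: fail fast instead of returning null.
  ReentrantReadWriteLock getRWLockForPartitionOrThrow(int partitionId) {
    ReentrantReadWriteLock lock = locks.get(partitionId);
    if (lock == null) {
      throw new IllegalArgumentException("Partition " + partitionId + " is not hosted");
    }
    return lock;
  }

  // Read path: many readers may hold the read lock at once, while an operation
  // that replaces the partition (hypothetically, a drop or reopen) would take the write lock.
  <T> T withReadLock(int partitionId, Supplier<T> action) {
    ReentrantReadWriteLock.ReadLock readLock = getRWLockForPartitionOrThrow(partitionId).readLock();
    readLock.lock();
    try {
      return action.get();
    } finally {
      readLock.unlock();
    }
  }
}
```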
getStoreVersionName
-
toString
public String toString()
- Overrides:
toString in class Object
-
getType
public abstract PersistenceType getType() -
getStoreSizeInBytes
public abstract long getStoreSizeInBytes() -
getCachedStoreSizeInBytes
public long getCachedStoreSizeInBytes() -
getRMDSizeInBytes
public long getRMDSizeInBytes() -
getCachedRMDSizeInBytes
public long getCachedRMDSizeInBytes() -
getPersistedPartitionIds
-
createStoragePartition
public abstract Partition createStoragePartition(StoragePartitionConfig partitionConfig) -
restoreStoragePartitions
protected void restoreStoragePartitions(boolean restoreMetadataPartition, boolean restoreDataPartitions) Load the existing storage partitions. Implementations should decide when to call this function to restore partitions. -
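One way to read the two flags: the engine walks the partition ids already persisted on disk and reopens the metadata partition, the data partitions, or both. The sketch assumes exactly that; `PartitionRestorer`, its `persistedIds` parameter and the `opened` set are hypothetical, and the metadata partition id used here is a placeholder rather than the real METADATA_PARTITION_ID value.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of restoreStoragePartitions(restoreMetadataPartition, restoreDataPartitions).
class PartitionRestorer {
  // Placeholder id for the metadata partition; the real constant value is not shown here.
  static final int METADATA_PARTITION_ID = -1;

  final Set<Integer> opened = new TreeSet<>();

  void restoreStoragePartitions(Set<Integer> persistedIds, boolean restoreMetadataPartition, boolean restoreDataPartitions) {
    for (int id : persistedIds) {
      boolean isMetadata = id == METADATA_PARTITION_ID;
      if ((isMetadata && restoreMetadataPartition) || (!isMetadata && restoreDataPartitions)) {
        opened.add(id); // a real engine would reopen the on-disk database here
      }
    }
  }

  // Assumption: the no-argument variant restores both kinds of partition.
  void restoreStoragePartitions(Set<Integer> persistedIds) {
    restoreStoragePartitions(persistedIds, true, true);
  }
}
```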
restoreStoragePartitions
protected final void restoreStoragePartitions() -
getMetadataPartition
-
adjustStoragePartition
public void adjustStoragePartition(int partitionId, AbstractStorageEngine.StoragePartitionAdjustmentTrigger mode, StoragePartitionConfig partitionConfig) Adjust the opened storage partition according to the provided storagePartitionConfig. Throws an exception if there is no opened storage partition for the given partition id. The main reason to have this method is to ease unit testing. -
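The contract above, adjust an already-open partition or throw, can be sketched as follows. The `Trigger` constants, the `PartitionConfig` record (standing in for StoragePartitionConfig) and `AdjustablePartitions` are hypothetical placeholders; the real StoragePartitionAdjustmentTrigger constants are not listed on this page. Records require Java 16 or later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the adjustStoragePartition contract.
class AdjustablePartitions {
  // Placeholder trigger values; the real enum's constants are not documented here.
  enum Trigger { BEGIN_BATCH_WRITE, END_BATCH_WRITE }

  // Hypothetical stand-in for StoragePartitionConfig.
  record PartitionConfig(int partitionId, boolean writeOnly) {}

  private final Map<Integer, PartitionConfig> openPartitions = new HashMap<>();

  void open(PartitionConfig config) {
    openPartitions.put(config.partitionId(), config);
  }

  PartitionConfig configOf(int partitionId) {
    return openPartitions.get(partitionId);
  }

  void adjustStoragePartition(int partitionId, Trigger mode, PartitionConfig newConfig) {
    PartitionConfig current = openPartitions.get(partitionId);
    if (current == null) {
      // Documented behavior: no opened partition for this id means an exception.
      throw new IllegalStateException("No opened storage partition for partition id " + partitionId);
    }
    if (!current.equals(newConfig)) {
      // A real engine would likely reopen the underlying database with newConfig;
      // this sketch only records it. `mode` is accepted but unused here.
      openPartitions.put(partitionId, newConfig);
    }
  }
}
```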
addStoragePartition
public void addStoragePartition(int partitionId) -
addStoragePartition
public void addStoragePartition(StoragePartitionConfig storagePartitionConfig) -
closePartition
public void closePartition(int partitionId) -
closeMetadataPartition
public void closeMetadataPartition() -
dropPartition
public void dropPartition(int partitionId) Removes a partition from the current store.
- Parameters:
partitionId - id of the partition to remove
-
dropMetadataPartition
public void dropMetadataPartition() -
drop
public void drop()
Drop the whole store. -
sync
-
close
public void close() throws VeniceException
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Throws:
VeniceException
-
isClosed
public boolean isClosed() -
checkDatabaseIntegrity
public boolean checkDatabaseIntegrity(int partitionId, Map<String, String> checkpointedInfo, StoragePartitionConfig storagePartitionConfig) Checks whether the current state of the database is valid at the start of ingestion. -
beginBatchWrite
public void beginBatchWrite(StoragePartitionConfig storagePartitionConfig, Map<String, String> checkpointedInfo, Optional<Supplier<byte[]>> checksumSupplier) Many storage engines support efficient methods for performing a large number of writes (puts/deletes) against the data source. This method puts the storage engine into that batch write mode. -
endBatchWrite
public void endBatchWrite(StoragePartitionConfig storagePartitionConfig) -
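The beginBatchWrite/endBatchWrite pair can be sketched as a partition that buffers writes while in batch mode and applies them in one step when the batch ends. This is an assumption about the general pattern, not a description of any Venice engine; `BatchWriteSketch` is hypothetical and ignores the checkpointedInfo and checksumSupplier arguments of the real beginBatchWrite.

```java
import java.util.TreeMap;

// Hypothetical sketch of a begin/end batch-write life cycle for one partition.
class BatchWriteSketch {
  private final TreeMap<String, String> committed = new TreeMap<>();
  private TreeMap<String, String> batch; // non-null only while in batch mode

  void beginBatchWrite() {
    if (batch != null) {
      throw new IllegalStateException("Batch write already in progress");
    }
    batch = new TreeMap<>(); // sorted buffer, the order a bulk loader usually wants
  }

  void put(String key, String value) {
    if (batch != null) {
      batch.put(key, value); // buffered until endBatchWrite
    } else {
      committed.put(key, value); // normal write path
    }
  }

  String get(String key) {
    return committed.get(key); // buffered batch writes are not visible yet
  }

  void endBatchWrite() {
    if (batch == null) {
      throw new IllegalStateException("No batch write in progress");
    }
    committed.putAll(batch); // apply the whole batch in one step
    batch = null;
  }
}
```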
createSnapshot
public void createSnapshot(StoragePartitionConfig storagePartitionConfig) Create a snapshot for the given partition.
- Parameters:
storagePartitionConfig
-
-
reopenStoragePartition
public void reopenStoragePartition(int partitionId) Reopen the underlying database. -
put
- Throws:
VeniceException
-
put
- Throws:
VeniceException
-
putWithReplicationMetadata
public void putWithReplicationMetadata(int partitionId, byte[] key, ByteBuffer value, byte[] replicationMetadata) throws VeniceException - Throws:
VeniceException
-
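The *WithReplicationMetadata variants appear to keep a per-key replication metadata (RMD) blob next to the value, so both can be written or deleted together. The sketch assumes the simplest layout, two maps sharing the same key; the real on-disk layout is not described on this page, and `RmdStoreSketch` is hypothetical. It also assumes a delete removes the value but records the supplied RMD, which is one reading of the deleteWithReplicationMetadata signature.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: value and replication metadata (RMD) stored side by side per key.
class RmdStoreSketch {
  // ByteBuffer gives content-based equals/hashCode, unlike raw byte[] keys.
  private final Map<ByteBuffer, byte[]> values = new HashMap<>();
  private final Map<ByteBuffer, byte[]> replicationMetadata = new HashMap<>();

  private static ByteBuffer keyOf(byte[] key) {
    return ByteBuffer.wrap(key.clone()); // defensive copy so callers cannot mutate map keys
  }

  void putWithReplicationMetadata(byte[] key, ByteBuffer value, byte[] rmd) {
    byte[] copy = new byte[value.remaining()];
    value.duplicate().get(copy); // read without moving the caller's buffer position
    values.put(keyOf(key), copy);
    replicationMetadata.put(keyOf(key), rmd.clone());
  }

  // Assumption: a delete removes the value but keeps the supplied RMD.
  void deleteWithReplicationMetadata(byte[] key, byte[] rmd) {
    values.remove(keyOf(key));
    replicationMetadata.put(keyOf(key), rmd.clone());
  }

  byte[] get(byte[] key) {
    return values.get(keyOf(key));
  }

  byte[] getReplicationMetadata(byte[] key) {
    return replicationMetadata.get(keyOf(key));
  }
}
```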
putReplicationMetadata
public void putReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) throws VeniceException - Throws:
VeniceException
-
put
public <K,V> void put(int partitionId, K key, V value) -
get
- Throws:
VeniceException
-
get
public ByteBuffer get(int partitionId, byte[] key, ByteBuffer valueToBePopulated) throws VeniceException - Throws:
VeniceException
-
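The get overload that takes valueToBePopulated lets a caller hand in a buffer to reuse across lookups instead of allocating a new one per read. The sketch assumes a common convention: reuse the buffer when its capacity is large enough, otherwise allocate a larger one, and return whichever buffer holds the value. `BufferReuseSketch.readInto` is hypothetical; Venice's actual reuse policy is not documented on this page.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a read that fills a caller-supplied buffer when possible.
class BufferReuseSketch {
  static ByteBuffer readInto(byte[] storedValue, ByteBuffer valueToBePopulated) {
    if (storedValue == null) {
      return null; // key not found
    }
    ByteBuffer target = valueToBePopulated;
    if (target == null || target.capacity() < storedValue.length) {
      target = ByteBuffer.allocate(storedValue.length); // too small: allocate a new buffer
    }
    target.clear();          // reset position and limit before writing
    target.put(storedValue);
    target.flip();           // make the written bytes readable from position 0
    return target;
  }
}
```

Callers should use the returned buffer rather than assume their own buffer was filled, since a value larger than its capacity comes back in a fresh allocation.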
get
- Throws:
VeniceException
-
getByKeyPrefix
public void getByKeyPrefix(int partitionId, byte[] partialKey, BytesStreamingCallback bytesStreamingCallback) -
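A prefix lookup streams every key that starts with partialKey to the callback. On a sorted store this is a range scan: start at the prefix itself and stop at the first key that no longer matches, because lexicographic order keeps all keys sharing a prefix contiguous. The sketch assumes unsigned lexicographic byte ordering (`Arrays.compareUnsigned`, Java 9+) over a `TreeMap`; `PrefixCallback` is a hypothetical stand-in for BytesStreamingCallback, whose real methods are not listed here.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a prefix scan over a sorted key space.
class PrefixScanSketch {
  // Stand-in for BytesStreamingCallback; the real interface may differ.
  interface PrefixCallback {
    void onRecord(byte[] key, byte[] value);
    void onCompletion();
  }

  private static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;
  private final TreeMap<byte[], byte[]> data = new TreeMap<>(UNSIGNED);

  void put(byte[] key, byte[] value) {
    data.put(key.clone(), value.clone());
  }

  void getByKeyPrefix(byte[] partialKey, PrefixCallback callback) {
    // tailMap starts at the first key >= partialKey; every key with the prefix sorts there or later.
    for (Map.Entry<byte[], byte[]> e : data.tailMap(partialKey, true).entrySet()) {
      if (!startsWith(e.getKey(), partialKey)) {
        break; // sorted order: once a key lacks the prefix, no later key has it
      }
      callback.onRecord(e.getKey(), e.getValue());
    }
    callback.onCompletion();
  }

  private static boolean startsWith(byte[] key, byte[] prefix) {
    return key.length >= prefix.length
        && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
  }
}
```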
delete
public void delete(int partitionId, byte[] key) throws VeniceException
- Throws:
VeniceException
-
deleteWithReplicationMetadata
public void deleteWithReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) throws VeniceException - Throws:
VeniceException
-
getReplicationMetadata
public byte[] getReplicationMetadata(int partitionId, ByteBuffer key) -
putPartitionOffset
public void putPartitionOffset(int partitionId, OffsetRecord offsetRecord) Put the offset associated with the partitionId into the metadata partition. -
getPartitionOffset
Retrieve the offset associated with the partitionId from the metadata partition. -
clearPartitionOffset
public void clearPartitionOffset(int partitionId) Clear the offset associated with the partitionId in the metadata partition. -
isMetadataPartition
public static boolean isMetadataPartition(int partitionId) -
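The offset methods above keep per-partition bookkeeping in a reserved metadata partition identified by METADATA_PARTITION_ID, and isMetadataPartition tells that partition apart from data partitions. A sketch of the offset bookkeeping, assuming offsets are keyed by data partition id inside the metadata partition; the placeholder id value, the plain `long` offset (the real methods use OffsetRecord), the rule that the metadata partition tracks no offset for itself, and `MetadataPartitionSketch` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: per-partition offsets kept in one reserved metadata partition.
class MetadataPartitionSketch {
  // Placeholder value; the real METADATA_PARTITION_ID constant is not shown on this page.
  static final int METADATA_PARTITION_ID = Integer.MAX_VALUE;

  // Contents of the metadata partition: data partition id -> last checkpointed offset.
  private final Map<Integer, Long> offsets = new HashMap<>();

  static boolean isMetadataPartition(int partitionId) {
    return partitionId == METADATA_PARTITION_ID;
  }

  void putPartitionOffset(int partitionId, long offset) {
    rejectMetadataPartition(partitionId);
    offsets.put(partitionId, offset);
  }

  OptionalLong getPartitionOffset(int partitionId) {
    Long offset = offsets.get(partitionId);
    return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
  }

  void clearPartitionOffset(int partitionId) {
    offsets.remove(partitionId);
  }

  // Assumption: the metadata partition does not track an offset for itself.
  private static void rejectMetadataPartition(int partitionId) {
    if (isMetadataPartition(partitionId)) {
      throw new IllegalArgumentException("Offsets are tracked only for data partitions");
    }
  }
}
```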
putStoreVersionState
public void putStoreVersionState(StoreVersionState versionState) Put the store version state into the metadata partition. -
updateStoreVersionStateCache
public void updateStoreVersionStateCache(StoreVersionState versionState) Used in ingestion isolation mode to update the storage engine's cache in sync with the updates to the state in MainIngestionStorageMetadataService.
-
getStoreVersionState
Retrieve the store version state from the metadata partition. -
clearStoreVersionState
public void clearStoreVersionState()
Clear the store version state in the metadata partition. -
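Read together, the store-version-state methods suggest a persisted copy in the metadata partition plus an in-memory cache: putStoreVersionState writes both, while updateStoreVersionStateCache refreshes only the cache so it stays in sync with state maintained elsewhere (MainIngestionStorageMetadataService in ingestion isolation mode). That split is an inference from the descriptions above. The sketch uses a String in place of StoreVersionState, a map in place of the metadata partition, and a hypothetical class and key name; it is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: persisted store version state plus an in-memory cache.
class VersionStateCacheSketch {
  private static final String VERSION_STATE_KEY = "storeVersionState"; // hypothetical key name
  private final Map<String, String> metadataPartition = new HashMap<>(); // stand-in for the metadata partition
  private String cachedState; // String stands in for StoreVersionState

  void putStoreVersionState(String state) {
    metadataPartition.put(VERSION_STATE_KEY, state); // persist
    cachedState = state;                             // and keep the cache in sync
  }

  // Cache-only refresh, for when the persisted copy is maintained elsewhere.
  void updateStoreVersionStateCache(String state) {
    cachedState = state;
  }

  Optional<String> getStoreVersionState() {
    if (cachedState == null) {
      cachedState = metadataPartition.get(VERSION_STATE_KEY); // lazy load on a cache miss
    }
    return Optional.ofNullable(cachedState);
  }

  void clearStoreVersionState() {
    metadataPartition.remove(VERSION_STATE_KEY);
    cachedState = null;
  }

  String persistedState() {
    return metadataPartition.get(VERSION_STATE_KEY);
  }
}
```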
containsPartition
public boolean containsPartition(int partitionId) Returns whether a given partition exists within this storage engine.
- Parameters:
partitionId - the partition to look for
- Returns:
true if the partition exists on this node, false otherwise
-
getNumberOfPartitions
public long getNumberOfPartitions()
A function which behaves like Map.size(), in the sense that it ignores empty (null) slots in the list.
- Returns:
the number of non-null partitions in partitionList
-
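The getNumberOfPartitions description implies the engine keeps partitions in a list indexed by partition id (partitionList), with null slots for ids it does not host; containsPartition then reduces to a bounds check plus a null check. A minimal sketch of that layout, assuming a plain `ArrayList`; `PartitionListSketch` and the String placeholder partition type are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch: partitions stored by index, with null for ids not hosted here.
class PartitionListSketch {
  private final List<String> partitionList = new ArrayList<>();

  void addPartition(int partitionId) {
    while (partitionList.size() <= partitionId) {
      partitionList.add(null); // grow with empty slots up to the requested id
    }
    partitionList.set(partitionId, "partition-" + partitionId);
  }

  void dropPartition(int partitionId) {
    if (partitionId < partitionList.size()) {
      partitionList.set(partitionId, null); // leave a hole rather than shifting indices
    }
  }

  boolean containsPartition(int partitionId) {
    return partitionId >= 0 && partitionId < partitionList.size() && partitionList.get(partitionId) != null;
  }

  // Like Map.size(): counts only occupied slots, not the list's length.
  long getNumberOfPartitions() {
    return partitionList.stream().filter(Objects::nonNull).count();
  }
}
```

After adding partitions 0 and 5 the list has six slots but getNumberOfPartitions() returns 2; dropping partition 0 brings it to 1 while containsPartition(5) stays true.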
getPartitionIds
Get all partition ids which are assigned to the current node.
- Returns:
partition ids that are hosted in the current storage engine.
-
getPartitionOrThrow
-
getPartitionSizeInBytes
public long getPartitionSizeInBytes(int partitionId) -
getCompressionStrategy
-
isChunked
public boolean isChunked() -
suppressLogs
public void suppressLogs(boolean suppressLogs) -
hasMemorySpaceLeft
public boolean hasMemorySpaceLeft() -
getIterator
-