Package com.linkedin.davinci.store
Class AbstractStorageEngine<Partition extends AbstractStoragePartition>
java.lang.Object
com.linkedin.davinci.store.AbstractStorageEngine<Partition>
- All Implemented Interfaces:
StorageEngine<Partition>, Closeable, AutoCloseable
- Direct Known Subclasses:
BlackHoleStorageEngine, InMemoryStorageEngine, RocksDBStorageEngine, VeniceStoreCacheStorageEngine
public abstract class AbstractStorageEngine<Partition extends AbstractStoragePartition>
extends Object
implements StorageEngine<Partition>
A base abstract storage class that is actually responsible for data persistence. This abstract class carries all the usual responsibilities of a Store implementation.
There are several proposals for the storage-partition model:
1. One storage engine for all stores
1.1 One store uses one database, i.e. all partitions of the store will be in one database.
1.2 One store uses multiple databases, i.e. one partition per database.
2. Each store is handled by one storage engine
2.1 All partitions of the store are handled in one database (current Voldemort implementation)
2.2 One partition per database (suggested by Sudha)
3. Each partition is handled by one storage engine (the original proposal before today’s discussion; very high overhead)
The point of having one storage engine (environment) or one database per partition is to simplify rebalancing, partition migration, and host swaps.
The team agreed to take (2.2) as default storage-partition model for now, and run performance tests to see if it goes well.
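To make model 2.2 concrete, here is a minimal, self-contained Java sketch. It is not the Venice implementation: the class PartitionPerDatabaseEngine is hypothetical, and each partition's "database" is stood in for by an in-memory TreeMap. It only illustrates why one database per partition simplifies rebalancing: adding or dropping a partition touches nothing but that partition's own database.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of option 2.2: one storage engine per store version,
// where every partition is backed by its own independent database (an in-memory map here).
class PartitionPerDatabaseEngine {
  private final String storeVersionName;
  // partitionId -> that partition's private database
  private final Map<Integer, Map<String, String>> databases = new HashMap<>();

  PartitionPerDatabaseEngine(String storeVersionName) {
    this.storeVersionName = storeVersionName;
  }

  // Opening a partition creates its own database; repeated calls are no-ops.
  void addStoragePartitionIfAbsent(int partitionId) {
    databases.computeIfAbsent(partitionId, id -> new TreeMap<>());
  }

  boolean containsPartition(int partitionId) {
    return databases.containsKey(partitionId);
  }

  void put(int partitionId, String key, String value) {
    getPartitionOrThrow(partitionId).put(key, value);
  }

  String get(int partitionId, String key) {
    return getPartitionOrThrow(partitionId).get(key);
  }

  // A partition owns its whole database, so dropping (or migrating) it is a single
  // step that never touches the data of any other partition.
  void dropPartition(int partitionId) {
    databases.remove(partitionId);
  }

  private Map<String, String> getPartitionOrThrow(int partitionId) {
    Map<String, String> db = databases.get(partitionId);
    if (db == null) {
      throw new IllegalStateException("Partition " + partitionId + " of " + storeVersionName + " is not open");
    }
    return db;
  }

  public static void main(String[] args) {
    PartitionPerDatabaseEngine engine = new PartitionPerDatabaseEngine("example_store_v1");
    engine.addStoragePartitionIfAbsent(0);
    engine.addStoragePartitionIfAbsent(1);
    engine.put(0, "k", "v0");
    engine.put(1, "k", "v1");
    engine.dropPartition(0);
    // Partition 1 is unaffected by dropping partition 0.
    System.out.println(engine.containsPartition(0) + " " + engine.get(1, "k")); // false v1
  }
}
```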
-
Field Summary
Fields
static final int METADATA_PARTITION_ID
-
Constructor Summary
Constructors
AbstractStorageEngine(String storeVersionName, InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer)
-
Method Summary
Modifier and Type / Method / Description
void addStoragePartition(StoragePartitionConfig storagePartitionConfig)
void addStoragePartitionIfAbsent(int partitionId)
void adjustStoragePartition(int partitionId, StoragePartitionAdjustmentTrigger mode, StoragePartitionConfig partitionConfig) - Adjust the opened storage partition according to the provided storagePartitionConfig.
void beginBatchWrite(StoragePartitionConfig storagePartitionConfig, Map<String, String> checkpointedInfo, Optional<Supplier<byte[]>> checksumSupplier) - Many storage engines support efficient methods for performing a large number of writes (puts/deletes) against the data source.
boolean checkDatabaseIntegrity(int partitionId, Map<String, String> checkpointedInfo, StoragePartitionConfig storagePartitionConfig) - Checks whether the current state of the database is valid at the start of ingestion.
void clearPartitionOffset(int partitionId) - Clear the offset associated with the partitionId in the metadata partition.
void clearStoreVersionState() - Clear the store version state in the metadata partition.
void close()
void closeMetadataPartition()
void closePartition(int partitionId)
boolean containsPartition(int partitionId) - Return whether a given partition exists within this storage engine.
abstract Partition createStoragePartition(StoragePartitionConfig partitionConfig)
void delete(int partitionId, byte[] key)
void deleteWithReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata)
void drop() - Drop the whole store.
void dropPartition(int partitionId) - Removes a partition from the current store.
void dropPartition(int partitionId, boolean dropMetadataPartitionWhenEmpty) - Removes a partition from the current store.
void endBatchWrite(StoragePartitionConfig storagePartitionConfig)
protected <T> T executeWithSafeGuard(int partitionId, Callable<T> callable)
byte[] get(int partitionId, byte[] key)
ByteBuffer get(int partitionId, byte[] key, ByteBuffer valueToBePopulated)
byte[] get(int partitionId, ByteBuffer keyBuffer)
void getByKeyPrefix(int partitionId, byte[] partialKey, BytesStreamingCallback bytesStreamingCallback)
getIterator(int partitionId)
protected AbstractStoragePartition getMetadataPartition()
long getNumberOfPartitions() - A function which behaves like Map.size(), in the sense that it ignores empty (null) slots in the list.
getPartitionIds() - Get all partition IDs that are assigned to the current node.
getPartitionOffset(int partitionId, PubSubContext pubSubContext) - Retrieve the offset associated with the partitionId from the metadata partition.
getPartitionOrThrow(int partitionId)
protected Collection<Partition> getPartitions()
byte[] getReplicationMetadata(int partitionId, ByteBuffer key)
getRWLockForPartitionOrThrow(int partitionId) - This method is public for testing purposes.
getStoreVersionName()
getStoreVersionState() - Retrieve the store version state from the metadata partition.
boolean isClosed()
void put(int partitionId, byte[] key, byte[] value)
void put(int partitionId, byte[] key, ByteBuffer value)
void putPartitionOffset(int partitionId, OffsetRecord offsetRecord) - Put the offset associated with the partitionId into the metadata partition.
void putReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata)
void putStoreVersionState(StoreVersionState versionState) - Put the store version state into the metadata partition.
void putWithReplicationMetadata(int partitionId, byte[] key, ByteBuffer value, byte[] replicationMetadata)
void reopenStoragePartition(int partitionId) - Reopen the underlying database.
protected final void restoreStoragePartitions()
protected void restoreStoragePartitions(boolean restoreMetadataPartition, boolean restoreDataPartitions) - Load the existing storage partitions.
void suppressLogs(boolean suppressLogs)
sync(int partitionId)
String toString()
void updateStoreVersionStateCache(StoreVersionState versionState) - Used in ingestion isolation mode to keep the storage engine's cache in sync with updates to the state in MainIngestionStorageMetadataService.
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface com.linkedin.davinci.store.StorageEngine
getPersistedPartitionIds, getStats, getType
-
Field Details
-
METADATA_PARTITION_ID
public static final int METADATA_PARTITION_ID
-
-
Constructor Details
-
AbstractStorageEngine
public AbstractStorageEngine(String storeVersionName, InternalAvroSpecificSerializer<StoreVersionState> storeVersionStateSerializer, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer)
-
-
Method Details
-
getRWLockForPartitionOrThrow
This method is public for testing purposes.
-
getStoreVersionName
- Specified by:
getStoreVersionName in interface StorageEngine<Partition extends AbstractStoragePartition>
-
toString
public String toString()
-
createStoragePartition
public abstract Partition createStoragePartition(StoragePartitionConfig partitionConfig)
-
restoreStoragePartitions
protected void restoreStoragePartitions(boolean restoreMetadataPartition, boolean restoreDataPartitions)
Load the existing storage partitions. Each implementation should decide when it is appropriate to call this function to restore its partitions.
-
restoreStoragePartitions
protected final void restoreStoragePartitions()
-
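As a rough illustration of what the two boolean flags on restoreStoragePartitions(boolean, boolean) could control, the sketch below selects which persisted partitions to reopen. It is a hypothetical helper, not the engine's code: the persisted ids would presumably come from something like the inherited getPersistedPartitionIds(), a real engine would open each partition rather than return ids, and METADATA_PARTITION_ID = -1 is a placeholder because the real constant's value is not shown on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class RestorePartitionsSketch {
  // Hypothetical value: the real METADATA_PARTITION_ID constant's value is not shown on this page.
  static final int METADATA_PARTITION_ID = -1;

  // Decide which persisted partitions to reopen, in ascending id order.
  // A real engine would open each selected partition instead of returning ids.
  static List<Integer> partitionsToRestore(
      Set<Integer> persistedPartitionIds, boolean restoreMetadataPartition, boolean restoreDataPartitions) {
    List<Integer> toRestore = new ArrayList<>();
    for (int id : new TreeSet<>(persistedPartitionIds)) {
      boolean isMetadata = id == METADATA_PARTITION_ID;
      // The metadata partition and the data partitions are gated by separate flags.
      if (isMetadata ? restoreMetadataPartition : restoreDataPartitions) {
        toRestore.add(id);
      }
    }
    return toRestore;
  }

  public static void main(String[] args) {
    Set<Integer> persisted = Set.of(METADATA_PARTITION_ID, 0, 2);
    System.out.println(partitionsToRestore(persisted, true, false)); // [-1]
    System.out.println(partitionsToRestore(persisted, false, true)); // [0, 2]
  }
}
```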
getMetadataPartition
protected AbstractStoragePartition getMetadataPartition()
-
adjustStoragePartition
public void adjustStoragePartition(int partitionId, StoragePartitionAdjustmentTrigger mode, StoragePartitionConfig partitionConfig)
Adjust the opened storage partition according to the provided storagePartitionConfig. Throws an exception if no storage partition is open for the given partition id. The main reason for having this method is to ease unit testing.
- Specified by:
adjustStoragePartition in interface StorageEngine<Partition extends AbstractStoragePartition>
-
addStoragePartitionIfAbsent
public void addStoragePartitionIfAbsent(int partitionId)
- Specified by:
addStoragePartitionIfAbsent in interface StorageEngine<Partition extends AbstractStoragePartition>
-
addStoragePartition
public void addStoragePartition(StoragePartitionConfig storagePartitionConfig)
- Specified by:
addStoragePartition in interface StorageEngine<Partition extends AbstractStoragePartition>
-
closePartition
public void closePartition(int partitionId)
- Specified by:
closePartition in interface StorageEngine<Partition extends AbstractStoragePartition>
-
closeMetadataPartition
public void closeMetadataPartition()
- Specified by:
closeMetadataPartition in interface StorageEngine<Partition extends AbstractStoragePartition>
-
dropPartition
public void dropPartition(int partitionId)
Removes a partition from the current store.
- Specified by:
dropPartition in interface StorageEngine<Partition extends AbstractStoragePartition>
- Parameters:
partitionId - id of the partition to remove
-
dropPartition
public void dropPartition(int partitionId, boolean dropMetadataPartitionWhenEmpty)
Removes a partition from the current store.
- Specified by:
dropPartition in interface StorageEngine<Partition extends AbstractStoragePartition>
- Parameters:
partitionId - id of the partition to remove
dropMetadataPartitionWhenEmpty - if true, the whole store is dropped once ALL partitions have been removed
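The dropMetadataPartitionWhenEmpty flag can be sketched as follows, assuming that "dropping the whole store" means also removing the metadata partition once no data partitions remain. DropPartitionSketch is a hypothetical class, each partition is a placeholder object, and METADATA_PARTITION_ID = -1 is an assumed value.

```java
import java.util.HashMap;
import java.util.Map;

class DropPartitionSketch {
  // Hypothetical value: the real METADATA_PARTITION_ID constant's value is not shown on this page.
  static final int METADATA_PARTITION_ID = -1;

  // partitionId -> placeholder for the partition's underlying database
  final Map<Integer, Object> partitions = new HashMap<>();
  boolean storeDropped = false;

  DropPartitionSketch(int... dataPartitionIds) {
    partitions.put(METADATA_PARTITION_ID, new Object());
    for (int id : dataPartitionIds) {
      partitions.put(id, new Object());
    }
  }

  void dropPartition(int partitionId, boolean dropMetadataPartitionWhenEmpty) {
    partitions.remove(partitionId);
    long remainingDataPartitions =
        partitions.keySet().stream().filter(id -> id != METADATA_PARTITION_ID).count();
    // Once no data partitions remain, also drop the metadata partition, i.e. the whole store.
    if (dropMetadataPartitionWhenEmpty && remainingDataPartitions == 0) {
      partitions.remove(METADATA_PARTITION_ID);
      storeDropped = true;
    }
  }

  public static void main(String[] args) {
    DropPartitionSketch store = new DropPartitionSketch(0, 1);
    store.dropPartition(0, true); // partition 1 is still hosted, so metadata is kept
    System.out.println(store.storeDropped); // false
    store.dropPartition(1, true); // last data partition removed
    System.out.println(store.storeDropped + " " + store.partitions.isEmpty()); // true true
  }
}
```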
-
drop
public void drop()
Drop the whole store.
- Specified by:
drop in interface StorageEngine<Partition extends AbstractStoragePartition>
-
sync
- Specified by:
sync in interface StorageEngine<Partition extends AbstractStoragePartition>
-
close
public void close() throws VeniceException
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
- Specified by:
close in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
isClosed
public boolean isClosed()
- Specified by:
isClosed in interface StorageEngine<Partition extends AbstractStoragePartition>
-
checkDatabaseIntegrity
public boolean checkDatabaseIntegrity(int partitionId, Map<String, String> checkpointedInfo, StoragePartitionConfig storagePartitionConfig)
Checks whether the current state of the database is valid at the start of ingestion.
- Specified by:
checkDatabaseIntegrity in interface StorageEngine<Partition extends AbstractStoragePartition>
-
beginBatchWrite
public void beginBatchWrite(StoragePartitionConfig storagePartitionConfig, Map<String, String> checkpointedInfo, Optional<Supplier<byte[]>> checksumSupplier)
Many storage engines support efficient methods for performing a large number of writes (puts/deletes) against the data source. This method puts the storage engine into batch write mode.
- Specified by:
beginBatchWrite in interface StorageEngine<Partition extends AbstractStoragePartition>
-
endBatchWrite
public void endBatchWrite(StoragePartitionConfig storagePartitionConfig)
- Specified by:
endBatchWrite in interface StorageEngine<Partition extends AbstractStoragePartition>
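The begin/end batch-write lifecycle described for beginBatchWrite can be illustrated with a hypothetical single-partition store that buffers puts and deletes while in batch mode and applies them when the batch ends. Buffering is only one strategy chosen for this sketch; the real engines (for example RocksDBStorageEngine) may implement batch mode very differently, and the checkpointedInfo and checksumSupplier parameters are left out.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-partition store illustrating a begin/end batch-write lifecycle.
class BatchWriteSketch {
  private final Map<String, String> committed = new TreeMap<>();
  // Writes buffered during batch mode; a null value marks a delete.
  private final List<SimpleImmutableEntry<String, String>> pending = new ArrayList<>();
  private boolean batchMode = false;

  void beginBatchWrite() {
    if (batchMode) {
      throw new IllegalStateException("Already in batch write mode");
    }
    batchMode = true;
  }

  void put(String key, String value) {
    if (batchMode) {
      pending.add(new SimpleImmutableEntry<>(key, value));
    } else {
      committed.put(key, value);
    }
  }

  void delete(String key) {
    if (batchMode) {
      pending.add(new SimpleImmutableEntry<>(key, null));
    } else {
      committed.remove(key);
    }
  }

  // Apply all buffered writes in order, then leave batch mode.
  void endBatchWrite() {
    if (!batchMode) {
      throw new IllegalStateException("Not in batch write mode");
    }
    for (SimpleImmutableEntry<String, String> write : pending) {
      if (write.getValue() == null) {
        committed.remove(write.getKey());
      } else {
        committed.put(write.getKey(), write.getValue());
      }
    }
    pending.clear();
    batchMode = false;
  }

  String get(String key) {
    return committed.get(key);
  }

  public static void main(String[] args) {
    BatchWriteSketch partition = new BatchWriteSketch();
    partition.beginBatchWrite();
    partition.put("a", "1");
    partition.put("b", "2");
    partition.delete("a");
    System.out.println(partition.get("b")); // null: buffered writes are not applied yet
    partition.endBatchWrite();
    System.out.println(partition.get("a") + " " + partition.get("b")); // null 2
  }
}
```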
-
executeWithSafeGuard
protected <T> T executeWithSafeGuard(int partitionId, Callable<T> callable)
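This page does not document executeWithSafeGuard. Judging only from its signature and from the public getRWLockForPartitionOrThrow method, one plausible reading is a per-partition read/write-lock guard, and the sketch below shows that pattern under this assumption. SafeGuardSketch and all of its methods are hypothetical: data operations run under a partition's read lock, while dropPartition takes the write lock and therefore waits for in-flight guarded operations.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical per-partition read/write-lock guard (an assumed reading, not Venice's code).
class SafeGuardSketch {
  private final Map<Integer, ReadWriteLock> locks = new ConcurrentHashMap<>();
  private final Map<Integer, Map<String, String>> partitions = new ConcurrentHashMap<>();

  void addPartition(int partitionId) {
    locks.putIfAbsent(partitionId, new ReentrantReadWriteLock());
    partitions.putIfAbsent(partitionId, new ConcurrentHashMap<>());
  }

  ReadWriteLock getRWLockForPartitionOrThrow(int partitionId) {
    ReadWriteLock lock = locks.get(partitionId);
    if (lock == null) {
      throw new IllegalArgumentException("No lock for partition " + partitionId);
    }
    return lock;
  }

  // Runs the callable under the partition's read lock: many guarded operations may run
  // concurrently, but never while a structural change holds the write lock.
  <T> T executeWithSafeGuard(int partitionId, Callable<T> callable) throws Exception {
    ReadWriteLock lock = getRWLockForPartitionOrThrow(partitionId);
    lock.readLock().lock();
    try {
      return callable.call();
    } finally {
      lock.readLock().unlock();
    }
  }

  // Takes the write lock, so it waits for in-flight guarded operations to finish.
  void dropPartition(int partitionId) {
    ReadWriteLock lock = getRWLockForPartitionOrThrow(partitionId);
    lock.writeLock().lock();
    try {
      partitions.remove(partitionId);
    } finally {
      lock.writeLock().unlock();
    }
  }

  void put(int partitionId, String key, String value) throws Exception {
    executeWithSafeGuard(partitionId, () -> openPartition(partitionId).put(key, value));
  }

  String get(int partitionId, String key) throws Exception {
    return executeWithSafeGuard(partitionId, () -> openPartition(partitionId).get(key));
  }

  private Map<String, String> openPartition(int partitionId) {
    Map<String, String> partition = partitions.get(partitionId);
    if (partition == null) {
      throw new IllegalStateException("Partition " + partitionId + " is not open");
    }
    return partition;
  }

  public static void main(String[] args) throws Exception {
    SafeGuardSketch engine = new SafeGuardSketch();
    engine.addPartition(0);
    engine.put(0, "k", "v");
    System.out.println(engine.get(0, "k")); // v
    engine.dropPartition(0);
    try {
      engine.get(0, "k");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Partition 0 is not open
    }
  }
}
```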
-
reopenStoragePartition
public void reopenStoragePartition(int partitionId)
Reopen the underlying database.
- Specified by:
reopenStoragePartition in interface StorageEngine<Partition extends AbstractStoragePartition>
-
put
- Specified by:
put in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
put
- Specified by:
put in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
putWithReplicationMetadata
public void putWithReplicationMetadata(int partitionId, byte[] key, ByteBuffer value, byte[] replicationMetadata) throws VeniceException
- Specified by:
putWithReplicationMetadata in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
putReplicationMetadata
public void putReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) throws VeniceException
- Specified by:
putReplicationMetadata in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
get
- Specified by:
get in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
get
public ByteBuffer get(int partitionId, byte[] key, ByteBuffer valueToBePopulated) throws VeniceException
- Specified by:
get in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
get
- Specified by:
get in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
getByKeyPrefix
public void getByKeyPrefix(int partitionId, byte[] partialKey, BytesStreamingCallback bytesStreamingCallback)
- Specified by:
getByKeyPrefix in interface StorageEngine<Partition extends AbstractStoragePartition>
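A prefix scan that streams matches to a callback, in the spirit of getByKeyPrefix, can be sketched over a sorted map. The StreamingCallback interface and its onRecordReceived/onCompletion methods are hypothetical stand-ins, not the actual BytesStreamingCallback API. The sketch relies on keys being ordered by unsigned lexicographic byte order, under which all keys sharing a prefix form one contiguous range, so the scan can stop at the first non-matching key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a prefix scan that streams matches to a callback.
class PrefixScanSketch {
  // Stand-in callback interface for illustration; not Venice's actual BytesStreamingCallback.
  interface StreamingCallback {
    void onRecordReceived(byte[] key, byte[] value);

    void onCompletion();
  }

  // Keys kept in unsigned lexicographic byte order, so keys sharing a prefix are contiguous.
  private final TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

  void put(byte[] key, byte[] value) {
    data.put(key, value);
  }

  void getByKeyPrefix(byte[] partialKey, StreamingCallback callback) {
    // Seek to the first key >= the prefix and stop at the first key that no longer matches.
    for (Map.Entry<byte[], byte[]> entry : data.tailMap(partialKey, true).entrySet()) {
      if (!startsWith(entry.getKey(), partialKey)) {
        break;
      }
      callback.onRecordReceived(entry.getKey(), entry.getValue());
    }
    callback.onCompletion();
  }

  private static boolean startsWith(byte[] key, byte[] prefix) {
    return key.length >= prefix.length && Arrays.equals(key, 0, prefix.length, prefix, 0, prefix.length);
  }

  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    PrefixScanSketch store = new PrefixScanSketch();
    for (String key : new String[] {"order:1", "user:1", "user:2", "users"}) {
      store.put(bytes(key), bytes("value-of-" + key));
    }
    StringBuilder seen = new StringBuilder();
    store.getByKeyPrefix(bytes("user:"), new StreamingCallback() {
      @Override
      public void onRecordReceived(byte[] key, byte[] value) {
        seen.append(new String(key, StandardCharsets.UTF_8)).append(' ');
      }

      @Override
      public void onCompletion() {
        seen.append("done");
      }
    });
    System.out.println(seen); // user:1 user:2 done
  }
}
```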
-
delete
public void delete(int partitionId, byte[] key) throws VeniceException
- Specified by:
delete in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
deleteWithReplicationMetadata
public void deleteWithReplicationMetadata(int partitionId, byte[] key, byte[] replicationMetadata) throws VeniceException
- Specified by:
deleteWithReplicationMetadata in interface StorageEngine<Partition extends AbstractStoragePartition>
- Throws:
VeniceException
-
getReplicationMetadata
public byte[] getReplicationMetadata(int partitionId, ByteBuffer key)
- Specified by:
getReplicationMetadata in interface StorageEngine<Partition extends AbstractStoragePartition>
-
putPartitionOffset
public void putPartitionOffset(int partitionId, OffsetRecord offsetRecord)
Put the offset associated with the partitionId into the metadata partition.
- Specified by:
putPartitionOffset in interface StorageEngine<Partition extends AbstractStoragePartition>
-
getPartitionOffset
Retrieve the offset associated with the partitionId from the metadata partition.
- Specified by:
getPartitionOffset in interface StorageEngine<Partition extends AbstractStoragePartition>
-
clearPartitionOffset
public void clearPartitionOffset(int partitionId)
Clear the offset associated with the partitionId in the metadata partition.
- Specified by:
clearPartitionOffset in interface StorageEngine<Partition extends AbstractStoragePartition>
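The offset methods above all address per-partition records kept in the single metadata partition. A minimal sketch under stated assumptions: OffsetRecord is reduced to a plain long, the metadata partition is an in-memory map, and the "P_<partitionId>" key scheme used by the hypothetical MetadataPartitionSketch class is invented for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch: per-partition offsets stored as records in one metadata partition.
class MetadataPartitionSketch {
  // Stand-in for the metadata partition's key-value store.
  private final Map<String, Long> metadataPartition = new HashMap<>();

  // Hypothetical key scheme: one metadata record per data partition.
  private static String offsetKey(int partitionId) {
    return "P_" + partitionId;
  }

  void putPartitionOffset(int partitionId, long offset) {
    metadataPartition.put(offsetKey(partitionId), offset);
  }

  // Empty when no offset has been recorded (or it was cleared) for this partition.
  OptionalLong getPartitionOffset(int partitionId) {
    Long offset = metadataPartition.get(offsetKey(partitionId));
    return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
  }

  void clearPartitionOffset(int partitionId) {
    metadataPartition.remove(offsetKey(partitionId));
  }

  public static void main(String[] args) {
    MetadataPartitionSketch meta = new MetadataPartitionSketch();
    meta.putPartitionOffset(3, 1024L);
    meta.putPartitionOffset(4, 7L);
    System.out.println(meta.getPartitionOffset(3).getAsLong()); // 1024
    meta.clearPartitionOffset(3);
    System.out.println(meta.getPartitionOffset(3).isPresent() + " " + meta.getPartitionOffset(4).getAsLong()); // false 7
  }
}
```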
-
putStoreVersionState
public void putStoreVersionState(StoreVersionState versionState)
Put the store version state into the metadata partition.
- Specified by:
putStoreVersionState in interface StorageEngine<Partition extends AbstractStoragePartition>
-
updateStoreVersionStateCache
public void updateStoreVersionStateCache(StoreVersionState versionState)
Used in ingestion isolation mode to keep the storage engine's cache in sync with updates to the state in MainIngestionStorageMetadataService.
- Specified by:
updateStoreVersionStateCache in interface StorageEngine<Partition extends AbstractStoragePartition>
-
getStoreVersionState
Retrieve the store version state from the metadata partition.
- Specified by:
getStoreVersionState in interface StorageEngine<Partition extends AbstractStoragePartition>
-
clearStoreVersionState
public void clearStoreVersionState()
Clear the store version state in the metadata partition.
- Specified by:
clearStoreVersionState in interface StorageEngine<Partition extends AbstractStoragePartition>
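The store-version-state methods suggest a persisted copy in the metadata partition plus an in-memory cache that updateStoreVersionStateCache can refresh on its own. The hypothetical StoreVersionStateSketch class below shows that write-through-cache reading; it is an assumption, not the engine's code. StoreVersionState is simplified to a String and the metadata key name is invented.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: store version state persisted in the metadata partition plus a cached copy.
class StoreVersionStateSketch {
  private static final String VERSION_STATE_KEY = "VERSION_STATE"; // hypothetical key
  private final Map<String, String> metadataPartition = new HashMap<>();
  private volatile String cachedState;

  // Write-through: persist to the metadata partition, then refresh the cache.
  void putStoreVersionState(String versionState) {
    metadataPartition.put(VERSION_STATE_KEY, versionState);
    cachedState = versionState;
  }

  // Cache-only refresh, assumed for when another component (e.g. in ingestion isolation
  // mode) has already persisted the new state.
  void updateStoreVersionStateCache(String versionState) {
    cachedState = versionState;
  }

  // Serve from the cache, falling back to the metadata partition on a miss.
  String getStoreVersionState() {
    String state = cachedState;
    if (state == null) {
      state = metadataPartition.get(VERSION_STATE_KEY);
      cachedState = state;
    }
    return state;
  }

  void clearStoreVersionState() {
    metadataPartition.remove(VERSION_STATE_KEY);
    cachedState = null;
  }

  public static void main(String[] args) {
    StoreVersionStateSketch engine = new StoreVersionStateSketch();
    engine.putStoreVersionState("state-v1");
    engine.updateStoreVersionStateCache("state-v2");
    System.out.println(engine.getStoreVersionState()); // state-v2
    engine.clearStoreVersionState();
    System.out.println(engine.getStoreVersionState()); // null
  }
}
```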
-
containsPartition
public boolean containsPartition(int partitionId)
Return whether a given partition exists within this storage engine.
- Specified by:
containsPartition in interface StorageEngine<Partition extends AbstractStoragePartition>
- Parameters:
partitionId - the partition to look for
- Returns:
true if the partition exists on this node, false otherwise
-
getNumberOfPartitions
public long getNumberOfPartitions()
A function which behaves like Map.size(), in the sense that it ignores empty (null) slots in the list.
- Returns:
the number of non-null partitions in partitionList
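getNumberOfPartitions and containsPartition both make sense if partitionList is a list indexed by partition id that holds null for partitions not open on this node. Assuming that layout (inferred from the "empty (null) slots" wording, not confirmed on this page), the two checks can be sketched with the hypothetical static helpers below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical sketch, assuming partitionList is indexed by partition id and holds null
// for ids that are not open on this node.
class PartitionListSketch {
  // Like Map.size(): count only the occupied (non-null) slots, not the list's length.
  static long getNumberOfPartitions(List<?> partitionList) {
    return partitionList.stream().filter(Objects::nonNull).count();
  }

  // A partition exists only if its slot is within bounds and non-null.
  static boolean containsPartition(List<?> partitionList, int partitionId) {
    return partitionId >= 0 && partitionId < partitionList.size() && partitionList.get(partitionId) != null;
  }

  public static void main(String[] args) {
    List<String> partitionList = new ArrayList<>(Arrays.asList("p0", null, "p2", null));
    System.out.println(partitionList.size() + " " + getNumberOfPartitions(partitionList)); // 4 2
    System.out.println(containsPartition(partitionList, 1) + " " + containsPartition(partitionList, 2)); // false true
  }
}
```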
-
getPartitionIds
Get all partition IDs that are assigned to the current node.
- Specified by:
getPartitionIds in interface StorageEngine<Partition extends AbstractStoragePartition>
- Returns:
the partition IDs hosted in the current storage engine
-
getPartitions
-
getPartitionOrThrow
- Specified by:
getPartitionOrThrow in interface StorageEngine<Partition extends AbstractStoragePartition>
-
suppressLogs
public void suppressLogs(boolean suppressLogs)
- Specified by:
suppressLogs in interface StorageEngine<Partition extends AbstractStoragePartition>
-
getIterator
- Specified by:
getIterator in interface StorageEngine<Partition extends AbstractStoragePartition>
-