HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE #9552
RDBBatchOperation.java

@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
   static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);

   private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
+  private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);

   private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();
@@ -166,21 +167,30 @@ public void close() {
    * Delete operation to be applied to a {@link ColumnFamily} batch.
    */
   private static final class DeleteOp extends Op {
-    private final byte[] key;
+    private final CodecBuffer key;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
Contributor: Similar to key, we should move closed to Op.

Contributor (Author): We don't need this closed for closing the RocksObject, since that is already idempotent.

Contributor: But we need it for CodecBuffer.

Contributor (Author): DeleteRange doesn't have a ByteBuffer API yet; we would still have to depend on the byte array. That is why I don't like the idea of bringing the CodecBuffer key into the Op class.

Contributor: Then how could you "completely get rid of byte array operations"? What does that mean?
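For context, here is a minimal sketch of the refactor being debated: hoisting the close guard and a nullable CodecBuffer key into the Op base class. This is an assumption about how the suggestion could look, not code from the PR; Op's actual fields and constructor may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shape of Op after the suggested refactor. Only keyBytes is
// visible in the diff; the hoisted key/closed fields are the reviewer's idea.
abstract class Op implements AutoCloseable {
  private final Bytes keyBytes;
  // Null for ops without a single CodecBuffer key, e.g. a future delete-range.
  private final CodecBuffer key;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  Op(Bytes keyBytes, CodecBuffer key) {
    this.keyBytes = keyBytes;
    this.key = key;
  }

  final CodecBuffer key() {
    return key;
  }

  int keyLen() {
    return key != null ? key.readableBytes() : 0;
  }

  abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException;

  @Override
  public void close() {
    // CodecBuffer.release() must not run twice, so the guard lives here once
    // instead of in every subclass; RocksObject close is already idempotent.
    if (closed.compareAndSet(false, true) && key != null) {
      key.release();
    }
  }
}
```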
-    private DeleteOp(byte[] key, Bytes keyBytes) {
+    private DeleteOp(CodecBuffer key, Bytes keyBytes) {
       super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
       this.key = Objects.requireNonNull(key, "key == null");
     }

     @Override
     public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchDelete(batch, this.key);
+      family.batchDelete(batch, this.key.asReadOnlyByteBuffer());
     }

     @Override
     public int keyLen() {
-      return key.length;
+      return key.readableBytes();
     }
+
+    @Override
+    public void close() {
+      if (closed.compareAndSet(false, true)) {
+        key.release();
+      }
+      super.close();
+    }
   }
@@ -223,35 +233,6 @@ public void close() {
     }
   }

-  /**
-   * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
-   */
-  private static final class ByteArrayPutOp extends Op {
-    private final byte[] key;
-    private final byte[] value;
-
-    private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
-      super(keyBytes);
-      this.key = Objects.requireNonNull(key, "key == null");
-      this.value = Objects.requireNonNull(value, "value == null");
-    }
-
-    @Override
-    public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
-      family.batchPut(batch, key, value);
-    }
-
-    @Override
-    public int keyLen() {
-      return key.length;
-    }
-
-    @Override
-    public int valLen() {
-      return value.length;
-    }
-  }
-
   /** Cache and deduplicate db ops (put/delete). */
   private class OpCache {
     /** A (family name -> {@link FamilyCache}) map. */
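The "cache and deduplicate" comment above is the behavior the rest of the PR leans on: a later put or delete on the same key replaces the earlier op, which the test below asserts (OP1 vs OP9, OP3 vs OP4). As a self-contained illustration of the idea, with all names hypothetical and the real overwriteIfExists likely differing in detail:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-family op cache: only the newest op per
// key survives, and the op it displaces is closed to release its buffers.
final class OverwriteSketch<K, V extends AutoCloseable> {
  private final Map<K, V> ops = new HashMap<>();

  void overwriteIfExists(K keyBytes, V op) throws Exception {
    V previous = ops.put(keyBytes, op); // keep only the latest op per key
    if (previous != null) {
      previous.close();                 // free buffers held by the stale op
    }
  }
}
```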
@@ -343,13 +324,22 @@ void put(CodecBuffer key, CodecBuffer value) {
     void put(byte[] key, byte[] value) {
       putCount++;
-      Bytes keyBytes = new Bytes(key);
-      overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
+      CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+      CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value);
+      Bytes keyBytes = Bytes.newBytes(keyBuffer);
+      overwriteIfExists(keyBytes, new PutOp(keyBuffer, valueBuffer, keyBytes));
     }

     void delete(byte[] key) {
       delCount++;
-      Bytes keyBytes = new Bytes(key);
-      overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+      CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
+      Bytes keyBytes = Bytes.newBytes(keyBuffer);
+      overwriteIfExists(keyBytes, new DeleteOp(keyBuffer, keyBytes));
     }
+
+    void delete(CodecBuffer key) {
+      delCount++;
+      Bytes keyBytes = Bytes.newBytes(key);
+      overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
+    }
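With these changes, the byte[] overloads copy their arguments into direct CodecBuffers up front, so every op reaches the WriteBatch through the ByteBuffer path. Conceptually, and this is an assumption about CodecBufferCodec rather than its actual implementation, fromPersistedFormat amounts to an off-heap copy along these lines:

```java
import java.nio.ByteBuffer;

final class DirectCopySketch {
  // Copy a heap byte[] into a direct (off-heap) ByteBuffer, ready for reading.
  // Direct buffers let RocksDB read the data without an extra JNI-side copy,
  // which is presumably why CodecBufferCodec.get(true) requests direct mode.
  static ByteBuffer toDirect(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length);
    buffer.put(bytes);
    buffer.flip(); // position = 0, limit = length: the readable region
    return buffer;
  }
}
```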
@@ -388,6 +378,10 @@ void delete(ColumnFamily family, byte[] key) {
           .delete(key);
     }

+    void delete(ColumnFamily family, CodecBuffer key) {
+      name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
+    }
+
     /** Prepare batch write for the entire cache. */
     UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
       for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -464,6 +458,10 @@ public void delete(ColumnFamily family, byte[] key) {
     opCache.delete(family, key);
   }

+  public void delete(ColumnFamily family, CodecBuffer key) {
+    opCache.delete(family, key);
+  }
+
   public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
     opCache.put(family, key, value);
   }
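One consequence of the new CodecBuffer overload, judging from DeleteOp.close() above: the batch appears to take ownership of the buffer and releases it when the op is closed, so callers hand the buffer off rather than releasing it themselves. A hedged usage sketch (ownership semantics inferred from the diff, not stated in the PR; StringCodec and toDirectCodecBuffer are used the same way in the test below):

```java
final class DeleteUsageSketch {
  // The CodecBuffer created here is handed off to the batch, which releases
  // it when the op is deduplicated away or the batch is closed.
  static void deleteKey(RDBBatchOperation batch, RocksDatabase.ColumnFamily family, String key)
      throws CodecException {
    batch.delete(family, StringCodec.get().toDirectCodecBuffer(key));
  }
}
```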
TestRDBBatchOperation.java (new file)

@@ -0,0 +1,123 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
/**
 * Test class for verifying RDBBatchOperation put and delete batching using a
 * TrackingUtilManagedWriteBatch.
 *
 * This test class includes:
 * - Mocking and tracking of put and delete operations within a batch
 *   operation.
 * - Validation of committed operations using assertions on the collected
 *   data.
 * - Ensuring that the batch operation interacts correctly with the
 *   RocksDatabase and ColumnFamilyHandle components.
 *
 * The test method includes:
 * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily.
 * 2. Mocking of methods to track operations performed on the write batch.
 */
public class TestRDBBatchOperation {
  static {
    ManagedRocksObjectUtils.loadRocksDBLibrary();
  }

  private static Operation getOperation(String key, String value, OpType opType) {
    return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType);
  }

  @Test
  public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException {
    try (TrackingUtilManagedWriteBatch writeBatch = new TrackingUtilManagedWriteBatch();
        RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) {
      ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
      RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
      doAnswer((i) -> {
        ((ManagedWriteBatch) i.getArgument(0))
            .put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
        return null;
      }).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));

      doAnswer((i) -> {
        ((ManagedWriteBatch) i.getArgument(0))
            .delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
        return null;
      }).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));

      when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
      when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test"));
      when(columnFamily.getName()).thenReturn("test");
      Codec<String> codec = StringCodec.get();
      // OP1: This should be skipped in favor of OP9.
      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
      // OP2
      batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
      // OP3: This should be skipped in favor of OP4.
      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
      // OP4
      batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
      // OP5
      batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
      // OP6
      batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
      // OP7
      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
      // OP8
      batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
      // OP9
      batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
Contributor: This hard-coded test is too simple. How about implementing this idea?

Contributor (Author): I was planning to add it as part of the delete range PR, but yeah, I can add it here as well.

Contributor: Do we still need this hardcoded test? How about removing it?

Contributor (Author): This would be a good concrete test to have, to at least exercise some concrete scenario. A randomized test would not guarantee everything ran.
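As a sketch of what the randomized complement discussed here could look like (all names are hypothetical; this is not the linked idea, just one way to model it): generate a random interleaving of puts and deletes over a small key space, mirror them in a plain map where a later op on a key overwrites an earlier one, and compare the batch's committed state against the map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

final class RandomizedBatchModel {
  // Model the expected final state of a deduplicating batch: the last op per
  // key wins. A null value marks a pending delete for that key.
  static Map<String, String> expectedFinalState(long seed, int opCount) {
    Random random = new Random(seed);
    Map<String, String> expected = new HashMap<>();
    for (int i = 0; i < opCount; i++) {
      String key = "key" + random.nextInt(16); // small key space forces overwrites
      if (random.nextBoolean()) {
        expected.put(key, "value" + i);        // put replaces any earlier op
      } else {
        expected.put(key, null);               // delete replaces any earlier op
      }
    }
    return expected;
  }
}
```

A test built on this would drive RDBBatchOperation with the same seeded sequence and assert that writeBatch.getOperations() matches the model's final state.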
      RocksDatabase db = Mockito.mock(RocksDatabase.class);
      doNothing().when(db).batchWrite(any());
      batchOperation.commit(db);
      Set<Operation> expectedOps = ImmutableSet.of(
          getOperation("key01", "value011", OpType.PUT_DIRECT),
          getOperation("key02", "value02", OpType.PUT_DIRECT),
          getOperation("key03", "value04", OpType.PUT_DIRECT),
          getOperation("key05", null, OpType.DELETE_DIRECT),
          getOperation("key10", null, OpType.DELETE_DIRECT),
          getOperation("key04", "value04", OpType.PUT_DIRECT),
          getOperation("key06", "value05", OpType.PUT_DIRECT));
      assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet());
      assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test")));
    }
  }
}
Contributor: DeleteOp.key is the same as Op.keyBytes. We should move DeleteOp.key to Op instead of adding it to all the subclasses. (Since there were both byte[] and CodecBuffer, I did not mention this previously.)

Contributor (Author): This won't hold true for delete range. The key would be null in that case.

Contributor: For delete range, just use key as the startKey.

Contributor (Author): ok
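For what the compromise might look like in code, here is a hedged sketch of a future DeleteRangeOp that reuses Op's key as the range start key, assuming the Op shape sketched earlier in this discussion. batchDeleteRange and getArray are assumed names, and as noted above, delete-range has no ByteBuffer API yet, so the end key stays a byte array.

```java
// Hypothetical nested class inside RDBBatchOperation, following the thread's
// suggestion. Nothing here is from the PR; names are illustrative only.
private static final class DeleteRangeOp extends Op {
  // byte[] because RocksDB's delete-range still lacks a ByteBuffer overload.
  private final byte[] endKey;

  private DeleteRangeOp(CodecBuffer startKey, byte[] endKey, Bytes keyBytes) {
    super(keyBytes, startKey); // startKey stored as Op.key, per the suggestion
    this.endKey = Objects.requireNonNull(endKey, "endKey == null");
  }

  @Override
  public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
    // batchDeleteRange is an assumed helper mirroring batchPut/batchDelete;
    // getArray() is assumed to expose the buffer's bytes as a byte[].
    family.batchDeleteRange(batch, key().getArray(), endKey);
  }
}
```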