6 changes: 6 additions & 0 deletions hadoop-hdds/framework/pom.xml
@@ -307,6 +307,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-managed-rocksdb</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.ozone</groupId>
<artifactId>hdds-test-utils</artifactId>
CodecBufferCodec.java
@@ -39,12 +39,12 @@
*/
public final class CodecBufferCodec implements Codec<CodecBuffer> {

private static final Codec<CodecBuffer> DIRECT_INSTANCE = new CodecBufferCodec(true);
private static final Codec<CodecBuffer> NON_DIRECT_INSTANCE = new CodecBufferCodec(false);
private static final CodecBufferCodec DIRECT_INSTANCE = new CodecBufferCodec(true);
private static final CodecBufferCodec NON_DIRECT_INSTANCE = new CodecBufferCodec(false);

private final CodecBuffer.Allocator allocator;

public static Codec<CodecBuffer> get(boolean direct) {
public static CodecBufferCodec get(boolean direct) {
return direct ? DIRECT_INSTANCE : NON_DIRECT_INSTANCE;
}

RDBBatchOperation.java
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation {
static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class);

private static final AtomicInteger BATCH_COUNT = new AtomicInteger();
private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true);

private final String name = "Batch-" + BATCH_COUNT.getAndIncrement();

@@ -166,21 +167,30 @@ public void close() {
* Delete operation to be applied to a {@link ColumnFamily} batch.
*/
private static final class DeleteOp extends Op {
private final byte[] key;
private final CodecBuffer key;
Contributor:

DeleteOp.key is the same as Op.keyBytes. We should move DeleteOp.key to Op instead of adding it in every subclass. (Since there were both byte[] and CodecBuffer keys before, I did not mention this previously.)

Contributor Author:

This won't hold true for delete range; the key would be null in that case.

Contributor:

For delete range, just use key as the startKey.

Contributor Author:

ok
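A minimal sketch of the refactor suggested above, assuming Op currently carries only keyBytes; the accessor name and exact field layout are illustrative, not the PR's final shape:

// Hypothetical sketch: hoist the CodecBuffer key, and its guarded release,
// into Op so DeleteOp and PutOp stop duplicating them. For a future
// delete-range op, key would hold the startKey, per the suggestion above.
private abstract static class Op implements AutoCloseable {
  private final Bytes keyBytes;
  private final CodecBuffer key;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  Op(CodecBuffer key, Bytes keyBytes) {
    this.key = Objects.requireNonNull(key, "key == null");
    this.keyBytes = Objects.requireNonNull(keyBytes, "keyBytes == null");
  }

  final CodecBuffer key() {
    return key;
  }

  int keyLen() {
    return key.readableBytes();
  }

  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) {
      key.release(); // CodecBuffer.release() must only be called once
    }
  }
}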
private final AtomicBoolean closed = new AtomicBoolean(false);
Contributor (@szetszwo, Dec 29, 2025):

Similar to key, we should move closed to Op.

Contributor Author:

We don't need closed for closing the rocksObject, since that close is already idempotent.

Contributor:

But we need it for CodecBuffer.

Contributor Author:

DeleteRange doesn't have a ByteBuffer API yet, so we would still have to depend on the byte array. That is why I don't like the idea of bringing the CodecBuffer key into the Op class. I have raised facebook/rocksdb#14197.

Contributor:

Then how could you "completely get rid of byte array operations"? What does that mean?
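A hedged sketch of the constraint raised here: until RocksDB exposes a ByteBuffer deleteRange (the request in facebook/rocksdb#14197), a delete-range op would still carry heap arrays even if point ops move to CodecBuffer. The class and the batchDeleteRange helper are hypothetical, extending the PR's current Op(keyBytes) shape:

// Hypothetical sketch: a delete-range op forced onto byte[] because
// ManagedWriteBatch has no ByteBuffer deleteRange overload yet.
private static final class DeleteRangeOp extends Op {
  private final byte[] startKey;
  private final byte[] endKey;

  private DeleteRangeOp(byte[] startKey, byte[] endKey, Bytes keyBytes) {
    super(keyBytes);
    this.startKey = Objects.requireNonNull(startKey, "startKey == null");
    this.endKey = Objects.requireNonNull(endKey, "endKey == null");
  }

  @Override
  public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
    family.batchDeleteRange(batch, startKey, endKey); // assumed helper, not part of this PR
  }

  @Override
  public int keyLen() {
    return startKey.length + endKey.length;
  }
}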


private DeleteOp(byte[] key, Bytes keyBytes) {
private DeleteOp(CodecBuffer key, Bytes keyBytes) {
super(Objects.requireNonNull(keyBytes, "keyBytes == null"));
this.key = Objects.requireNonNull(key, "key == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchDelete(batch, this.key);
family.batchDelete(batch, this.key.asReadOnlyByteBuffer());
}

@Override
public int keyLen() {
return key.length;
return key.readableBytes();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
key.release();
}
super.close();
}
}

@@ -223,35 +233,6 @@ public void close() {
}
}

/**
* Put operation to be applied to a {@link ColumnFamily} batch using the byte array api.
*/
private static final class ByteArrayPutOp extends Op {
private final byte[] key;
private final byte[] value;

private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) {
super(keyBytes);
this.key = Objects.requireNonNull(key, "key == null");
this.value = Objects.requireNonNull(value, "value == null");
}

@Override
public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException {
family.batchPut(batch, key, value);
}

@Override
public int keyLen() {
return key.length;
}

@Override
public int valLen() {
return value.length;
}
}

/** Cache and deduplicate db ops (put/delete). */
private class OpCache {
/** A (family name -> {@link FamilyCache}) map. */
@@ -343,13 +324,22 @@ void put(CodecBuffer key, CodecBuffer value) {

void put(byte[] key, byte[] value) {
putCount++;
Bytes keyBytes = new Bytes(key);
overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes));
CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
CodecBuffer valueBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value);
Bytes keyBytes = Bytes.newBytes(keyBuffer);
overwriteIfExists(keyBytes, new PutOp(keyBuffer, valueBuffer, keyBytes));
}

void delete(byte[] key) {
delCount++;
Bytes keyBytes = new Bytes(key);
CodecBuffer keyBuffer = DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key);
Bytes keyBytes = Bytes.newBytes(keyBuffer);
overwriteIfExists(keyBytes, new DeleteOp(keyBuffer, keyBytes));
}

void delete(CodecBuffer key) {
delCount++;
Bytes keyBytes = Bytes.newBytes(key);
overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes));
}

@@ -388,6 +378,10 @@ void delete(ColumnFamily family, byte[] key) {
.delete(key);
}

void delete(ColumnFamily family, CodecBuffer key) {
name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key);
}

/** Prepare batch write for the entire cache. */
UncheckedAutoCloseable prepareBatchWrite() throws RocksDatabaseException {
for (Map.Entry<String, FamilyCache> e : name2cache.entrySet()) {
@@ -464,6 +458,10 @@ public void delete(ColumnFamily family, byte[] key) {
opCache.delete(family, key);
}

public void delete(ColumnFamily family, CodecBuffer key) {
opCache.delete(family, key);
}

public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) {
opCache.put(family, key, value);
}
@@ -193,6 +193,14 @@ public void deleteRange(byte[] beginKey, byte[] endKey) throws RocksDatabaseException {
db.deleteRange(family, beginKey, endKey);
}

void deleteWithBatch(BatchOperation batch, CodecBuffer key) {
if (batch instanceof RDBBatchOperation) {
((RDBBatchOperation) batch).delete(family, key);
} else {
throw new IllegalArgumentException("Unexpected batch class: " + batch.getClass().getSimpleName());
}
}

@Override
public void deleteWithBatch(BatchOperation batch, byte[] key) {
if (batch instanceof RDBBatchOperation) {
@@ -299,7 +299,7 @@ public ColumnFamilyHandle getHandle() {
return handle;
}

public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
public void batchDelete(ManagedWriteBatch writeBatch, ByteBuffer key)
throws RocksDatabaseException {
try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.delete(getHandle(), key);
@@ -308,20 +308,6 @@ public void batchDelete(ManagedWriteBatch writeBatch, byte[] key)
}
}

public void batchPut(ManagedWriteBatch writeBatch, byte[] key, byte[] value)
throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
LOG.debug("batchPut array key {}", bytes2String(key));
LOG.debug("batchPut array value {}", bytes2String(value));
}

try (UncheckedAutoCloseable ignored = acquire()) {
writeBatch.put(getHandle(), key, value);
} catch (RocksDBException e) {
throw toRocksDatabaseException(this, "batchPut key " + bytes2String(key), e);
}
}

public void batchPut(ManagedWriteBatch writeBatch, ByteBuffer key,
ByteBuffer value) throws RocksDatabaseException {
if (LOG.isDebugEnabled()) {
@@ -377,7 +377,19 @@ public void delete(KEY key) throws RocksDatabaseException, CodecException {

@Override
public void deleteWithBatch(BatchOperation batch, KEY key) throws CodecException {
rawTable.deleteWithBatch(batch, encodeKey(key));
if (supportCodecBuffer) {
CodecBuffer keyBuffer = null;
try {
keyBuffer = keyCodec.toDirectCodecBuffer(key);
// The buffers will be released after commit.
rawTable.deleteWithBatch(batch, keyBuffer);
} catch (Exception e) {
IOUtils.closeQuietly(keyBuffer);
throw e;
}
} else {
rawTable.deleteWithBatch(batch, encodeKey(key));
}
}

@Override
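For context, a hedged usage sketch of the new CodecBuffer delete path; dbStore and table are placeholders, and initBatchOperation/commitBatchOperation follow the usual DBStore batch pattern:

// Hypothetical usage sketch: the direct key buffer allocated by
// deleteWithBatch stays alive until the batch commits; the batch then
// releases it when the cached ops are closed.
try (BatchOperation batch = dbStore.initBatchOperation()) {
  table.deleteWithBatch(batch, "key01"); // buffer ownership passes to the batch
  dbStore.commitBatchOperation(batch);   // write the batch, then ops release their buffers
}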
TestRDBBatchOperation.java (new file)
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.utils.db;

import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.OpType;
import org.apache.hadoop.hdds.utils.db.managed.TrackingUtilManagedWriteBatch.Operation;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;

/**
 * Test class for verifying batch operations with delete ranges using
 * RDBBatchOperation and MockedConstruction of ManagedWriteBatch.
 *
 * This test class includes:
 * - Mocking and tracking of the operations (put, delete, and delete range)
 *   applied within a batch operation.
 * - Validation of the committed operations using assertions on the collected data.
 * - Ensuring that the batch operation interacts correctly with the
 *   RocksDatabase and ColumnFamilyHandle components.
 *
 * The test method covers:
 * 1. Setup of mocked ColumnFamilyHandle and RocksDatabase.ColumnFamily.
 * 2. Mocking of methods to track the operations performed on the write batch.
 */

Contributor:

Do we have "delete ranges" here?

Contributor Author:

Updated the javadoc.
public class TestRDBBatchOperation {

static {
ManagedRocksObjectUtils.loadRocksDBLibrary();
}

private static Operation getOperation(String key, String value, OpType opType) {
return new Operation(string2Bytes(key), value == null ? null : string2Bytes(value), opType);
}

@Test
public void testBatchOperation() throws RocksDatabaseException, CodecException, RocksDBException {
try (TrackingUtilManagedWriteBatch writeBatch = new TrackingUtilManagedWriteBatch();
RDBBatchOperation batchOperation = RDBBatchOperation.newAtomicOperation(writeBatch)) {
ColumnFamilyHandle columnFamilyHandle = Mockito.mock(ColumnFamilyHandle.class);
RocksDatabase.ColumnFamily columnFamily = Mockito.mock(RocksDatabase.ColumnFamily.class);
doAnswer((i) -> {
((ManagedWriteBatch)i.getArgument(0))
.put(columnFamilyHandle, (ByteBuffer) i.getArgument(1), (ByteBuffer) i.getArgument(2));
return null;
}).when(columnFamily).batchPut(any(ManagedWriteBatch.class), any(ByteBuffer.class), any(ByteBuffer.class));

doAnswer((i) -> {
((ManagedWriteBatch)i.getArgument(0))
.delete(columnFamilyHandle, (ByteBuffer) i.getArgument(1));
return null;
}).when(columnFamily).batchDelete(any(ManagedWriteBatch.class), any(ByteBuffer.class));

when(columnFamily.getHandle()).thenReturn(columnFamilyHandle);
when(columnFamilyHandle.getName()).thenReturn(string2Bytes("test"));
when(columnFamily.getName()).thenReturn("test");
Codec<String> codec = StringCodec.get();
// OP1: This should be skipped in favor of OP9.
batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value01"));
// OP2
batchOperation.put(columnFamily, codec.toPersistedFormat("key02"), codec.toPersistedFormat("value02"));
// OP3: This should be skipped in favor of OP4.
batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key03"), codec.toDirectCodecBuffer("value03"));
// OP4
batchOperation.put(columnFamily, codec.toPersistedFormat("key03"), codec.toPersistedFormat("value04"));
// OP5
batchOperation.delete(columnFamily, codec.toDirectCodecBuffer("key05"));
// OP6
batchOperation.delete(columnFamily, codec.toPersistedFormat("key10"));
// OP7
batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key04"), codec.toDirectCodecBuffer("value04"));
// OP8
batchOperation.put(columnFamily, codec.toPersistedFormat("key06"), codec.toPersistedFormat("value05"));
// OP9
batchOperation.put(columnFamily, codec.toDirectCodecBuffer("key01"), codec.toDirectCodecBuffer("value011"));
Contributor:

This hard-coded test is too simple. How about implementing this idea?

Contributor Author:

I was planning to add it as part of the delete range PR, but yeah, I can add it here as well.

Contributor:

Do we still need this hard-coded test? How about removing it?

Contributor Author:

This is a good concrete test to keep, so that at least one concrete scenario is exercised. A randomized test would not guarantee that every case ran.



RocksDatabase db = Mockito.mock(RocksDatabase.class);
doNothing().when(db).batchWrite(any());
batchOperation.commit(db);
Set<Operation> expectedOps = ImmutableSet.of(
getOperation("key01", "value011", OpType.PUT_DIRECT),
getOperation("key02", "value02", OpType.PUT_DIRECT),
getOperation("key03", "value04", OpType.PUT_DIRECT),
getOperation("key05", null, OpType.DELETE_DIRECT),
getOperation("key10", null, OpType.DELETE_DIRECT),
getOperation("key04", "value04", OpType.PUT_DIRECT),
getOperation("key06", "value05", OpType.PUT_DIRECT));
assertEquals(Collections.singleton("test"), writeBatch.getOperations().keySet());
assertEquals(expectedOps, new HashSet<>(writeBatch.getOperations().get("test")));
}
}
}
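A hedged sketch of the randomized complement discussed in the review thread above, reusing this test's mocks and helpers; the elided setup is the same as in testBatchOperation, and java.util.Map, HashMap, and Random imports would be needed:

// Hypothetical sketch only: drive random puts/deletes through both the batch
// and a reference map, then check that only the last op per key survives
// deduplication.
@Test
public void testRandomizedBatchOperation() throws Exception {
  // ... same writeBatch / columnFamily / db setup as testBatchOperation ...
  Codec<String> codec = StringCodec.get();
  Map<String, Operation> expected = new HashMap<>();
  Random random = new Random(42); // fixed seed keeps failures reproducible
  for (int i = 0; i < 1000; i++) {
    String key = "key" + random.nextInt(50); // small key space forces overwrites
    if (random.nextBoolean()) {
      String value = "value" + i;
      batchOperation.put(columnFamily, codec.toDirectCodecBuffer(key), codec.toDirectCodecBuffer(value));
      expected.put(key, getOperation(key, value, OpType.PUT_DIRECT));
    } else {
      batchOperation.delete(columnFamily, codec.toDirectCodecBuffer(key));
      expected.put(key, getOperation(key, null, OpType.DELETE_DIRECT));
    }
  }
  batchOperation.commit(db);
  assertEquals(new HashSet<>(expected.values()),
      new HashSet<>(writeBatch.getOperations().get("test")));
}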
22 changes: 17 additions & 5 deletions hadoop-hdds/managed-rocksdb/pom.xml
@@ -25,11 +25,6 @@
<name>Apache Ozone HDDS Managed RocksDB</name>
<description>Apache Ozone Managed RocksDB library</description>

<properties>
<!-- no tests in this module so far -->
<maven.test.skip>true</maven.test.skip>
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
@@ -63,6 +58,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -74,6 +74,18 @@
<proc>none</proc>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>