Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,8 @@ StatusAndMessages queryUpgradeFinalizationProgress(
long getContainerCount(HddsProtos.LifeCycleState state)
throws IOException;

List<ContainerInfo> getListOfContainers(
long startContainerID, int count, HddsProtos.LifeCycleState state)
List<ContainerID> getListOfContainerIDs(
ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException;

DecommissionScmResponseProto decommissionScm(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
Expand Down Expand Up @@ -109,6 +110,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto;
Expand Down Expand Up @@ -1244,10 +1247,30 @@ public void close() {
}

@Override
public List<ContainerInfo> getListOfContainers(
long startContainerID, int count, HddsProtos.LifeCycleState state)
public List<ContainerID> getListOfContainerIDs(
ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException {
return listContainer(startContainerID, count, state).getContainerInfoList();
Preconditions.checkState(startContainerID.getId() >= 0,
"Container ID cannot be negative.");
Preconditions.checkState(count > 0,
"Container count must be greater than 0.");
SCMListContainerIDsRequestProto.Builder builder = SCMListContainerIDsRequestProto
.newBuilder();
builder.setStartContainerID(startContainerID.getProtobuf());
builder.setCount(count);
builder.setTraceID(TracingUtil.exportCurrentSpan());
builder.setState(state);

SCMListContainerIDsRequestProto request = builder.build();

SCMListContainerIDsResponseProto response =
submitRequest(Type.ListContainerIDs,
builder1 -> builder1.setScmListContainerIDsRequest(request))
.getScmListContainerIDsResponse();
return response.getContainerIDsList()
.stream()
.map(ContainerID::getFromProtobuf)
.collect(Collectors.toList());
}

@Override
Expand Down
14 changes: 14 additions & 0 deletions hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message ScmContainerLocationRequest {
optional ContainerBalancerStatusInfoRequestProto containerBalancerStatusInfoRequest = 48;
optional ReconcileContainerRequestProto reconcileContainerRequest = 49;
optional GetDeletedBlocksTxnSummaryRequestProto getDeletedBlocksTxnSummaryRequest = 50;
optional SCMListContainerIDsRequestProto scmListContainerIDsRequest = 51;
}

message ScmContainerLocationResponse {
Expand Down Expand Up @@ -145,6 +146,7 @@ message ScmContainerLocationResponse {
optional ContainerBalancerStatusInfoResponseProto containerBalancerStatusInfoResponse = 48;
optional ReconcileContainerResponseProto reconcileContainerResponse = 49;
optional GetDeletedBlocksTxnSummaryResponseProto getDeletedBlocksTxnSummaryResponse = 50;
optional SCMListContainerIDsResponseProto scmListContainerIDsResponse = 51;

enum Status {
OK = 1;
Expand Down Expand Up @@ -202,6 +204,7 @@ enum Type {
GetContainerBalancerStatusInfo = 44;
ReconcileContainer = 45;
GetDeletedBlocksTransactionSummary = 46;
ListContainerIDs = 47;
}

/**
Expand Down Expand Up @@ -291,6 +294,17 @@ message GetExistContainerWithPipelinesInBatchResponseProto {
repeated ContainerWithPipeline containerWithPipelines = 1;
}

message SCMListContainerIDsRequestProto {
required uint32 count = 1;
optional ContainerID startContainerID = 2;
optional LifeCycleState state = 3;
optional string traceID = 4;
}

message SCMListContainerIDsResponseProto {
repeated ContainerID containerIDs = 1;
}

message SCMListContainerRequestProto {
required uint32 count = 1;
optional uint64 startContainerID = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,33 @@ default List<ContainerInfo> getContainers() {
return getContainers(ContainerID.valueOf(0), Integer.MAX_VALUE);
}

/**
* Returns container IDs under certain conditions.
* Search container IDs from start ID(inclusive),
* The max size of the searching range cannot exceed the
* value of count.
*
* @param startID start containerID, &gt;=0,
* start searching at the head if 0.
* @param count count must be &gt;= 0
* Usually the count will be replaced with a very big
* value instead of being unlimited in case the db is very big.
* @param state container state
*
* @return a list of container IDs.
*/
List<ContainerID> getContainerIDs(ContainerID startID, int count, LifeCycleState state);

/**
* Returns containers under certain conditions.
* Search container IDs from start ID(exclusive),
* Search container IDs from start ID(inclusive),
* The max size of the searching range cannot exceed the
* value of count.
*
* @param startID start containerID, &gt;=0,
* start searching at the head if 0.
* @param count count must be &gt;= 0
* Usually the count will be replace with a very big
* Usually the count will be replaced with a very big
* value instead of being unlimited in case the db is very big.
*
* @return a list of container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ public List<ContainerInfo> getContainers(ReplicationType type) {
return containerStateManager.getContainerInfos(type);
}

@Override
public List<ContainerID> getContainerIDs(final ContainerID startID,
final int count,
final LifeCycleState state) {
scmContainerManagerMetrics.incNumListContainersOps();
return containerStateManager.getContainerIDs(state, startID, count);
}

@Override
public List<ContainerInfo> getContainers(final ContainerID startID,
final int count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public interface ContainerStateManager {
*/
boolean contains(ContainerID containerID);

/**
* Get {@link ContainerID}s for the given state.
*
* @param start the start {@link ContainerID} (inclusive)
* @param count the size limit
* @return a list of {@link ContainerID};
*/
List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID start, int count);

/**
* Get {@link ContainerInfo}s.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,13 @@ private void initialize() throws IOException {
return actions;
}

@Override
public List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID start, int count) {
try (AutoCloseableLock ignored = readLock()) {
return containers.getContainerIDs(state, start, count);
}
}

@Override
public List<ContainerInfo> getContainerInfos(ContainerID start, int count) {
try (AutoCloseableLock ignored = readLock()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,20 @@ public void updateState(ContainerID containerID, LifeCycleState currentState,
currentInfo.setState(newState);
}

/**
*
* @param state the state of the containers
* @param start the start id
* @param count the maximum size of the returned list
* @return a list of sorted {@link ContainerID}s
*/
public List<ContainerID> getContainerIDs(LifeCycleState state, ContainerID start, int count) {
Preconditions.assertTrue(count >= 0, "count < 0");
return lifeCycleStateMap.tailMap(state, start).keySet().stream()
.limit(count)
.collect(Collectors.toList());
}

public List<ContainerInfo> getContainerInfos(ContainerID start, int count) {
return containerMap.getInfos(start, count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMCloseContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerIDsResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SafeModeRuleStatusProto;
Expand Down Expand Up @@ -752,6 +754,12 @@ public ScmContainerLocationResponse processRequest(
.setStatus(Status.OK)
.setReconcileContainerResponse(reconcileContainer(request.getReconcileContainerRequest()))
.build();
case ListContainerIDs:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setScmListContainerIDsResponse(listContainerIDs(request.getScmListContainerIDsRequest()))
.build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
Expand Down Expand Up @@ -1396,4 +1404,29 @@ public ReconcileContainerResponseProto reconcileContainer(ReconcileContainerRequ
return ReconcileContainerResponseProto.getDefaultInstance();
}

public SCMListContainerIDsResponseProto listContainerIDs(
SCMListContainerIDsRequestProto request) throws IOException {
ContainerID startContainerID = ContainerID.valueOf(0);

if (request.hasStartContainerID()) {
startContainerID = ContainerID.valueOf(request.getStartContainerID().getId());
}

HddsProtos.LifeCycleState state = null;
if (request.hasState()) {
state = request.getState();
}

SCMListContainerIDsResponseProto.Builder builder =
SCMListContainerIDsResponseProto.newBuilder();

List<ContainerID> containerIDs = impl.getListOfContainerIDs(
startContainerID, request.getCount(), state);

containerIDs.stream()
.map(ContainerID::getProtobuf)
.forEach(builder::addContainerIDs);

return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1507,23 +1507,23 @@ public long getContainerCount(HddsProtos.LifeCycleState state)
}

@Override
public List<ContainerInfo> getListOfContainers(
long startContainerID, int count, HddsProtos.LifeCycleState state)
public List<ContainerID> getListOfContainerIDs(
ContainerID startContainerID, int count, HddsProtos.LifeCycleState state)
throws IOException {

final Map<String, String> auditMap = Maps.newHashMap();
auditMap.put("startContainerID", String.valueOf(startContainerID));
auditMap.put("count", String.valueOf(count));
auditMap.put("state", String.valueOf(state));
try {
List<ContainerInfo> results = scm.getContainerManager().getContainers(
ContainerID.valueOf(startContainerID), count, state);
List<ContainerID> results = scm.getContainerManager().getContainerIDs(
startContainerID, count, state);
AUDIT.logReadSuccess(buildAuditMessageForSuccess(
SCMAction.LIST_CONTAINER, auditMap));
SCMAction.LIST_CONTAINER_IDS, auditMap));
return results;
} catch (Exception ex) {
AUDIT.logReadFailure(buildAuditMessageForFailure(
SCMAction.LIST_CONTAINER, auditMap, ex));
SCMAction.LIST_CONTAINER_IDS, auditMap, ex));
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public enum SCMAction implements AuditAction {
QUERY_NODE,
GET_PIPELINE,
RECONCILE_CONTAINER,
GET_DELETED_BLOCK_SUMMARY;
GET_DELETED_BLOCK_SUMMARY,
LIST_CONTAINER_IDS;

@Override
public String getAction() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,37 @@ private void verifyContainerState(ContainerID containerId,
assertEquals(expectedState, containerManager.getContainer(containerId).getState());
}

@Test
public void testGetContainerIDs() throws IOException {
ContainerInfo openContainerInfo = new ContainerInfo.Builder()
.setContainerID(1)
.setState(HddsProtos.LifeCycleState.OPEN)
.setSequenceId(100L)
.setOwner("scm")
.setPipelineID(PipelineID.randomId())
.setReplicationConfig(
RatisReplicationConfig
.getInstance(ReplicationFactor.THREE))
.build();

ContainerInfo closedContainerInfo = new ContainerInfo.Builder()
.setContainerID(2)
.setState(HddsProtos.LifeCycleState.CLOSED)
.setSequenceId(200L)
.setOwner("scm")
.setPipelineID(PipelineID.randomId())
.setReplicationConfig(
RatisReplicationConfig
.getInstance(ReplicationFactor.THREE))
.build();

containerStateManager.addContainer(openContainerInfo.getProtobuf());
containerStateManager.addContainer(closedContainerInfo.getProtobuf());

assertEquals(1, containerStateManager.getContainerIDs(
HddsProtos.LifeCycleState.CLOSED, ContainerID.MIN, 10).size());
}

@Test
public void testSequenceIdOnStateUpdate() throws Exception {
ContainerID containerID = ContainerID.valueOf(3L);
Expand Down
Loading