diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index b37c7ed3e16b..d7db018e0f50 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -54,6 +54,7 @@ import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock; import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks; import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveTableKeysRequest; @@ -76,7 +77,6 @@ public class SnapshotDeletingService extends AbstractKeyDeletingService { // from the same table and can send deletion requests for same snapshot // multiple times. private static final int SNAPSHOT_DELETING_CORE_POOL_SIZE = 1; - private static final int MIN_ERR_LIMIT_PER_TASK = 1000; private final ClientId clientId = ClientId.randomId(); private final OzoneManager ozoneManager; @@ -119,7 +119,8 @@ public SnapshotDeletingService(long interval, long serviceTimeout, this.lockIds = new ArrayList<>(2); } - private class SnapshotDeletingTask implements BackgroundTask { + @VisibleForTesting + final class SnapshotDeletingTask implements BackgroundTask { @SuppressWarnings("checkstyle:MethodLength") @Override @@ -210,8 +211,8 @@ public BackgroundTaskResult call() throws InterruptedException { renameKeys.add(HddsProtos.KeyValue.newBuilder().setKey(renameEntry.getKey()) .setValue(renameEntry.getValue()).build()); } - submitSnapshotMoveDeletedKeys(snapInfo, deletedKeys, renameKeys, deletedDirs); - remaining -= moveCount; + int submitted = submitSnapshotMoveDeletedKeysWithBatching(snapInfo, deletedKeys, renameKeys, deletedDirs); + remaining -= submitted; } else { snapshotsToBePurged.add(snapInfo.getTableKey()); } @@ -247,39 +248,180 @@ private void submitSnapshotPurgeRequest(List purgeSnapshotKeys) { } } - private void submitSnapshotMoveDeletedKeys(SnapshotInfo snapInfo, - List deletedKeys, - List renamedList, - List dirsToMove) { - - SnapshotMoveTableKeysRequest.Builder moveDeletedKeysBuilder = SnapshotMoveTableKeysRequest.newBuilder() + /** + * Submits a single batch of snapshot move requests. + * + * @param snapInfo The snapshot being processed + * @param deletedKeys List of deleted keys to move + * @param renamedList List of renamed keys + * @param dirsToMove List of deleted directories to move + * @return true if submission was successful, false otherwise + */ + private boolean submitSingleSnapshotMoveBatch(SnapshotInfo snapInfo, + List deletedKeys, + List renamedList, + List dirsToMove) { + SnapshotMoveTableKeysRequest.Builder moveDeletedKeys = SnapshotMoveTableKeysRequest.newBuilder() .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId())); - SnapshotMoveTableKeysRequest moveDeletedKeys = moveDeletedKeysBuilder - .addAllDeletedKeys(deletedKeys) - .addAllRenamedKeys(renamedList) - .addAllDeletedDirs(dirsToMove) - .build(); - if (isBufferLimitCrossed(ratisByteLimit, 0, moveDeletedKeys.getSerializedSize())) { - int remaining = MIN_ERR_LIMIT_PER_TASK; - deletedKeys = deletedKeys.subList(0, Math.min(remaining, deletedKeys.size())); - remaining -= deletedKeys.size(); - renamedList = renamedList.subList(0, Math.min(remaining, renamedList.size())); - remaining -= renamedList.size(); - dirsToMove = dirsToMove.subList(0, Math.min(remaining, dirsToMove.size())); - moveDeletedKeys = moveDeletedKeysBuilder - .addAllDeletedKeys(deletedKeys) - .addAllRenamedKeys(renamedList) - .addAllDeletedDirs(dirsToMove) - .build(); + if (!deletedKeys.isEmpty()) { + moveDeletedKeys.addAllDeletedKeys(deletedKeys); + } + + if (!renamedList.isEmpty()) { + moveDeletedKeys.addAllRenamedKeys(renamedList); + } + + if (!dirsToMove.isEmpty()) { + moveDeletedKeys.addAllDeletedDirs(dirsToMove); } OMRequest omRequest = OMRequest.newBuilder() .setCmdType(Type.SnapshotMoveTableKeys) - .setSnapshotMoveTableKeysRequest(moveDeletedKeys) + .setSnapshotMoveTableKeysRequest(moveDeletedKeys.build()) .setClientId(clientId.toString()) .build(); - submitOMRequest(omRequest); + + try { + OzoneManagerProtocolProtos.OMResponse response = submitRequest(omRequest); + if (response == null || !response.getSuccess()) { + LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run."); + return false; + } + return true; + } catch (ServiceException e) { + LOG.error("SnapshotMoveTableKeys request failed. Will retry in the next run", e); + return false; + } + } + + /** + * Submits snapshot move requests with batching to respect the Ratis buffer limit. + * This method progressively builds batches while checking size limits before adding entries. + * + * @param snapInfo The snapshot being processed + * @param deletedKeys List of deleted keys to move + * @param renamedList List of renamed keys + * @param dirsToMove List of deleted directories to move + * @return The number of entries successfully submitted + */ + @VisibleForTesting + public int submitSnapshotMoveDeletedKeysWithBatching(SnapshotInfo snapInfo, + List deletedKeys, + List renamedList, + List dirsToMove) { + List currentDeletedKeys = new ArrayList<>(); + List currentRenamedKeys = new ArrayList<>(); + List currentDeletedDirs = new ArrayList<>(); + int totalSubmitted = 0; + int batchCount = 0; + + SnapshotMoveTableKeysRequest emptyRequest = SnapshotMoveTableKeysRequest.newBuilder() + .setFromSnapshotID(toProtobuf(snapInfo.getSnapshotId())) + .build(); + OMRequest baseRequest = OMRequest.newBuilder() + .setCmdType(Type.SnapshotMoveTableKeys) + .setSnapshotMoveTableKeysRequest(emptyRequest) + .setClientId(clientId.toString()) + .build(); + int baseOverhead = baseRequest.getSerializedSize(); + long batchBytes = baseOverhead; + + for (SnapshotMoveKeyInfos key : deletedKeys) { + int keySize = key.getSerializedSize(); + + // If adding this key would exceed the limit, flush the current batch first + if (batchBytes + keySize > ratisByteLimit && !currentDeletedKeys.isEmpty()) { + batchCount++; + LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + + "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), + currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); + + if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { + return totalSubmitted; + } + + totalSubmitted += currentDeletedKeys.size(); + currentDeletedKeys.clear(); + currentRenamedKeys.clear(); + currentDeletedDirs.clear(); + batchBytes = baseOverhead; + } + + currentDeletedKeys.add(key); + batchBytes += keySize; + } + + for (HddsProtos.KeyValue renameKey : renamedList) { + int keySize = renameKey.getSerializedSize(); + + // If adding this key would exceed the limit, flush the current batch first + if (batchBytes + keySize > ratisByteLimit && + (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty())) { + batchCount++; + LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + + "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), + currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); + + if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { + return totalSubmitted; + } + + totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size(); + currentDeletedKeys.clear(); + currentRenamedKeys.clear(); + currentDeletedDirs.clear(); + batchBytes = baseOverhead; + } + + currentRenamedKeys.add(renameKey); + batchBytes += keySize; + } + + for (SnapshotMoveKeyInfos dir : dirsToMove) { + int dirSize = dir.getSerializedSize(); + + // If adding this dir would exceed the limit, flush the current batch first + if (batchBytes + dirSize > ratisByteLimit && + (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty())) { + batchCount++; + LOG.debug("Submitting batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + + "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), + currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); + + if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { + return totalSubmitted; + } + + totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size(); + currentDeletedKeys.clear(); + currentRenamedKeys.clear(); + currentDeletedDirs.clear(); + batchBytes = baseOverhead; + } + + currentDeletedDirs.add(dir); + batchBytes += dirSize; + } + + // Submit the final batch if any + if (!currentDeletedKeys.isEmpty() || !currentRenamedKeys.isEmpty() || !currentDeletedDirs.isEmpty()) { + batchCount++; + LOG.debug("Submitting final batch {} for snapshot {} with {} deletedKeys, {} renamedKeys, {} deletedDirs, " + + "size: {} bytes", batchCount, snapInfo.getTableKey(), currentDeletedKeys.size(), + currentRenamedKeys.size(), currentDeletedDirs.size(), batchBytes); + + if (!submitSingleSnapshotMoveBatch(snapInfo, currentDeletedKeys, currentRenamedKeys, currentDeletedDirs)) { + return totalSubmitted; + } + + totalSubmitted += currentDeletedKeys.size() + currentRenamedKeys.size() + currentDeletedDirs.size(); + } + + LOG.debug("Successfully submitted {} total entries in {} batches for snapshot {}", totalSubmitted, batchCount, + snapInfo.getTableKey()); + + return totalSubmitted; } private void submitOMRequest(OMRequest omRequest) { @@ -317,9 +459,4 @@ public DeletingServiceTaskQueue getTasks() { public long getSuccessfulRunCount() { return successRunCount.get(); } - - @VisibleForTesting - public void setSuccessRunCount(long num) { - successRunCount.getAndSet(num); - } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java index df7dfb7ce48b..c14596f891c8 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestSnapshotDeletingService.java @@ -18,19 +18,34 @@ package org.apache.hadoop.ozone.om.service; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.utils.TransactionInfo; import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.ozone.ClientVersion; import org.apache.hadoop.ozone.om.KeyManagerImpl; +import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmMetadataManagerImpl; import org.apache.hadoop.ozone.om.OmSnapshotManager; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SnapshotChainManager; +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotMoveKeyInfos; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -98,4 +113,184 @@ public void testProcessSnapshotLogicInSDS(SnapshotInfo snapshotInfo, snapshotInfo.setSnapshotStatus(status); assertEquals(expectedOutcome, snapshotDeletingService.shouldIgnoreSnapshot(snapshotInfo)); } + + /** + * Test that verifies the Ratis buffer limit is respected during batching and all entries are processed. + * This test creates entries that would exceed the buffer limit and verifies: + * 1. Multiple batches are created when needed + * 2. No single batch exceeds the buffer limit + * 3. All entries (deletedKeys, renamedKeys, deletedDirs) are eventually processed without orphans + */ + @Test + public void testSnapshotMoveKeysRequestBatching() throws Exception { + final int ratisBufferLimit = 50 * 1024; // 50 KB + OzoneConfiguration testConf = new OzoneConfiguration(); + testConf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + ratisBufferLimit, StorageUnit.BYTES); + + Mockito.when(omMetadataManager.getSnapshotChainManager()).thenReturn(chainManager); + Mockito.when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager); + Mockito.when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager); + Mockito.when(ozoneManager.getConfiguration()).thenReturn(testConf); + + SnapshotDeletingService service = new SnapshotDeletingService(sdsRunInterval, sdsServiceTimeout, ozoneManager); + SnapshotDeletingService spyService = Mockito.spy(service); + + UUID snapshotId = UUID.randomUUID(); + SnapshotInfo snapInfo = SnapshotInfo.newBuilder() + .setSnapshotId(snapshotId) + .setVolumeName("vol1") + .setBucketName("bucket1") + .setName("snap1") + .setSstFiltered(true) + .setLastTransactionInfo(TransactionInfo.valueOf(1, 1).toByteString()) + .build(); + + int numDeletedKeys = 15; + int numRenamedKeys = 10; + int numDeletedDirs = 10; + + List deletedKeys = createLargeDeletedKeys(numDeletedKeys); + List renamedKeys = createLargeRenamedKeys(numRenamedKeys); + List deletedDirs = createLargeDeletedDirs(numDeletedDirs); + + List capturedRequests = new ArrayList<>(); + + Mockito.doAnswer(invocation -> { + OMRequest request = invocation.getArgument(0); + capturedRequests.add(request); + return OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.SnapshotMoveTableKeys) + .setStatus(OzoneManagerProtocolProtos.Status.OK) + .setSuccess(true) + .build(); + }).when(spyService).submitRequest(Mockito.any(OMRequest.class)); + + // Create task instance directly from the service + SnapshotDeletingService.SnapshotDeletingTask task = spyService.new SnapshotDeletingTask(); + + // Invoke the batching method directly (now public) + int submitted = task.submitSnapshotMoveDeletedKeysWithBatching( + snapInfo, deletedKeys, renamedKeys, deletedDirs); + + // Verify results + int totalExpected = numDeletedKeys + numRenamedKeys + numDeletedDirs; + assertEquals(totalExpected, submitted, + "All entries should be submitted"); + + // Verify multiple batches were created (since data should exceed buffer) + assertTrue(capturedRequests.size() > 1); + + for (OMRequest omRequest : capturedRequests) { + assertEquals(OzoneManagerProtocolProtos.Type.SnapshotMoveTableKeys, omRequest.getCmdType()); + + int requestSize = omRequest.getSerializedSize(); + assertTrue(requestSize <= ratisBufferLimit); + } + + int totalDeletedKeysProcessed = capturedRequests.stream() + .mapToInt(req -> req.getSnapshotMoveTableKeysRequest().getDeletedKeysCount()) + .sum(); + int totalRenamedKeysProcessed = capturedRequests.stream() + .mapToInt(req -> req.getSnapshotMoveTableKeysRequest().getRenamedKeysCount()) + .sum(); + int totalDeletedDirsProcessed = capturedRequests.stream() + .mapToInt(req -> req.getSnapshotMoveTableKeysRequest().getDeletedDirsCount()) + .sum(); + + assertEquals(numDeletedKeys, totalDeletedKeysProcessed); + assertEquals(numRenamedKeys, totalRenamedKeysProcessed); + assertEquals(numDeletedDirs, totalDeletedDirsProcessed); + + // Verify no orphan entries (all items accounted for) + assertEquals(totalExpected, totalDeletedKeysProcessed + totalRenamedKeysProcessed + totalDeletedDirsProcessed); + } + + /** + * Helper method to create large deleted keys that will contribute to buffer size. + */ + private List createLargeDeletedKeys(int count) { + List deletedKeys = new ArrayList<>(); + for (int i = 0; i < count; i++) { + // Create keys with large names and multiple key infos to increase size + String largeKeyName = "/vol1/bucket1/" + generateLargeString(500) + "_key_" + i; + List keyInfos = new ArrayList<>(); + for (int j = 0; j < 3; j++) { + OmKeyInfo keyInfo = createOmKeyInfo(largeKeyName + "_part_" + j); + keyInfos.add(keyInfo); + } + + SnapshotMoveKeyInfos moveKeyInfo = SnapshotMoveKeyInfos.newBuilder() + .setKey(largeKeyName) + .addAllKeyInfos(keyInfos.stream() + .map(k -> k.getProtobuf(ClientVersion.CURRENT_VERSION)) + .collect(Collectors.toList())) + .build(); + deletedKeys.add(moveKeyInfo); + } + return deletedKeys; + } + + /** + * Helper method to create large renamed keys. + */ + private List createLargeRenamedKeys(int count) { + List renamedKeys = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String largeOldName = "/vol1/bucket1/" + generateLargeString(500) + "_old_" + i; + String largeNewName = "/vol1/bucket1/" + generateLargeString(500) + "_new_" + i; + renamedKeys.add(HddsProtos.KeyValue.newBuilder() + .setKey(largeOldName) + .setValue(largeNewName) + .build()); + } + return renamedKeys; + } + + /** + * Helper method to create large deleted directories. + */ + private List createLargeDeletedDirs(int count) { + List deletedDirs = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String largeDirName = "/vol1/bucket1/" + generateLargeString(500) + "_dir_" + i; + OmKeyInfo dirInfo = createOmKeyInfo(largeDirName); + + SnapshotMoveKeyInfos moveDirInfo = SnapshotMoveKeyInfos.newBuilder() + .setKey(largeDirName) + .addKeyInfos(dirInfo.getProtobuf(ClientVersion.CURRENT_VERSION)) + .build(); + deletedDirs.add(moveDirInfo); + } + return deletedDirs; + } + + /** + * Helper method to create an OmKeyInfo object for testing. + */ + private OmKeyInfo createOmKeyInfo(String keyName) { + return new OmKeyInfo.Builder() + .setVolumeName("vol1") + .setBucketName("bucket1") + .setKeyName(keyName) + .setReplicationConfig(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE)) + .setObjectID(System.currentTimeMillis()) + .setUpdateID(System.currentTimeMillis()) + .setDataSize(1024 * 1024) // 1 MB + .setCreationTime(System.currentTimeMillis()) + .setModificationTime(System.currentTimeMillis()) + .setOmKeyLocationInfos(new ArrayList<>()) + .build(); + } + + /** + * Helper method to generate a large string of specified length. + */ + private String generateLargeString(int length) { + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append((char) ('a' + (i % 26))); + } + return sb.toString(); + } }