From 14d25f1ac6cc0c3ce2f8b32398cd41ec9c5cacc1 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Fri, 28 Nov 2025 01:11:15 -0500 Subject: [PATCH 1/5] HDDS-14020. Use forkjoin pool instead of thread pool on background service Change-Id: I4c1a051e8574d32375cbebeb10546563f4a4f817 --- .../hadoop/hdds/utils/BackgroundService.java | 246 +++++++++++------- .../hadoop/hdds/utils/BackgroundTask.java | 46 +++- .../hdds/utils/BackgroundTaskResult.java | 1 - .../hadoop/util/TestBackgroundService.java | 4 +- .../background/BlockDeletingTask.java | 16 +- ...leRecoveringContainerScrubbingService.java | 6 +- .../common/BlockDeletingServiceTestImpl.java | 2 +- .../scm/block/SCMBlockDeletingService.java | 2 +- .../hadoop/ozone/om/SstFilteringService.java | 7 +- .../service/AbstractKeyDeletingService.java | 39 ++- .../ozone/om/service/CompactionService.java | 2 +- .../om/service/DirectoryDeletingService.java | 180 +++++++------ .../ozone/om/service/KeyDeletingService.java | 2 +- .../MultipartUploadCleanupService.java | 2 +- .../om/service/OMRangerBGSyncService.java | 2 +- .../om/service/OpenKeyCleanupService.java | 2 +- .../om/service/SnapshotDeletingService.java | 2 +- .../service/SnapshotDiffCleanupService.java | 2 +- .../defrag/SnapshotDefragService.java | 2 +- .../service/TestDirectoryDeletingService.java | 5 +- 20 files changed, 355 insertions(+), 215 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 144d1725fdb5..cb0cacdf6888 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -18,13 +18,14 @@ package org.apache.hadoop.hdds.utils; import com.google.common.annotations.VisibleForTesting; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinWorkerThread; +import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,20 +38,18 @@ */ public abstract class BackgroundService { - protected static final Logger LOG = - LoggerFactory.getLogger(BackgroundService.class); + protected static final Logger LOG = LoggerFactory.getLogger(BackgroundService.class); // Executor to launch child tasks - private ScheduledThreadPoolExecutor exec; + private volatile ForkJoinPool exec; private ThreadGroup threadGroup; private final String serviceName; - private long interval; + private volatile long intervalInMillis; private volatile long serviceTimeoutInNanos; - private TimeUnit unit; - private final int threadPoolSize; + private volatile int threadPoolSize; private final String threadNamePrefix; - private final PeriodicalTask service; - private CompletableFuture future; + private volatile CompletableFuture future; + private volatile AtomicBoolean isShutdown; public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout) { @@ 
-60,15 +59,13 @@ public BackgroundService(String serviceName, long interval, public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout, String threadNamePrefix) { - this.interval = interval; - this.unit = unit; + setInterval(interval, unit); this.serviceName = serviceName; this.serviceTimeoutInNanos = TimeDuration.valueOf(serviceTimeout, unit) .toLong(TimeUnit.NANOSECONDS); this.threadPoolSize = threadPoolSize; this.threadNamePrefix = threadNamePrefix; initExecutorAndThreadGroup(); - service = new PeriodicalTask(); this.future = CompletableFuture.completedFuture(null); } @@ -77,22 +74,23 @@ protected CompletableFuture getFuture() { } @VisibleForTesting - public synchronized ExecutorService getExecutorService() { + public synchronized ForkJoinPool getExecutorService() { return this.exec; } - public synchronized void setPoolSize(int size) { + /** + * Set the pool size for background service. This would require a shutdown and restart of the service for the + * change to take effect. + * @param size + */ + public void setPoolSize(int size) { if (size <= 0) { throw new IllegalArgumentException("Pool size must be positive."); } - - // In ScheduledThreadPoolExecutor, maximumPoolSize is Integer.MAX_VALUE - // the corePoolSize will always less maximumPoolSize. - // So we can directly set the corePoolSize - exec.setCorePoolSize(size); + this.threadPoolSize = size; } - public synchronized void setServiceTimeoutInNanos(long newTimeout) { + public void setServiceTimeoutInNanos(long newTimeout) { LOG.info("{} timeout is set to {} {}", serviceName, newTimeout, TimeUnit.NANOSECONDS.name().toLowerCase()); this.serviceTimeoutInNanos = newTimeout; } @@ -104,7 +102,7 @@ public int getThreadCount() { @VisibleForTesting public void runPeriodicalTaskNow() throws Exception { - BackgroundTaskQueue tasks = getTasks(); + BackgroundTaskQueue tasks = getTasks(false); while (!tasks.isEmpty()) { tasks.poll().call(); } @@ -116,18 +114,20 @@ public synchronized void start() { if (exec == null || exec.isShutdown() || exec.isTerminated()) { initExecutorAndThreadGroup(); } - LOG.info("Starting service {} with interval {} {}", serviceName, - interval, unit.name().toLowerCase()); - exec.scheduleWithFixedDelay(service, 0, interval, unit); + LOG.info("Starting service {} with interval {} ms", serviceName, intervalInMillis); + exec.execute(new PeriodicalTask(-1)); + } + + protected void setInterval(long newInterval, TimeUnit newUnit) { + this.intervalInMillis = TimeDuration.valueOf(newInterval, newUnit).toLong(TimeUnit.MILLISECONDS); } - protected synchronized void setInterval(long newInterval, TimeUnit newUnit) { - this.interval = newInterval; - this.unit = newUnit; + protected long getIntervalMillis() { + return intervalInMillis; } - protected synchronized long getIntervalMillis() { - return this.unit.toMillis(interval); + public BackgroundTaskQueue getTasks(boolean allowTasksToFork) { + return getTasks(); } public abstract BackgroundTaskQueue getTasks(); @@ -138,84 +138,148 @@ protected void execTaskCompletion() { } * Run one or more background tasks concurrently. * Wait until all tasks to return the result. 
*/ - public class PeriodicalTask implements Runnable { - @Override - public void run() { - // wait for previous set of tasks to complete - try { - future.join(); - } catch (RuntimeException e) { - LOG.error("Background service execution failed.", e); - } finally { - execTaskCompletion(); - } + public class PeriodicalTask extends RecursiveAction { + private int numberOfLoops; + private final Queue tasksInFlight; + private final AtomicBoolean isShutdown; - if (LOG.isDebugEnabled()) { - LOG.debug("Running background service : {}", serviceName); + public PeriodicalTask(int numberOfLoops) { + this.numberOfLoops = numberOfLoops; + this.tasksInFlight = new LinkedList<>(); + this.isShutdown = BackgroundService.this.isShutdown; + } + + private boolean waitForNextInterval() { + + if (numberOfLoops > 0) { + numberOfLoops--; + if (numberOfLoops == 0) { + return false; + } } - BackgroundTaskQueue tasks = getTasks(); - if (tasks.isEmpty()) { - // No task found, or some problems to init tasks - // return and retry in next interval. - return; + // Check if the executor has been shutdown during task execution. + if (!isShutdown.get()) { + synchronized (BackgroundService.this) { + // Get the shutdown flag again after acquiring the lock. + if (isShutdown.get()) { + return false; + } + try { + BackgroundService.this.wait(intervalInMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for next interval.", e); + return false; + } + } } + return !isShutdown.get(); + } + + @Override + public void compute() { if (LOG.isDebugEnabled()) { - LOG.debug("Number of background tasks to execute : {}", tasks.size()); + LOG.debug("Running background service : {}", serviceName); } - synchronized (BackgroundService.this) { + boolean runAgain = true; + do { + future = new CompletableFuture<>(); + BackgroundTaskQueue tasks = getTasks(true); + if (tasks.isEmpty()) { + // No task found, or some problems to init tasks + // return and retry in next interval. + continue; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Number of background tasks to execute : {}", tasks.size()); + } + while (!tasks.isEmpty()) { BackgroundTask task = tasks.poll(); - future = future.thenCombine(CompletableFuture.runAsync(() -> { - long startTime = System.nanoTime(); - try { - BackgroundTaskResult result = task.call(); - if (LOG.isDebugEnabled()) { - LOG.debug("task execution result size {}", result.getSize()); - } - } catch (Throwable e) { - LOG.error("Background task execution failed", e); - if (e instanceof Error) { - throw (Error) e; - } - } finally { - long endTime = System.nanoTime(); - if (endTime - startTime > serviceTimeoutInNanos) { - LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", - serviceName, endTime - startTime, serviceTimeoutInNanos); - } + // Fork and submit the task back to executor. + task.fork(); + tasksInFlight.offer(task); + } + + while (!tasksInFlight.isEmpty()) { + BackgroundTask taskInFlight = tasksInFlight.poll(); + // Join the tasks forked before and wait for the result one by one. + BackgroundTask.BackgroundTaskForkResult result = taskInFlight.join(); + // Check for exception first in the task execution. 
+ if (result.getThrowable() != null) { + LOG.error("Background task execution failed", result.getThrowable()); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getResult().getSize()); } - }, exec).exceptionally(e -> null), (Void1, Void) -> null); + } + if (result.getTotalExecutionTime() > serviceTimeoutInNanos) { + LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", + serviceName, result.getTotalExecutionTime(), serviceTimeoutInNanos); + } } - } + future.complete(null); + runAgain = waitForNextInterval(); + } while (runAgain); } } // shutdown and make sure all threads are properly released. - public synchronized void shutdown() { + public void shutdown() { LOG.info("Shutting down service {}", this.serviceName); - exec.shutdown(); - try { - if (!exec.awaitTermination(60, TimeUnit.SECONDS)) { - exec.shutdownNow(); + final ThreadGroup threadGroupToBeClosed; + final ForkJoinPool execToShutdown; + // Set the shutdown flag to true to prevent new tasks from being submitted. + synchronized (this) { + threadGroupToBeClosed = threadGroup; + execToShutdown = exec; + exec = null; + threadGroup = null; + if (isShutdown != null) { + this.isShutdown.set(true); } - } catch (InterruptedException e) { - // Re-interrupt the thread while catching InterruptedException - Thread.currentThread().interrupt(); - exec.shutdownNow(); + isShutdown = null; + this.notify(); } - if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) { - threadGroup.destroy(); + if (execToShutdown != null) { + execToShutdown.shutdown(); + try { + if (!execToShutdown.awaitTermination(60, TimeUnit.SECONDS)) { + execToShutdown.shutdownNow(); + } + } catch (InterruptedException e) { + // Re-interrupt the thread while catching InterruptedException + Thread.currentThread().interrupt(); + execToShutdown.shutdownNow(); + } + if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) { + threadGroupToBeClosed.destroy(); + } } } private void initExecutorAndThreadGroup() { - threadGroup = new ThreadGroup(serviceName); - ThreadFactory threadFactory = new ThreadFactoryBuilder() - .setThreadFactory(r -> new Thread(threadGroup, r)) - .setDaemon(true) - .setNameFormat(threadNamePrefix + serviceName + "#%d") - .build(); - exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(threadPoolSize, threadFactory); + try { + threadGroup = new ThreadGroup(serviceName); + Thread initThread = new Thread(threadGroup, () -> { + ForkJoinPool.ForkJoinWorkerThreadFactory factory = + pool -> { + ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) { + }; + thread.setDaemon(true); + thread.setName(threadNamePrefix + serviceName + thread.getPoolIndex()); + return thread; + }; + exec = new ForkJoinPool(threadPoolSize, factory, null, false); + isShutdown = new AtomicBoolean(false); + }); + initThread.start(); + initThread.join(); + } catch (InterruptedException e) { + shutdown(); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } protected String getServiceName() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java index 0717087a8655..8ef18c94cc3b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java @@ -18,16 +18,56 @@ package org.apache.hadoop.hdds.utils; import 
java.util.concurrent.Callable;
+import java.util.concurrent.RecursiveTask;
 
 /**
  * A task thread to run by {@link BackgroundService}.
  */
-public interface BackgroundTask extends Callable<BackgroundTaskResult> {
+public abstract class BackgroundTask extends RecursiveTask<BackgroundTask.BackgroundTaskForkResult>
+    implements Callable<BackgroundTaskResult> {
+  private static final long serialVersionUID = 1L;
+
+  static final class BackgroundTaskForkResult {
+    private final BackgroundTaskResult result;
+    private final Throwable throwable;
+    private final long startTime;
+    private final long endTime;
+
+    private BackgroundTaskForkResult(BackgroundTaskResult result, long startTime, long endTime, Throwable throwable) {
+      this.endTime = endTime;
+      this.result = result;
+      this.startTime = startTime;
+      this.throwable = throwable;
+    }
+
+    public long getTotalExecutionTime() {
+      return endTime - startTime;
+    }
+
+    public BackgroundTaskResult getResult() {
+      return result;
+    }
+
+    public Throwable getThrowable() {
+      return throwable;
+    }
+  }
 
   @Override
-  BackgroundTaskResult call() throws Exception;
+  protected final BackgroundTaskForkResult compute() {
+    long startTime = System.nanoTime();
+    BackgroundTaskResult result = null;
+    Throwable throwable = null;
+    try {
+      result = this.call();
+    } catch (Throwable e) {
+      throwable = e;
+    }
+    long endTime = System.nanoTime();
+    return new BackgroundTaskForkResult(result, startTime, endTime, throwable);
+  }
 
-  default int getPriority() {
+  public int getPriority() {
     return 0;
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java
index e7083a45c061..047f3b16fe72 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java
@@ -21,7 +21,6 @@
  * Result of a {@link BackgroundTask}.
  */
 public interface BackgroundTaskResult {
-
  /**
   * Returns the size of entries included in this result.
*/ diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java index 2c5ed1bfea5a..3bdc2adbf020 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java @@ -52,7 +52,7 @@ public class TestBackgroundService { private BackgroundService backgroundService; private AtomicInteger runCount; - private static class TestTask implements BackgroundTask { + private static class TestTask extends BackgroundTask { private Map map; private Map locks; private int index; @@ -91,7 +91,7 @@ public void shutDown() { } private BackgroundService createBackgroundService(String name, int interval, - TimeUnit unit, int workerThreads, int serviceTimeout) { + TimeUnit unit, int workerThreads, int serviceTimeout) throws InterruptedException { return new BackgroundService(name, interval, unit, workerThreads, serviceTimeout) { @Override public BackgroundTaskQueue getTasks() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java index 1f66cad476f7..e382f0ad844d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java @@ -60,19 +60,19 @@ /** * BlockDeletingTask for KeyValueContainer. */ -public class BlockDeletingTask implements BackgroundTask { +public class BlockDeletingTask extends BackgroundTask { - private static final Logger LOG = - LoggerFactory.getLogger(BlockDeletingTask.class); + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(BlockDeletingTask.class); - private final BlockDeletingServiceMetrics metrics; + private final transient BlockDeletingServiceMetrics metrics; private final int priority; - private final KeyValueContainerData containerData; + private final transient KeyValueContainerData containerData; private long blocksToDelete; - private final OzoneContainer ozoneContainer; - private final ConfigurationSource conf; + private final transient OzoneContainer ozoneContainer; + private final transient ConfigurationSource conf; private Duration blockDeletingMaxLockHoldingTime; - private final ContainerChecksumTreeManager checksumTreeManager; + private final transient ContainerChecksumTreeManager checksumTreeManager; public BlockDeletingTask( BlockDeletingService blockDeletingService, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java index 5535c5128ccc..51049ee22bbd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java @@ -70,8 +70,10 @@ public BackgroundTaskQueue getTasks() { return backgroundTaskQueue; } - static class RecoveringContainerScrubbingTask implements BackgroundTask { - private final ContainerSet containerSet; + static class RecoveringContainerScrubbingTask extends BackgroundTask { + private static final long serialVersionUID = 1L; + + private final transient ContainerSet containerSet; private final long containerID; RecoveringContainerScrubbingTask( diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java index 8e41ab686a2e..bf8e8d3b627f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java @@ -68,7 +68,7 @@ int getTimesOfProcessed() { // Override the implementation to start a single on-call control thread. @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(); + PeriodicalTask svc = new PeriodicalTask(1); // In test mode, relies on a latch countdown to runDeletingTasks tasks. Runnable r = () -> { while (true) { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 348e2e9d5dcb..473cd3257b62 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -139,7 +139,7 @@ public BackgroundTaskQueue getTasks() { return queue; } - private class DeletedBlockTransactionScanner implements BackgroundTask { + private class DeletedBlockTransactionScanner extends BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index 0e09bbcbe4a2..389d459be4b7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -126,7 +126,7 @@ public void resume() { running.set(true); } - private class SstFilteringTask implements BackgroundTask { + private class SstFilteringTask extends BackgroundTask { private boolean isSnapshotDeleted(SnapshotInfo snapshotInfo) { return snapshotInfo == null || snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED; @@ -160,7 +160,7 @@ private void markSSTFilteredFlagForSnapshot(SnapshotInfo snapshotInfo) throws IO } @Override - public BackgroundTaskResult call() throws Exception { + public BackgroundTaskResult call() { Optional snapshotManager = Optional.ofNullable(ozoneManager) .map(OzoneManager::getOmSnapshotManager); @@ -222,6 +222,9 @@ public BackgroundTaskResult call() throws Exception { .getSnapshotId()); } } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("SST filtering task interrupted 
for snapshot: {}", snapShotTableKey, e); } } catch (IOException e) { if (isSnapshotDeleted(snapshotInfoTable.get(snapShotTableKey))) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 8e43a9fc8966..887175077033 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -106,6 +106,30 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { return true; } + private static final class BackgroundDeleteTask extends BackgroundTask { + private static final long serialVersionUID = 1L; + + private final transient BootstrapStateHandler.Lock bootstrapLock; + private final BackgroundTask task; + + private BackgroundDeleteTask(BootstrapStateHandler.Lock bootstrapLock, BackgroundTask task) { + this.bootstrapLock = bootstrapLock; + this.task = task; + } + + @Override + public BackgroundTaskResult call() throws Exception { + try (UncheckedAutoCloseable readLock = bootstrapLock.acquireReadLock()) { + return task.call(); + } + } + + @Override + public int getPriority() { + return task.getPriority(); + } + } + /** * A specialized implementation of {@link BackgroundTaskQueue} that modifies * the behavior of added tasks to utilize a read lock during execution. @@ -119,20 +143,7 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { public class DeletingServiceTaskQueue extends BackgroundTaskQueue { @Override public synchronized void add(BackgroundTask task) { - super.add(new BackgroundTask() { - - @Override - public BackgroundTaskResult call() throws Exception { - try (UncheckedAutoCloseable readLock = getBootstrapStateLock().acquireReadLock()) { - return task.call(); - } - } - - @Override - public int getPriority() { - return task.getPriority(); - } - }); + super.add(new BackgroundDeleteTask(lock, task)); } } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java index dfcc32578adf..786f703ccdab 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java @@ -155,7 +155,7 @@ protected void compactFully(String tableName) throws IOException { } } - private class CompactTask implements BackgroundTask { + private class CompactTask extends BackgroundTask { private final String tableName; CompactTask(String tableName) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index a79eeda74f26..371b6442257d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.areSnapshotChangesFlushedToDB; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; @@ -34,17 +35,15 @@ import java.util.Collections; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Queue; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.RecursiveTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -155,11 +154,10 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { // Using multi thread for DirDeletion. Multiple threads would read // from parent directory info from deleted directory table concurrently // and send deletion requests. - private int ratisByteLimit; + private final int ratisByteLimit; private final SnapshotChainManager snapshotChainManager; private final boolean deepCleanSnapshots; - private ExecutorService deletionThreadPool; - private final int numberOfParallelThreadsPerStore; + private final int maxForksPerStore; private final AtomicLong deletedDirsCount; private final AtomicLong movedDirsCount; private final AtomicLong movedFilesCount; @@ -174,9 +172,7 @@ public DirectoryDeletingService(long interval, TimeUnit unit, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, StorageUnit.BYTES); - this.numberOfParallelThreadsPerStore = dirDeletingServiceCorePoolSize; - this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, - interval, unit, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); + this.maxForksPerStore = dirDeletingServiceCorePoolSize; // always go to 90% of max limit for request as other header will be added this.ratisByteLimit = (int) (limit * 0.9); registerReconfigCallbacks(ozoneManager.getReconfigurationHandler()); @@ -214,8 +210,13 @@ private synchronized void updateAndRestart(OzoneConfiguration conf) { @Override public DeletingServiceTaskQueue getTasks() { + return getTasks(false); + } + + @Override + public DeletingServiceTaskQueue getTasks(boolean allowTasksToFork) { DeletingServiceTaskQueue queue = new DeletingServiceTaskQueue(); - queue.add(new DirDeletingTask(null)); + queue.add(new DirDeletingTask(null, allowTasksToFork, this)); if (deepCleanSnapshots) { Iterator iterator = null; try { @@ -226,41 +227,12 @@ public DeletingServiceTaskQueue getTasks() { } while (iterator.hasNext()) { UUID snapshotId = iterator.next(); - queue.add(new DirDeletingTask(snapshotId)); + queue.add(new DirDeletingTask(snapshotId, allowTasksToFork, this)); } } return queue; } - @Override - public void shutdown() { - if (deletionThreadPool != null) { - deletionThreadPool.shutdown(); - try { - if (!deletionThreadPool.awaitTermination(60, TimeUnit.SECONDS)) { - deletionThreadPool.shutdownNow(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - deletionThreadPool.shutdownNow(); - } - } - super.shutdown(); - } - - @Override - public 
synchronized void start() { - if (deletionThreadPool == null || deletionThreadPool.isShutdown() || deletionThreadPool.isTerminated()) { - this.deletionThreadPool = new ThreadPoolExecutor(0, numberOfParallelThreadsPerStore, - super.getIntervalMillis(), TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(Integer.MAX_VALUE)); - } - super.start(); - } - - private boolean isThreadPoolActive(ExecutorService threadPoolExecutor) { - return threadPoolExecutor != null && !threadPoolExecutor.isShutdown() && !threadPoolExecutor.isTerminated(); - } - @SuppressWarnings("checkstyle:ParameterNumber") void optimizeDirDeletesAndSubmitRequest( long dirNum, long subDirNum, long subFileNum, @@ -546,11 +518,17 @@ OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey, } @VisibleForTesting - final class DirDeletingTask implements BackgroundTask { + static final class DirDeletingTask extends BackgroundTask { + private static final long serialVersionUID = 1L; + private final UUID snapshotId; + private final boolean allowForks; + private final transient DirectoryDeletingService dds; - DirDeletingTask(UUID snapshotId) { + DirDeletingTask(UUID snapshotId, boolean allowForks, DirectoryDeletingService dds) { this.snapshotId = snapshotId; + this.allowForks = allowForks; + this.dds = dds; } @Override @@ -565,19 +543,33 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ .setExclusiveReplicatedSize(exclusiveReplicatedSize) .build(); return OzoneManagerProtocolProtos.SetSnapshotPropertyRequest.newBuilder() - .setSnapshotKey(snapshotChainManager.getTableKey(snapshotID)) + .setSnapshotKey(dds.snapshotChainManager.getTableKey(snapshotID)) .setSnapshotSizeDeltaFromDirDeepCleaning(snapshotSize) .build(); } /** + * Processes deleted directories for the given snapshot and store with optional + * parallelization depending on the configuration. Updates snapshot-related + * properties such as exclusive size mappings and deep clean flags upon successful + * processing. * - * @param currentSnapshotInfo if null, deleted directories in AOS should be processed. - * @param keyManager KeyManager of the underlying store. + * @param currentSnapshotInfo Information about the current snapshot whose deleted + * directories are being processed. Null if processing + * for the global store. + * @param keyManager Key manager responsible for handling operations related to + * deleted directories and keys. + * @param rnCnt Run count indicating the number of iterations the task has been + * executed. + * @param remainNum Remaining number of directories to process in this task. + * @throws IOException If an I/O error occurs during the processing of deleted + * directories. + * @throws ExecutionException If an error occurs while executing the task in parallel. + * @throws InterruptedException If the task execution is interrupted. 
*/ @VisibleForTesting - void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, int remainNum) - throws IOException, ExecutionException, InterruptedException { + void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, + int remainNum) throws IOException, ExecutionException, InterruptedException { String volume, bucket; String snapshotTableKey; if (currentSnapshotInfo != null) { volume = currentSnapshotInfo.getVolumeName(); @@ -592,26 +584,54 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key // This is to avoid race condition b/w purge request and snapshot chain update. For AOS taking the global // snapshotId since AOS could process multiple buckets in one iteration. While using path // previous snapshotId for a snapshot since it would process only one bucket. + SnapshotChainManager snapshotChainManager = dds.snapshotChainManager; UUID expectedPreviousSnapshotId = currentSnapshotInfo == null ? snapshotChainManager.getLatestGlobalSnapshotId() : SnapshotUtils.getPreviousSnapshotId(currentSnapshotInfo, snapshotChainManager); Map> exclusiveSizeMap = Maps.newConcurrentMap(); - CompletableFuture processedAllDeletedDirs = CompletableFuture.completedFuture(true); - for (int i = 0; i < numberOfParallelThreadsPerStore; i++) { - CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, - expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum); - } catch (Throwable e) { - return false; - } - }, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool : ForkJoinPool.commonPool()); - processedAllDeletedDirs = processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b); + boolean processedAllDeletedDirs; + int maxForksPerStore = dds.maxForksPerStore; + // If allowed to fork, create multiple tasks to process deleted directories tasks in parallel. + if (allowForks) { + Queue> recursiveTasks = new LinkedList<>(); + processedAllDeletedDirs = true; + for (int i = 0; i < maxForksPerStore; i++) { + RecursiveTask task = new RecursiveTask() { + private static final long serialVersionUID = 1L; + private final transient SnapshotInfo snapshotInfo = currentSnapshotInfo; + private final transient DeletedDirSupplier deletedDirSupplier = dirSupplier; + private final transient KeyManager km = keyManager; + + @Override + protected Boolean compute() { + try { + return processDeletedDirectories(snapshotInfo, km, deletedDirSupplier, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum); + } catch (Throwable e) { + return false; + } + } + }; + task.fork(); + recursiveTasks.offer(task); + } + while (!recursiveTasks.isEmpty()) { + processedAllDeletedDirs &= recursiveTasks.poll().join(); + } + } else { + try { + // Execute the same sequentially in the same thread. + processedAllDeletedDirs = processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, + (int) Math.min(Integer.MAX_VALUE, (long)remainNum * maxForksPerStore)); + } catch (Throwable e) { + processedAllDeletedDirs = false; + } } // If AOS or all directories have been processed for snapshot, update snapshot size delta and deep clean flag // if it is a snapshot. 
- if (processedAllDeletedDirs.get()) { + if (processedAllDeletedDirs) { List setSnapshotPropertyRequests = new ArrayList<>(); for (Map.Entry> entry : exclusiveSizeMap.entrySet()) { @@ -629,7 +649,7 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key .setDeepCleanedDeletedDir(true) .build()); } - submitSetSnapshotRequests(setSnapshotPropertyRequests); + dds.submitSetSnapshotRequests(setSnapshotPropertyRequests); } } } @@ -652,13 +672,13 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId, Map> totalExclusiveSizeMap, long runCount, int remaining) { - OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); - IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); + OmSnapshotManager omSnapshotManager = dds.getOzoneManager().getOmSnapshotManager(); + IOzoneManagerLock lock = dds.getOzoneManager().getMetadataManager().getLock(); String snapshotTableKey = currentSnapshotInfo == null ? null : currentSnapshotInfo.getTableKey(); - try (ReclaimableDirFilter reclaimableDirFilter = new ReclaimableDirFilter(getOzoneManager(), - omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock); - ReclaimableKeyFilter reclaimableFileFilter = new ReclaimableKeyFilter(getOzoneManager(), - omSnapshotManager, snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { + try (ReclaimableDirFilter reclaimableDirFilter = new ReclaimableDirFilter(dds.getOzoneManager(), + omSnapshotManager, dds.snapshotChainManager, currentSnapshotInfo, keyManager, lock); + ReclaimableKeyFilter reclaimableFileFilter = new ReclaimableKeyFilter(dds.getOzoneManager(), + omSnapshotManager, dds.snapshotChainManager, currentSnapshotInfo, keyManager, lock)) { long startTime = Time.monotonicNow(); long dirNum = 0L; long subDirNum = 0L; @@ -684,10 +704,10 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM .build()); boolean isDirReclaimable = reclaimableDirFilter.apply(pendingDeletedDirInfo); - Optional request = prepareDeleteDirRequest( + Optional request = dds.prepareDeleteDirRequest( pendingDeletedDirInfo.getValue(), pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList, - getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum); + dds.getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum); if (!request.isPresent()) { continue; } @@ -701,9 +721,9 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM subFileNum += purgePathRequest.getDeletedSubFilesCount(); } - optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, + dds.optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey, - startTime, getOzoneManager().getKeyManager(), + startTime, dds.getOzoneManager().getKeyManager(), reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, expectedPreviousSnapshotId, runCount, remainNum); Map exclusiveReplicatedSizeMap = reclaimableFileFilter.getExclusiveReplicatedSizeMap(); @@ -734,38 +754,38 @@ startTime, getOzoneManager().getKeyManager(), public BackgroundTaskResult call() { // Check if this is the Leader OM. If not leader, no need to execute this // task. 
- if (shouldRun()) { - final long run = getRunCount().incrementAndGet(); + if (dds.shouldRun()) { + final long run = dds.getRunCount().incrementAndGet(); if (snapshotId == null) { LOG.debug("Running DirectoryDeletingService for active object store, {}", run); } else { LOG.debug("Running DirectoryDeletingService for snapshot : {}, {}", snapshotId, run); } - OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); + OmSnapshotManager omSnapshotManager = dds.getOzoneManager().getOmSnapshotManager(); SnapshotInfo snapInfo = null; try { snapInfo = snapshotId == null ? null : - SnapshotUtils.getSnapshotInfo(getOzoneManager(), snapshotChainManager, snapshotId); + SnapshotUtils.getSnapshotInfo(dds.getOzoneManager(), dds.snapshotChainManager, snapshotId); if (snapInfo != null) { if (snapInfo.isDeepCleanedDeletedDir()) { LOG.info("Snapshot {} has already been deep cleaned directory. Skipping the snapshot in this iteration.", snapInfo.getSnapshotId()); return BackgroundTaskResult.EmptyTaskResult.newResult(); } - if (!OmSnapshotManager.areSnapshotChangesFlushedToDB(getOzoneManager().getMetadataManager(), snapInfo)) { + if (!areSnapshotChangesFlushedToDB(dds.getOzoneManager().getMetadataManager(), snapInfo)) { LOG.info("Skipping snapshot processing since changes to snapshot {} have not been flushed to disk", snapInfo); return BackgroundTaskResult.EmptyTaskResult.newResult(); } - } else if (!isPreviousPurgeTransactionFlushed()) { + } else if (!dds.isPreviousPurgeTransactionFlushed()) { return BackgroundTaskResult.EmptyTaskResult.newResult(); } try (UncheckedAutoCloseableSupplier omSnapshot = snapInfo == null ? null : omSnapshotManager.getActiveSnapshot(snapInfo.getVolumeName(), snapInfo.getBucketName(), snapInfo.getName())) { - KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager() + KeyManager keyManager = snapInfo == null ? dds.getOzoneManager().getKeyManager() : omSnapshot.get().getKeyManager(); - processDeletedDirsForStore(snapInfo, keyManager, run, pathLimitPerTask); + processDeletedDirsForStore(snapInfo, keyManager, run, dds.pathLimitPerTask); } } catch (IOException | ExecutionException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 75019adf7ec5..4821ddad79d7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -500,7 +500,7 @@ public void setKeyLimitPerTask(int keyLimitPerTask) { * DB. 
*/ @VisibleForTesting - final class KeyDeletingTask implements BackgroundTask { + final class KeyDeletingTask extends BackgroundTask { private final UUID snapshotId; KeyDeletingTask(UUID snapshotId) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java index 58697f798b00..6115746711b7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java @@ -138,7 +138,7 @@ private boolean shouldRun() { return !suspended.get() && ozoneManager.isLeaderReady(); } - private class MultipartUploadCleanupTask implements BackgroundTask { + private class MultipartUploadCleanupTask extends BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java index 7c4609bb05bd..a6c13372ee75 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java @@ -187,7 +187,7 @@ private boolean shouldRun() { return isServiceStarted && ozoneManager.isLeaderReady(); } - private class RangerBGSyncTask implements BackgroundTask { + private class RangerBGSyncTask extends BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java index d4f97b23eeb3..863eae4c507d 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java @@ -160,7 +160,7 @@ private boolean shouldRun() { return !suspended.get() && ozoneManager.isLeaderReady(); } - private class OpenKeyCleanupTask implements BackgroundTask { + private class OpenKeyCleanupTask extends BackgroundTask { private final BucketLayout bucketLayout; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 7cabc1e315a2..2e41c1bfe07c 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -119,7 +119,7 @@ public SnapshotDeletingService(long interval, long serviceTimeout, this.lockIds = new ArrayList<>(2); } - private class SnapshotDeletingTask implements BackgroundTask { + private class SnapshotDeletingTask extends BackgroundTask { @SuppressWarnings("checkstyle:MethodLength") @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java index e73d5ef4253e..102061c2062a 
100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java @@ -214,7 +214,7 @@ private void removeOlderJobReport() { } } - private class SnapshotDiffCleanUpTask implements BackgroundTask { + private class SnapshotDiffCleanUpTask extends BackgroundTask { @Override public BackgroundTaskResult call() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java index 54ee0a1f9a4b..3cd55c66bbc0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java @@ -513,7 +513,7 @@ int atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws IOExcept } } - private final class SnapshotDefragTask implements BackgroundTask { + private final class SnapshotDefragTask extends BackgroundTask { @Override public BackgroundTaskResult call() throws Exception { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java index 06b70dca9054..5335669e7a18 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java @@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.om.service.DirectoryDeletingService.DirDeletingTask; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; @@ -226,8 +227,8 @@ public void testMultithreadedDirectoryDeletion() throws Exception { return future; }); ozoneManager.getKeyManager().getDirDeletingService().suspend(); - DirectoryDeletingService.DirDeletingTask dirDeletingTask = - ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null); + DirDeletingTask dirDeletingTask = new DirDeletingTask(null, false, + ozoneManager.getKeyManager().getDirDeletingService()); dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000); assertThat(futureList).hasSize(threadCount); From 93d51f535f8aa355fb9450d2b041bdce3d6d6e99 Mon Sep 17 00:00:00 2001 From: Swaminathan Balachandran Date: Tue, 2 Dec 2025 10:28:32 -0500 Subject: [PATCH 2/5] HDDS-14020. 
Fix Scheduler Change-Id: Ie2118356f902443a93fe666d890ad4d59e9dd467 --- .../hadoop/hdds/utils/BackgroundService.java | 182 +++++++++++------- .../utils/BackgroundServiceScheduler.java | 58 ++++++ .../common/BlockDeletingServiceTestImpl.java | 2 +- 3 files changed, 169 insertions(+), 73 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index cb0cacdf6888..7cbc46a58a91 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -24,9 +24,12 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinWorkerThread; import java.util.concurrent.RecursiveAction; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +44,7 @@ public abstract class BackgroundService { protected static final Logger LOG = LoggerFactory.getLogger(BackgroundService.class); // Executor to launch child tasks + private UncheckedAutoCloseableSupplier periodicTaskScheduler; private volatile ForkJoinPool exec; private ThreadGroup threadGroup; private final String serviceName; @@ -49,7 +53,7 @@ public abstract class BackgroundService { private volatile int threadPoolSize; private final String threadNamePrefix; private volatile CompletableFuture future; - private volatile AtomicBoolean isShutdown; + private volatile AtomicReference isShutdown; public BackgroundService(String serviceName, long interval, TimeUnit unit, int threadPoolSize, long serviceTimeout) { @@ -115,7 +119,7 @@ public synchronized void start() { initExecutorAndThreadGroup(); } LOG.info("Starting service {} with interval {} ms", serviceName, intervalInMillis); - exec.execute(new PeriodicalTask(-1)); + exec.execute(new PeriodicalTask(periodicTaskScheduler.get())); } protected void setInterval(long newInterval, TimeUnit newUnit) { @@ -139,88 +143,116 @@ protected void execTaskCompletion() { } * Wait until all tasks to return the result. 
*/ public class PeriodicalTask extends RecursiveAction { - private int numberOfLoops; private final Queue tasksInFlight; - private final AtomicBoolean isShutdown; + private final AtomicReference isShutdown; + private final ScheduledExecutorService scheduledExecuterService; - public PeriodicalTask(int numberOfLoops) { - this.numberOfLoops = numberOfLoops; + public PeriodicalTask(ScheduledExecutorService scheduledExecutorService) { this.tasksInFlight = new LinkedList<>(); this.isShutdown = BackgroundService.this.isShutdown; + this.scheduledExecuterService = scheduledExecutorService; } - private boolean waitForNextInterval() { + private PeriodicalTask(PeriodicalTask other) { + this.tasksInFlight = other.tasksInFlight; + this.isShutdown = other.isShutdown; + this.scheduledExecuterService = other.scheduledExecuterService; + } - if (numberOfLoops > 0) { - numberOfLoops--; - if (numberOfLoops == 0) { - return false; + private boolean performIfNotShutdown(Runnable runnable) { + return isShutdown.updateAndGet((shutdown) -> { + if (!shutdown) { + runnable.run(); } - } - // Check if the executor has been shutdown during task execution. - if (!isShutdown.get()) { - synchronized (BackgroundService.this) { - // Get the shutdown flag again after acquiring the lock. - if (isShutdown.get()) { - return false; - } - try { - BackgroundService.this.wait(intervalInMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while waiting for next interval.", e); - return false; - } + return shutdown; + }); + } + + private boolean performIfNotShutdown(Consumer consumer, T t) { + return isShutdown.updateAndGet((shutdown) -> { + if (!shutdown) { + consumer.accept(t); } - } - return !isShutdown.get(); + return shutdown; + }); } - @Override - public void compute() { + private boolean runTasks() { if (LOG.isDebugEnabled()) { LOG.debug("Running background service : {}", serviceName); } - boolean runAgain = true; - do { - future = new CompletableFuture<>(); - BackgroundTaskQueue tasks = getTasks(true); - if (tasks.isEmpty()) { - // No task found, or some problems to init tasks - // return and retry in next interval. - continue; + if (isShutdown.get()) { + return false; + } + if (!tasksInFlight.isEmpty()) { + LOG.warn("Tasks are still in flight service {}. This should not happen schedule should only begin once all " + + "tasks from schedules have completed execution.", serviceName); + tasksInFlight.clear(); + } + + BackgroundTaskQueue tasks = getTasks(true); + if (tasks.isEmpty()) { + // No task found, or some problems to init tasks + // return and retry in next interval. + return false; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Number of background tasks to execute : {}", tasks.size()); + } + Consumer taskForkHandler = task -> { + task.fork(); + tasksInFlight.offer(task); + }; + while (!tasks.isEmpty()) { + BackgroundTask task = tasks.poll(); + // Fork and submit the task back to executor. + if (performIfNotShutdown(taskForkHandler, task)) { + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("Number of background tasks to execute : {}", tasks.size()); + } + Consumer taskCompletionHandler = task -> { + BackgroundTask.BackgroundTaskForkResult result = task.join(); + // Check for exception first in the task execution. 
+ if (result.getThrowable() != null) { + LOG.error("Background task execution failed", result.getThrowable()); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("task execution result size {}", result.getResult().getSize()); + } } - - while (!tasks.isEmpty()) { - BackgroundTask task = tasks.poll(); - // Fork and submit the task back to executor. - task.fork(); - tasksInFlight.offer(task); + if (result.getTotalExecutionTime() > serviceTimeoutInNanos) { + LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", + serviceName, result.getTotalExecutionTime(), serviceTimeoutInNanos); + } + }; + while (!tasksInFlight.isEmpty()) { + BackgroundTask taskInFlight = tasksInFlight.poll(); + // Join the tasks forked before and wait for the result one by one. + if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) { + return false; } + } + return true; + } - while (!tasksInFlight.isEmpty()) { - BackgroundTask taskInFlight = tasksInFlight.poll(); - // Join the tasks forked before and wait for the result one by one. - BackgroundTask.BackgroundTaskForkResult result = taskInFlight.join(); - // Check for exception first in the task execution. - if (result.getThrowable() != null) { - LOG.error("Background task execution failed", result.getThrowable()); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("task execution result size {}", result.getResult().getSize()); - } - } - if (result.getTotalExecutionTime() > serviceTimeoutInNanos) { - LOG.warn("{} Background task execution took {}ns > {}ns(timeout)", - serviceName, result.getTotalExecutionTime(), serviceTimeoutInNanos); - } + private void scheduleNextTask() { + performIfNotShutdown(() -> { + if (scheduledExecuterService != null) { + scheduledExecuterService.schedule(() -> exec.submit(new PeriodicalTask(this)), + intervalInMillis, TimeUnit.MILLISECONDS); } - future.complete(null); - runAgain = waitForNextInterval(); - } while (runAgain); + }); + } + + @Override + public void compute() { + future = new CompletableFuture<>(); + if (runTasks()) { + scheduleNextTask(); + } else { + LOG.debug("Service {} is shutdown. Cancelling all schedules of all tasks.", serviceName); + } + future.complete(null); } } @@ -229,17 +261,19 @@ public void shutdown() { LOG.info("Shutting down service {}", this.serviceName); final ThreadGroup threadGroupToBeClosed; final ForkJoinPool execToShutdown; + final UncheckedAutoCloseableSupplier periodicTaskSchedulerToBeClosed; // Set the shutdown flag to true to prevent new tasks from being submitted. 
synchronized (this) { + periodicTaskSchedulerToBeClosed = periodicTaskScheduler; threadGroupToBeClosed = threadGroup; execToShutdown = exec; exec = null; threadGroup = null; + periodicTaskScheduler = null; if (isShutdown != null) { this.isShutdown.set(true); } isShutdown = null; - this.notify(); } if (execToShutdown != null) { execToShutdown.shutdown(); @@ -252,13 +286,16 @@ public void shutdown() { Thread.currentThread().interrupt(); execToShutdown.shutdownNow(); } - if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) { - threadGroupToBeClosed.destroy(); - } + } + if (periodicTaskSchedulerToBeClosed != null) { + periodicTaskSchedulerToBeClosed.close(); + } + if (threadGroupToBeClosed != null && !threadGroupToBeClosed.isDestroyed()) { + threadGroupToBeClosed.destroy(); } } - private void initExecutorAndThreadGroup() { + private synchronized void initExecutorAndThreadGroup() { try { threadGroup = new ThreadGroup(serviceName); Thread initThread = new Thread(threadGroup, () -> { @@ -271,10 +308,11 @@ private void initExecutorAndThreadGroup() { return thread; }; exec = new ForkJoinPool(threadPoolSize, factory, null, false); - isShutdown = new AtomicBoolean(false); + isShutdown = new AtomicReference<>(false); }); initThread.start(); initThread.join(); + periodicTaskScheduler = BackgroundServiceScheduler.get(); } catch (InterruptedException e) { shutdown(); Thread.currentThread().interrupt(); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java new file mode 100644 index 000000000000..379d936e9d96 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundServiceScheduler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import org.apache.ratis.util.ReferenceCountedObject; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; + +/** + * Utility class to manage a shared background service using a {@link ScheduledExecutorService} + * which is provided with a single-threaded {@link ScheduledThreadPoolExecutor}. + * This class manages the lifecycle and reference counting for the executor + * to ensure proper resource cleanup. + * + * The executor is lazily initialized on the first invocation of the {@code get()} method. + * It is shut down and released when no longer referenced, ensuring efficient use + * of system resources. The shutdown process includes cleaning the reference to the executor. + * + * This class is thread-safe. 
+ */ +final class BackgroundServiceScheduler { + private static ReferenceCountedObject executor; + + private BackgroundServiceScheduler() { + + } + + public static synchronized UncheckedAutoCloseableSupplier get() { + if (executor == null) { + ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1); + executor = ReferenceCountedObject.wrap(scheduler, () -> { }, (shutdown) -> { + if (shutdown) { + synchronized (BackgroundServiceScheduler.class) { + scheduler.shutdown(); + executor = null; + } + } + }); + } + return executor.retainAndReleaseOnClose(); + } +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java index bf8e8d3b627f..73a53e60e381 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/BlockDeletingServiceTestImpl.java @@ -68,7 +68,7 @@ int getTimesOfProcessed() { // Override the implementation to start a single on-call control thread. @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(1); + PeriodicalTask svc = new PeriodicalTask(null); // In test mode, relies on a latch countdown to runDeletingTasks tasks. Runnable r = () -> { while (true) { From d962def7736befa621b2f0c0eac1cee04de10e2e Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:12:51 -0800 Subject: [PATCH 3/5] Implement BackgroundTaskForkJoin to reduce file changed per Sumit's comment: https://github.com/apache/ozone/pull/9390#discussion_r2591852435 Refactor BackgroundTask to use wrapper pattern for ForkJoinPool integration This commit introduces BackgroundTaskForkJoin as a wrapper class to integrate BackgroundTask with ForkJoinPool, avoiding the need to change all service implementations from 'implements' to 'extends'. 
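To make the wrapper idea in this commit message concrete before the full class appears in the diff below: service tasks stay plain Callables, and only a thin adapter extends RecursiveTask. The sketch below is a simplified stand-in (Adapter is an illustrative name; the real BackgroundTaskForkJoin also records start/end nanoTime and the raw Throwable in a result object rather than flattening failures into a string):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class WrapperSketch {
  // Thin adapter: the wrapped task never sees the ForkJoin machinery.
  static final class Adapter extends RecursiveTask<String> {
    private final transient Callable<String> task;

    Adapter(Callable<String> task) {
      this.task = task;
    }

    @Override
    protected String compute() {
      try {
        return task.call(); // timing and Throwable capture would go here
      } catch (Exception e) {
        return "failed: " + e.getMessage();
      }
    }
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(2);
    Queue<Adapter> inFlight = new ArrayDeque<>();
    pool.submit(() -> {
      for (int i = 0; i < 3; i++) {
        final int n = i;
        Adapter adapter = new Adapter(() -> "result " + n);
        adapter.fork();          // hand the wrapped task to the pool's work queues
        inFlight.offer(adapter);
      }
      while (!inFlight.isEmpty()) {
        System.out.println(inFlight.poll().join()); // join in submission order
      }
    }).join();
    pool.shutdown();
  }
}

Forking pushes each wrapped task onto the pool's work queues, and joining in submission order preserves the shape of the completion-handling loop in BackgroundService.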
Key changes: - Reverted BackgroundTask from abstract class back to interface - Created BackgroundTaskForkJoin wrapper extending RecursiveTask - Updated BackgroundService to wrap tasks before forking - Reverted all service task classes to 'implements BackgroundTask' --- .../hadoop/hdds/utils/BackgroundService.java | 15 ++-- .../hadoop/hdds/utils/BackgroundTask.java | 46 +--------- .../hdds/utils/BackgroundTaskForkJoin.java | 87 +++++++++++++++++++ .../hdds/utils/BackgroundTaskResult.java | 1 + .../hadoop/util/TestBackgroundService.java | 4 +- .../background/BlockDeletingTask.java | 16 ++-- ...leRecoveringContainerScrubbingService.java | 6 +- .../scm/block/SCMBlockDeletingService.java | 2 +- .../hadoop/ozone/om/SstFilteringService.java | 2 +- .../service/AbstractKeyDeletingService.java | 6 +- .../ozone/om/service/CompactionService.java | 2 +- .../om/service/DirectoryDeletingService.java | 5 +- .../ozone/om/service/KeyDeletingService.java | 2 +- .../MultipartUploadCleanupService.java | 2 +- .../om/service/OMRangerBGSyncService.java | 2 +- .../om/service/OpenKeyCleanupService.java | 2 +- .../om/service/SnapshotDeletingService.java | 2 +- .../service/SnapshotDiffCleanupService.java | 2 +- .../defrag/SnapshotDefragService.java | 2 +- 19 files changed, 125 insertions(+), 81 deletions(-) create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java index 7cbc46a58a91..2cb8db3c655e 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java @@ -143,7 +143,7 @@ protected void execTaskCompletion() { } * Wait until all tasks to return the result. */ public class PeriodicalTask extends RecursiveAction { - private final Queue tasksInFlight; + private final Queue tasksInFlight; private final AtomicReference isShutdown; private final ScheduledExecutorService scheduledExecuterService; @@ -199,19 +199,20 @@ private boolean runTasks() { if (LOG.isDebugEnabled()) { LOG.debug("Number of background tasks to execute : {}", tasks.size()); } - Consumer taskForkHandler = task -> { + Consumer taskForkHandler = task -> { task.fork(); tasksInFlight.offer(task); }; while (!tasks.isEmpty()) { BackgroundTask task = tasks.poll(); - // Fork and submit the task back to executor. - if (performIfNotShutdown(taskForkHandler, task)) { + // Wrap the task in a ForkJoin wrapper and fork it. + BackgroundTaskForkJoin forkJoinTask = new BackgroundTaskForkJoin(task); + if (performIfNotShutdown(taskForkHandler, forkJoinTask)) { return false; } } - Consumer taskCompletionHandler = task -> { - BackgroundTask.BackgroundTaskForkResult result = task.join(); + Consumer taskCompletionHandler = task -> { + BackgroundTaskForkJoin.BackgroundTaskForkResult result = task.join(); // Check for exception first in the task execution. if (result.getThrowable() != null) { LOG.error("Background task execution failed", result.getThrowable()); @@ -226,7 +227,7 @@ private boolean runTasks() { } }; while (!tasksInFlight.isEmpty()) { - BackgroundTask taskInFlight = tasksInFlight.poll(); + BackgroundTaskForkJoin taskInFlight = tasksInFlight.poll(); // Join the tasks forked before and wait for the result one by one. 
if (performIfNotShutdown(taskCompletionHandler, taskInFlight)) { return false; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java index 8ef18c94cc3b..0717087a8655 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTask.java @@ -18,56 +18,16 @@ package org.apache.hadoop.hdds.utils; import java.util.concurrent.Callable; -import java.util.concurrent.RecursiveTask; /** * A task thread to run by {@link BackgroundService}. */ -public abstract class BackgroundTask extends RecursiveTask - implements Callable { - private static final long serialVersionUID = 1L; - - static final class BackgroundTaskForkResult { - private final BackgroundTaskResult result; - private final Throwable throwable; - private final long startTime; - private final long endTime; - - private BackgroundTaskForkResult(BackgroundTaskResult result, long startTime, long endTime, Throwable throwable) { - this.endTime = endTime; - this.result = result; - this.startTime = startTime; - this.throwable = throwable; - } - - public long getTotalExecutionTime() { - return endTime - startTime; - } - - public BackgroundTaskResult getResult() { - return result; - } - - public Throwable getThrowable() { - return throwable; - } - } +public interface BackgroundTask extends Callable { @Override - protected final BackgroundTaskForkResult compute() { - long startTime = System.nanoTime(); - BackgroundTaskResult result = null; - Throwable throwable = null; - try { - result = this.call(); - } catch (Throwable e) { - throwable = e; - } - long endTime = System.nanoTime(); - return new BackgroundTaskForkResult(result, startTime, endTime, throwable); - } + BackgroundTaskResult call() throws Exception; - public int getPriority() { + default int getPriority() { return 0; } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java new file mode 100644 index 000000000000..fe8b3e64ba62 --- /dev/null +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import java.util.concurrent.RecursiveTask; + +/** + * A ForkJoin wrapper for {@link BackgroundTask} that enables parallel execution + * in a ForkJoinPool while keeping the BackgroundTask interface simple. + * + *

This wrapper handles the RecursiveTask mechanics, timing, and exception + * handling, allowing BackgroundTask implementations to focus on their business logic. + */ +public class BackgroundTaskForkJoin extends RecursiveTask { + private static final long serialVersionUID = 1L; + private final transient BackgroundTask backgroundTask; + + public BackgroundTaskForkJoin(BackgroundTask backgroundTask) { + this.backgroundTask = backgroundTask; + } + + /** + * Result wrapper containing the task result, execution time, and any exception. + */ + public static final class BackgroundTaskForkResult { + private final BackgroundTaskResult result; + private final Throwable throwable; + private final long startTime; + private final long endTime; + + private BackgroundTaskForkResult(BackgroundTaskResult result, long startTime, long endTime, Throwable throwable) { + this.endTime = endTime; + this.result = result; + this.startTime = startTime; + this.throwable = throwable; + } + + public long getTotalExecutionTime() { + return endTime - startTime; + } + + public BackgroundTaskResult getResult() { + return result; + } + + public Throwable getThrowable() { + return throwable; + } + } + + @Override + protected BackgroundTaskForkResult compute() { + long startTime = System.nanoTime(); + BackgroundTaskResult result = null; + Throwable throwable = null; + try { + result = backgroundTask.call(); + } catch (Throwable e) { + throwable = e; + } + long endTime = System.nanoTime(); + return new BackgroundTaskForkResult(result, startTime, endTime, throwable); + } + + public int getPriority() { + return backgroundTask.getPriority(); + } + + public BackgroundTask getBackgroundTask() { + return backgroundTask; + } +} diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java index 047f3b16fe72..e7083a45c061 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskResult.java @@ -21,6 +21,7 @@ * Result of a {@link BackgroundTask}. */ public interface BackgroundTaskResult { + /** * Returns the size of entries included in this result. 
*/ diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java index 3bdc2adbf020..2c5ed1bfea5a 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/util/TestBackgroundService.java @@ -52,7 +52,7 @@ public class TestBackgroundService { private BackgroundService backgroundService; private AtomicInteger runCount; - private static class TestTask extends BackgroundTask { + private static class TestTask implements BackgroundTask { private Map map; private Map locks; private int index; @@ -91,7 +91,7 @@ public void shutDown() { } private BackgroundService createBackgroundService(String name, int interval, - TimeUnit unit, int workerThreads, int serviceTimeout) throws InterruptedException { + TimeUnit unit, int workerThreads, int serviceTimeout) { return new BackgroundService(name, interval, unit, workerThreads, serviceTimeout) { @Override public BackgroundTaskQueue getTasks() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java index e382f0ad844d..1f66cad476f7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingTask.java @@ -60,19 +60,19 @@ /** * BlockDeletingTask for KeyValueContainer. */ -public class BlockDeletingTask extends BackgroundTask { +public class BlockDeletingTask implements BackgroundTask { - private static final long serialVersionUID = 1L; - private static final Logger LOG = LoggerFactory.getLogger(BlockDeletingTask.class); + private static final Logger LOG = + LoggerFactory.getLogger(BlockDeletingTask.class); - private final transient BlockDeletingServiceMetrics metrics; + private final BlockDeletingServiceMetrics metrics; private final int priority; - private final transient KeyValueContainerData containerData; + private final KeyValueContainerData containerData; private long blocksToDelete; - private final transient OzoneContainer ozoneContainer; - private final transient ConfigurationSource conf; + private final OzoneContainer ozoneContainer; + private final ConfigurationSource conf; private Duration blockDeletingMaxLockHoldingTime; - private final transient ContainerChecksumTreeManager checksumTreeManager; + private final ContainerChecksumTreeManager checksumTreeManager; public BlockDeletingTask( BlockDeletingService blockDeletingService, diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java index 51049ee22bbd..5535c5128ccc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java @@ -70,10 +70,8 @@ public BackgroundTaskQueue getTasks() { return backgroundTaskQueue; } - static class RecoveringContainerScrubbingTask extends BackgroundTask { - private static final long serialVersionUID = 1L; - - private final transient ContainerSet containerSet; + static class RecoveringContainerScrubbingTask implements BackgroundTask { + private final ContainerSet containerSet; private final long containerID; RecoveringContainerScrubbingTask( diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java index 473cd3257b62..348e2e9d5dcb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java @@ -139,7 +139,7 @@ public BackgroundTaskQueue getTasks() { return queue; } - private class DeletedBlockTransactionScanner extends BackgroundTask { + private class DeletedBlockTransactionScanner implements BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java index 389d459be4b7..1bd529539623 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java @@ -126,7 +126,7 @@ public void resume() { running.set(true); } - private class SstFilteringTask extends BackgroundTask { + private class SstFilteringTask implements BackgroundTask { private boolean isSnapshotDeleted(SnapshotInfo snapshotInfo) { return snapshotInfo == null || snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java index 887175077033..000723e1c401 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/AbstractKeyDeletingService.java @@ -106,10 +106,8 @@ boolean isPreviousPurgeTransactionFlushed() throws IOException { return true; } - private static final class BackgroundDeleteTask extends BackgroundTask { - private static final long serialVersionUID = 1L; - - private final transient BootstrapStateHandler.Lock bootstrapLock; + private static final class BackgroundDeleteTask implements BackgroundTask { + private final BootstrapStateHandler.Lock bootstrapLock; private final BackgroundTask task; private BackgroundDeleteTask(BootstrapStateHandler.Lock bootstrapLock, BackgroundTask task) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java index 786f703ccdab..dfcc32578adf 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java 
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/CompactionService.java @@ -155,7 +155,7 @@ protected void compactFully(String tableName) throws IOException { } } - private class CompactTask extends BackgroundTask { + private class CompactTask implements BackgroundTask { private final String tableName; CompactTask(String tableName) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 371b6442257d..209741e7d0c0 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -518,12 +518,11 @@ OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey, } @VisibleForTesting - static final class DirDeletingTask extends BackgroundTask { - private static final long serialVersionUID = 1L; + static final class DirDeletingTask implements BackgroundTask { private final UUID snapshotId; private final boolean allowForks; - private final transient DirectoryDeletingService dds; + private final DirectoryDeletingService dds; DirDeletingTask(UUID snapshotId, boolean allowForks, DirectoryDeletingService dds) { this.snapshotId = snapshotId; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java index 4821ddad79d7..75019adf7ec5 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyDeletingService.java @@ -500,7 +500,7 @@ public void setKeyLimitPerTask(int keyLimitPerTask) { * DB. 
*/ @VisibleForTesting - final class KeyDeletingTask extends BackgroundTask { + final class KeyDeletingTask implements BackgroundTask { private final UUID snapshotId; KeyDeletingTask(UUID snapshotId) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java index 6115746711b7..58697f798b00 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/MultipartUploadCleanupService.java @@ -138,7 +138,7 @@ private boolean shouldRun() { return !suspended.get() && ozoneManager.isLeaderReady(); } - private class MultipartUploadCleanupTask extends BackgroundTask { + private class MultipartUploadCleanupTask implements BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java index a6c13372ee75..7c4609bb05bd 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OMRangerBGSyncService.java @@ -187,7 +187,7 @@ private boolean shouldRun() { return isServiceStarted && ozoneManager.isLeaderReady(); } - private class RangerBGSyncTask extends BackgroundTask { + private class RangerBGSyncTask implements BackgroundTask { @Override public int getPriority() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java index 863eae4c507d..d4f97b23eeb3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/OpenKeyCleanupService.java @@ -160,7 +160,7 @@ private boolean shouldRun() { return !suspended.get() && ozoneManager.isLeaderReady(); } - private class OpenKeyCleanupTask extends BackgroundTask { + private class OpenKeyCleanupTask implements BackgroundTask { private final BucketLayout bucketLayout; diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java index 2e41c1bfe07c..7cabc1e315a2 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDeletingService.java @@ -119,7 +119,7 @@ public SnapshotDeletingService(long interval, long serviceTimeout, this.lockIds = new ArrayList<>(2); } - private class SnapshotDeletingTask extends BackgroundTask { + private class SnapshotDeletingTask implements BackgroundTask { @SuppressWarnings("checkstyle:MethodLength") @Override diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java index 102061c2062a..e73d5ef4253e 
100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/SnapshotDiffCleanupService.java @@ -214,7 +214,7 @@ private void removeOlderJobReport() { } } - private class SnapshotDiffCleanUpTask extends BackgroundTask { + private class SnapshotDiffCleanUpTask implements BackgroundTask { @Override public BackgroundTaskResult call() { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java index 3cd55c66bbc0..54ee0a1f9a4b 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java @@ -513,7 +513,7 @@ int atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws IOExcept } } - private final class SnapshotDefragTask extends BackgroundTask { + private final class SnapshotDefragTask implements BackgroundTask { @Override public BackgroundTaskResult call() throws Exception { From 5c44c5bd15cccc61aff90b76f3e7a97f1d9e5e30 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:47:30 -0800 Subject: [PATCH 4/5] Move BackgroundTaskForkJoin to framework module --- .../java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename hadoop-hdds/{common => framework}/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java (100%) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java similarity index 100% rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java rename to hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/BackgroundTaskForkJoin.java From 529a9649d3cd08af2ea7ac1ba51023127d79be66 Mon Sep 17 00:00:00 2001 From: Siyao Meng <50227127+smengcl@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:59:09 -0800 Subject: [PATCH 5/5] ForkJoinPool handling in DiskBalancerService --- .../container/diskbalancer/DiskBalancerService.java | 13 +++++++++---- .../diskbalancer/DiskBalancerServiceTestImpl.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java index aaa14321011e..8652924b52e2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -258,11 +259,15 @@ private void applyDiskBalancerInfo(DiskBalancerInfo 
diskBalancerInfo) setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven()); setVersion(diskBalancerInfo.getVersion()); - // Default executorService is ScheduledThreadPoolExecutor, so we can - // update the poll size by setting corePoolSize. - if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) { - ((ScheduledThreadPoolExecutor) getExecutorService()) + Object executorService = getExecutorService(); + if (executorService instanceof ScheduledThreadPoolExecutor) { + // Update the pool size by setting corePoolSize for ScheduledThreadPoolExecutor + ((ScheduledThreadPoolExecutor) executorService) .setCorePoolSize(parallelThread); + } else if (executorService instanceof ForkJoinPool) { + // For ForkJoinPool, dynamic resizing is not supported and requires service restart + LOG.warn("ForkJoinPool doesn't support dynamic pool size changes. " + + "Service restart is required for pool size change to take effect."); } } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java index ef1503219070..c3746a1ba13b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerServiceTestImpl.java @@ -65,7 +65,7 @@ public int getTimesOfProcessed() { // Override the implementation to start a single on-call control thread. @Override public void start() { - PeriodicalTask svc = new PeriodicalTask(); + PeriodicalTask svc = new PeriodicalTask(null); // In test mode, relies on a latch countdown to runDeletingTasks tasks. Runnable r = () -> { while (true) {
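The instanceof branching in this final hunk can be exercised on its own; here is a small sketch under the assumption that the balancer's executor may now be either type (applyParallelism is an illustrative stand-in for the private method above, and the log text mirrors the patch's warning):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledThreadPoolExecutor;

public final class PoolResizeSketch {
  private PoolResizeSketch() {
  }

  static void applyParallelism(ExecutorService executorService, int parallelThread) {
    if (executorService instanceof ScheduledThreadPoolExecutor) {
      // A scheduled thread pool can be resized in place via its core pool size.
      ((ScheduledThreadPoolExecutor) executorService).setCorePoolSize(parallelThread);
    } else if (executorService instanceof ForkJoinPool) {
      // The patch treats ForkJoinPool parallelism as fixed after construction,
      // so the new size only takes effect after a restart with a fresh pool.
      System.out.println("ForkJoinPool parallelism stays at "
          + ((ForkJoinPool) executorService).getParallelism());
    }
  }

  public static void main(String[] args) {
    ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1);
    applyParallelism(stpe, 4);
    System.out.println("scheduled pool core size: " + stpe.getCorePoolSize()); // 4

    ForkJoinPool fjp = new ForkJoinPool(2);
    applyParallelism(fjp, 4); // prints that parallelism stays at 2

    stpe.shutdown();
    fjp.shutdown();
  }
}

Only the scheduled thread pool picks up the new size immediately; with a ForkJoinPool the parallelism chosen at construction stays in effect until the service is restarted with a new pool.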