如果之前提交过任务,该如何拒绝呢?
My code is here. This is a related question where I asked about rejecting tasks based on back-pressure.
Now, I have a CalculationBroker
which accepts CalculationTask
s.
public class CalculationBroker {
private static final Logger LOGGER = LoggerFactory.getLogger(CalculationBroker.class);
private static final int WORKERS_NUMBER = 10;
private static final int SUBMITTED_TASKS_QUEUE_SIZE = 20;
private final ExecutorService executorService = initializeThreadPoolWithRejection();
private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();
public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
if (calculationResultCached != null) {
return CompletableFuture.completedFuture(calculationResultCached);
}
LOGGER.info("Calculation submitted: {}.", calculationTask.getName());
try {
final CompletableFuture<CalculationResult> calculated = CompletableFuture
.supplyAsync(calculationTask::calculate, executorService);
calculated.thenAccept(this::updateCache);
return calculated;
} catch (Exception e) {
System.out.println("Failed to submit a task.");
return CompletableFuture.failedFuture(e);
}
}
private void updateCache(final CalculationResult calculationResult) {
calculationCache.put(calculationResult.getTaskName(), calculationResult);
}
private ExecutorService initializeThreadPoolWithRejection() {
final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
return new ThreadPoolExecutor(WORKERS_NUMBER, WORKERS_NUMBER,
0L, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(SUBMITTED_TASKS_QUEUE_SIZE),
handler);
}
}
Same task or duplicate is a CalculationTask
with same name
. If such task was submitted (not finished yet), I would like to reject it next time. To do so, I need a sort of cache where I keep names of submitted tasks, and if a task is present in the cache, I have to reject it.