Concurrency.java
/*
* Java Genetic Algorithm Library (@__identifier__@).
* Copyright (c) @__year__@ Franz Wilhelmstötter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Author:
* Franz Wilhelmstötter (franz.wilhelmstoetter@gmx.at)
*/
package org.jenetics.internal.util;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
import static org.jenetics.util.arrays.partition;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
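/**
 * An {@link Executor} which can additionally execute whole lists of
 * {@link Runnable}s and which, being {@link AutoCloseable}, can be used in a
 * try-with-resources statement. Instances are created with the static
 * {@code with} factory methods.
 *
 * @author <a href="mailto:franz.wilhelmstoetter@gmx.at">Franz Wilhelmstötter</a>
 * @version 2.0 — <em>$Date: 2014-04-05 $</em>
 * @since 2.0
 */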
public abstract class Concurrency implements Executor, AutoCloseable {
/**
 * The number of processors reported by
 * {@link Runtime#availableProcessors()} at the time this class is
 * initialized.
 */
public static final int CORES = Runtime.getRuntime().availableProcessors();
/**
 * A shared {@code Concurrency} which runs every runnable directly in the
 * thread that calls {@code execute}; its {@code close} method does nothing.
 */
public static final Concurrency SERIAL_EXECUTOR = new SerialConcurrency();
/**
 * Holds the shared {@link ForkJoinPool}. The pool is created when this
 * holder class is initialized, i.e. the first time its field is accessed,
 * and not when the enclosing class is loaded.
 */
private static final class LazyPoolHolder {
    public static final ForkJoinPool FORK_JOIN_POOL;

    static {
        // Use one worker less than the number of cores, but never less than one.
        final int parallelism = max(CORES - 1, 1);
        FORK_JOIN_POOL = new ForkJoinPool(parallelism);
    }
}
/**
 * Return the shared {@link ForkJoinPool} of this class. The pool is created
 * on first access and the same instance is returned by every call.
 *
 * @return the shared fork-join pool
 */
public static ForkJoinPool commonPool() {
    final ForkJoinPool shared = LazyPoolHolder.FORK_JOIN_POOL;
    return shared;
}
/**
 * Execute all runnables of the given list. Whether the runnables are run
 * in the calling thread or handed to another executor, and how they are
 * grouped into tasks, is up to the implementation.
 *
 * @param runnables the runnables to execute
 */
public abstract void execute(final List<? extends Runnable> runnables);
/**
 * Close this object. Redeclared without a {@code throws} clause, so callers
 * using try-with-resources don't have to handle a checked exception. The
 * implementations in this file use this method to wait for the tasks which
 * have been submitted through them.
 */
@Override
public abstract void close();
/**
 * Return a {@code Concurrency} object for the given executor. The
 * implementation is chosen by the executor's type: a {@link ForkJoinPool}
 * and an {@link ExecutorService} each get a dedicated wrapper,
 * {@link #SERIAL_EXECUTOR} is returned as is, and every other executor is
 * wrapped by a generic implementation.
 *
 * @param executor the underlying Executor
 * @return a Concurrency object backed by the given executor
 * @throws NullPointerException if the given executor is {@code null}
 */
public static Concurrency with(final Executor executor) {
    // ForkJoinPool must be tested first, since it is an ExecutorService too.
    if (executor instanceof ForkJoinPool) {
        final ForkJoinPool pool = (ForkJoinPool)executor;
        return new ForkJoinPoolConcurrency(pool);
    }
    if (executor instanceof ExecutorService) {
        final ExecutorService service = (ExecutorService)executor;
        return new ExecutorServiceConcurrency(service);
    }

    // The serial executor is recognized by identity and is never wrapped.
    return executor == SERIAL_EXECUTOR
        ? SERIAL_EXECUTOR
        : new ExecutorConcurrency(executor);
}
/**
 * Return a new Concurrency object which is backed by the shared
 * ForkJoinPool returned by {@link #commonPool()}.
 *
 * @return a new Concurrency object using the shared ForkJoinPool
 */
public static Concurrency withCommonPool() {
    final ForkJoinPool shared = commonPool();
    return with(shared);
}
/**
 * A Concurrency which submits its work to a {@link ForkJoinPool}. Every
 * submitted task is remembered, so that {@link #close()} can join it.
 */
private static final class ForkJoinPoolConcurrency extends Concurrency {
    private final ForkJoinPool _pool;
    private final Stack<ForkJoinTask<?>> _tasks = new Stack<>();

    ForkJoinPoolConcurrency(final ForkJoinPool pool) {
        _pool = requireNonNull(pool);
    }

    /**
     * Submit the runnable to the pool and remember the resulting task.
     */
    @Override
    public void execute(final Runnable runnable) {
        final ForkJoinTask<?> submitted = _pool.submit(runnable);
        _tasks.push(submitted);
    }

    /**
     * Submit the whole list as one {@code RunnablesAction} and remember the
     * resulting task.
     */
    @Override
    public void execute(final List<? extends Runnable> runnables) {
        final ForkJoinTask<?> submitted =
            _pool.submit(new RunnablesAction(runnables));
        _tasks.push(submitted);
    }

    /**
     * Join all remembered tasks.
     */
    @Override
    public void close() {
        // NOTE(review): relies on Stack.pop() returning null once the stack
        // is empty -- confirm against the Stack implementation.
        ForkJoinTask<?> pending;
        while ((pending = _tasks.pop()) != null) {
            pending.join();
        }
    }
}
/**
 * A Concurrency which submits its work to an {@link ExecutorService}. The
 * {@link Future} of every submitted task is remembered, so that
 * {@link #close()} can wait for its completion.
 */
private static final class ExecutorServiceConcurrency extends Concurrency {
    private final Stack<Future<?>> _futures = new Stack<>();
    private final ExecutorService _service;

    ExecutorServiceConcurrency(final ExecutorService service) {
        _service = requireNonNull(service);
    }

    /**
     * Submit the command to the service and remember the returned future.
     */
    @Override
    public void execute(final Runnable command) {
        _futures.push(_service.submit(command));
    }

    /**
     * Split the list into index ranges and submit one task per range.
     */
    @Override
    public void execute(final List<? extends Runnable> runnables) {
        // NOTE(review): partition(...) presumably yields ascending index
        // boundaries; each adjacent pair is handed to one RunnablesRunnable
        // -- confirm against arrays.partition.
        final int[] parts = partition(runnables.size(), CORES + 1);
        for (int i = 0; i < parts.length - 1; ++i) {
            execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
        }
    }

    /**
     * Wait for all remembered futures. Waiting stops at the first failure;
     * futures which have not been awaited by then stay registered.
     *
     * @throws CancellationException if the calling thread is interrupted
     *         while waiting, or if one of the tasks has thrown an exception.
     *         The original exception is available as the cause.
     */
    @Override
    public void close() {
        try {
            // NOTE(review): relies on Stack.pop() returning null once the
            // stack is empty -- confirm against the Stack implementation.
            for (Future<?> f = _futures.pop(); f != null; f = _futures.pop()) {
                f.get();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status, so that code further up the
            // call stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw cancelled(e);
        } catch (ExecutionException e) {
            throw cancelled(e);
        }
    }

    // Creates the CancellationException thrown by close(). The class has no
    // constructor taking a cause, hence the explicit initCause call.
    private static CancellationException cancelled(final Exception cause) {
        final CancellationException exception =
            new CancellationException(cause.getMessage());
        exception.initCause(cause);
        return exception;
    }
}
/**
 * A Concurrency which works with a plain {@link Executor}. Every command is
 * wrapped into a {@link FutureTask} before it is handed to the executor, so
 * that {@link #close()} can wait for its completion.
 */
private static final class ExecutorConcurrency extends Concurrency {
    private final Stack<FutureTask<?>> _tasks = new Stack<>();
    private final Executor _executor;

    ExecutorConcurrency(final Executor executor) {
        _executor = requireNonNull(executor);
    }

    /**
     * Wrap the command into a FutureTask, remember the task and pass it to
     * the underlying executor.
     */
    @Override
    public void execute(final Runnable command) {
        final FutureTask<?> task = new FutureTask<>(command, null);
        _tasks.push(task);
        _executor.execute(task);
    }

    /**
     * Split the list into index ranges and execute one task per range.
     */
    @Override
    public void execute(final List<? extends Runnable> runnables) {
        // NOTE(review): partition(...) presumably yields ascending index
        // boundaries; each adjacent pair is handed to one RunnablesRunnable
        // -- confirm against arrays.partition.
        final int[] parts = partition(runnables.size(), CORES + 1);
        for (int i = 0; i < parts.length - 1; ++i) {
            execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
        }
    }

    /**
     * Wait for all remembered tasks. Waiting stops at the first failure;
     * tasks which have not been awaited by then stay registered.
     *
     * @throws CancellationException if the calling thread is interrupted
     *         while waiting, or if one of the tasks has thrown an exception.
     *         The original exception is available as the cause.
     */
    @Override
    public void close() {
        try {
            // NOTE(review): relies on Stack.pop() returning null once the
            // stack is empty -- confirm against the Stack implementation.
            for (FutureTask<?> t = _tasks.pop(); t != null; t = _tasks.pop()) {
                t.get();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status, so that code further up the
            // call stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw cancelled(e);
        } catch (ExecutionException e) {
            throw cancelled(e);
        }
    }

    // Creates the CancellationException thrown by close(). The class has no
    // constructor taking a cause, hence the explicit initCause call.
    private static CancellationException cancelled(final Exception cause) {
        final CancellationException exception =
            new CancellationException(cause.getMessage());
        exception.initCause(cause);
        return exception;
    }
}
/**
 * A Concurrency which doesn't use any other thread: every runnable is run
 * directly in the thread which calls {@code execute}.
 */
private static final class SerialConcurrency extends Concurrency {

    /**
     * Run the command immediately in the calling thread.
     */
    @Override
    public void execute(final Runnable command) {
        command.run();
    }

    /**
     * Run the runnables one after another, in list order, in the calling
     * thread.
     */
    @Override
    public void execute(final List<? extends Runnable> runnables) {
        for (final Runnable task : runnables) {
            execute(task);
        }
    }

    /**
     * Nothing to wait for; all runnables have finished when {@code execute}
     * returns.
     */
    @Override
    public void close() {
    }
}
}