Concurrent.java
/*
* Java Genetic Algorithm Library (@__identifier__@).
* Copyright (c) @__year__@ Franz Wilhelmstötter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Author:
* Franz Wilhelmstötter (franz.wilhelmstoetter@gmx.at)
*/
package org.jenetics.util;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.ForkJoinTask.adapt;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import javolution.context.LocalContext;
/**
* [code]
* try (final Concurrent c = new Concurrent()) {
* c.execute(task1);
* c.execute(task2);
* }
* [/code]
*
* @author <a href="mailto:franz.wilhelmstoetter@gmx.at">Franz Wilhelmstötter</a>
* @since 1.5
* @version 1.5 — <em>$Date: 2014-02-15 $</em>
*/
@Deprecated
final class Concurrent implements Executor, AutoCloseable {
/* ************************************************************************
* Static concurrent context.
* ************************************************************************/
private static final Object NULL = new Object();
private static final LocalContext.Reference<Object>
FORK_JOIN_POOL = new LocalContext.Reference<Object>(new ForkJoinPool(
Runtime.getRuntime().availableProcessors()
));
/**
* Set the thread pool to use for concurrent actions. If the given pool is
* {@code null}, the command, given in the {@link #execute(Runnable)} method
* is executed in the main thread.
*
* @param pool the thread pool to use.
*/
public static void setForkJoinPool(final ForkJoinPool pool) {
FORK_JOIN_POOL.set(pool != null ? pool : NULL);
}
/**
* Return the currently use thread pool.
*
* @return the currently used thread pool.
*/
public static ForkJoinPool getForkJoinPool() {
final Object pool = FORK_JOIN_POOL.get();
return pool != NULL ? (ForkJoinPool)pool : null;
}
/* ************************************************************************
* 'Dynamic' concurrent context.
* ************************************************************************/
private final int TASKS_SIZE = 15;
private final ForkJoinPool _pool;
private final List<ForkJoinTask<?>> _tasks = new ArrayList<>(TASKS_SIZE);
private final boolean _parallel;
/**
* Create a new {@code Concurrent} executor <i>context</i> with the given
* {@link ForkJoinPool}.
*
* @param pool the {@code ForkJoinPool} used for concurrent execution of the
* given tasks. The {@code pool} may be {@code null} and if so, the
* given tasks are executed in the main thread.
*/
private Concurrent(final ForkJoinPool pool) {
_pool = pool;
_parallel = _pool != null;
}
/**
* Create a new {@code Concurrent} executor <i>context</i> with the
* {@code ForkJoinPool} set with the {@link #setForkJoinPool(ForkJoinPool)},
* or the default pool, if no one has been set.
*/
public Concurrent() {
this(getForkJoinPool());
}
/**
* Return the current <i>parallelism</i> of this {@code Concurrent} object.
*
* @return the current <i>parallelism</i> of this {@code Concurrent} object
*/
public int getParallelism() {
return _pool != null ? _pool.getParallelism() : 1;
}
@Override
public void execute(final Runnable command) {
if (_parallel) {
final ForkJoinTask<?> task = toForkJoinTask(command);
_pool.execute(task);
_tasks.add(task);
} else {
command.run();
}
}
private static ForkJoinTask<?> toForkJoinTask(final Runnable r) {
return r instanceof ForkJoinTask<?> ? (ForkJoinTask<?>)r : adapt(r);
}
/**
* Executes the given {@code runnables} in {@code n} parts.
*
* @param n the number of parts the given {@code runnables} are executed.
* @param runnables the runnables to be executed.
* @throws NullPointerException if the given runnables are {@code null}.
*/
public void execute(final int n, final List<? extends Runnable> runnables) {
requireNonNull(runnables, "Runnables must not be null");
if (runnables.size() > 0) {
final int[] parts = arrays.partition(runnables.size(), n);
for (int i = 0; i < parts.length - 1; ++i) {
final int part = i;
execute(new Runnable() { @Override public void run() {
for (int j = parts[part]; j < parts[part + 1]; ++j) {
runnables.get(j).run();
}
}});
}
}
}
/**
* Executes the given {@code runnables} in {@link #getParallelism()} parts.
*
* @param runnables the runnables to be executed.
* @throws NullPointerException if the given runnables are {@code null}.
*/
public void execute(final List<? extends Runnable> runnables) {
execute(getParallelism(), runnables);
}
/**
* Executes the given {@code runnables} in {@code n} parts.
*
* @param n the number of parts the given {@code runnables} are executed.
* @param runnables the runnables to be executed.
* @throws NullPointerException if the given runnables are {@code null}.
*/
public void execute(final int n, final Runnable... runnables) {
requireNonNull(runnables, "Runnables must not be null");
if (runnables.length > 0) {
final int[] parts = arrays.partition(runnables.length, n);
for (int i = 0; i < parts.length - 1; ++i) {
final int part = i;
execute(new Runnable() { @Override public void run() {
for (int j = parts[part]; j < parts[part + 1]; ++j) {
runnables[j].run();
}
}});
}
}
}
/**
* Executes the given {@code runnables} in {@link #getParallelism()} parts.
*
* @param runnables the runnables to be executed.
* @throws NullPointerException if the given runnables are {@code null}.
*/
public void execute(final Runnable... runnables) {
execute(getParallelism(), runnables);
}
@Override
public void close() {
if (_parallel) {
for (int i = _tasks.size(); --i >= 0;) {
_tasks.get(i).join();
}
}
}
}