001 /*
002 * Java Genetic Algorithm Library (jenetics-1.6.0).
003 * Copyright (c) 2007-2014 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmx.at)
019 */
020 package org.jenetics.util;
021
022 import static java.util.Objects.requireNonNull;
023 import static java.util.concurrent.ForkJoinTask.adapt;
024
025 import java.util.ArrayList;
026 import java.util.List;
027 import java.util.concurrent.Executor;
028 import java.util.concurrent.ForkJoinPool;
029 import java.util.concurrent.ForkJoinTask;
030
031 import javolution.context.LocalContext;
032
033 /**
034 * [code]
035 * try (final Concurrent c = new Concurrent()) {
036 * c.execute(task1);
037 * c.execute(task2);
038 * }
039 * [/code]
040 *
041 * @author <a href="mailto:franz.wilhelmstoetter@gmx.at">Franz Wilhelmstötter</a>
042 * @since 1.5
043 * @version 1.5 — <em>$Date: 2014-02-15 $</em>
044 */
045 @Deprecated
046 final class Concurrent implements Executor, AutoCloseable {
047
048 /* ************************************************************************
049 * Static concurrent context.
050 * ************************************************************************/
051
052 private static final Object NULL = new Object();
053
054 private static final LocalContext.Reference<Object>
055 FORK_JOIN_POOL = new LocalContext.Reference<Object>(new ForkJoinPool(
056 Runtime.getRuntime().availableProcessors()
057 ));
058
059 /**
060 * Set the thread pool to use for concurrent actions. If the given pool is
061 * {@code null}, the command, given in the {@link #execute(Runnable)} method
062 * is executed in the main thread.
063 *
064 * @param pool the thread pool to use.
065 */
066 public static void setForkJoinPool(final ForkJoinPool pool) {
067 FORK_JOIN_POOL.set(pool != null ? pool : NULL);
068 }
069
070 /**
071 * Return the currently use thread pool.
072 *
073 * @return the currently used thread pool.
074 */
075 public static ForkJoinPool getForkJoinPool() {
076 final Object pool = FORK_JOIN_POOL.get();
077 return pool != NULL ? (ForkJoinPool)pool : null;
078 }
079
080
081 /* ************************************************************************
082 * 'Dynamic' concurrent context.
083 * ************************************************************************/
084
085 private final int TASKS_SIZE = 15;
086
087 private final ForkJoinPool _pool;
088 private final List<ForkJoinTask<?>> _tasks = new ArrayList<>(TASKS_SIZE);
089 private final boolean _parallel;
090
091 /**
092 * Create a new {@code Concurrent} executor <i>context</i> with the given
093 * {@link ForkJoinPool}.
094 *
095 * @param pool the {@code ForkJoinPool} used for concurrent execution of the
096 * given tasks. The {@code pool} may be {@code null} and if so, the
097 * given tasks are executed in the main thread.
098 */
099 private Concurrent(final ForkJoinPool pool) {
100 _pool = pool;
101 _parallel = _pool != null;
102 }
103
104 /**
105 * Create a new {@code Concurrent} executor <i>context</i> with the
106 * {@code ForkJoinPool} set with the {@link #setForkJoinPool(ForkJoinPool)},
107 * or the default pool, if no one has been set.
108 */
109 public Concurrent() {
110 this(getForkJoinPool());
111 }
112
113 /**
114 * Return the current <i>parallelism</i> of this {@code Concurrent} object.
115 *
116 * @return the current <i>parallelism</i> of this {@code Concurrent} object
117 */
118 public int getParallelism() {
119 return _pool != null ? _pool.getParallelism() : 1;
120 }
121
122 @Override
123 public void execute(final Runnable command) {
124 if (_parallel) {
125 final ForkJoinTask<?> task = toForkJoinTask(command);
126 _pool.execute(task);
127 _tasks.add(task);
128 } else {
129 command.run();
130 }
131 }
132
133 private static ForkJoinTask<?> toForkJoinTask(final Runnable r) {
134 return r instanceof ForkJoinTask<?> ? (ForkJoinTask<?>)r : adapt(r);
135 }
136
137 /**
138 * Executes the given {@code runnables} in {@code n} parts.
139 *
140 * @param n the number of parts the given {@code runnables} are executed.
141 * @param runnables the runnables to be executed.
142 * @throws NullPointerException if the given runnables are {@code null}.
143 */
144 public void execute(final int n, final List<? extends Runnable> runnables) {
145 requireNonNull(runnables, "Runnables must not be null");
146 if (runnables.size() > 0) {
147 final int[] parts = arrays.partition(runnables.size(), n);
148
149 for (int i = 0; i < parts.length - 1; ++i) {
150 final int part = i;
151
152 execute(new Runnable() { @Override public void run() {
153 for (int j = parts[part]; j < parts[part + 1]; ++j) {
154 runnables.get(j).run();
155 }
156 }});
157 }
158 }
159 }
160
161 /**
162 * Executes the given {@code runnables} in {@link #getParallelism()} parts.
163 *
164 * @param runnables the runnables to be executed.
165 * @throws NullPointerException if the given runnables are {@code null}.
166 */
167 public void execute(final List<? extends Runnable> runnables) {
168 execute(getParallelism(), runnables);
169 }
170
171 /**
172 * Executes the given {@code runnables} in {@code n} parts.
173 *
174 * @param n the number of parts the given {@code runnables} are executed.
175 * @param runnables the runnables to be executed.
176 * @throws NullPointerException if the given runnables are {@code null}.
177 */
178 public void execute(final int n, final Runnable... runnables) {
179 requireNonNull(runnables, "Runnables must not be null");
180 if (runnables.length > 0) {
181 final int[] parts = arrays.partition(runnables.length, n);
182
183 for (int i = 0; i < parts.length - 1; ++i) {
184 final int part = i;
185
186 execute(new Runnable() { @Override public void run() {
187 for (int j = parts[part]; j < parts[part + 1]; ++j) {
188 runnables[j].run();
189 }
190 }});
191 }
192 }
193 }
194
195 /**
196 * Executes the given {@code runnables} in {@link #getParallelism()} parts.
197 *
198 * @param runnables the runnables to be executed.
199 * @throws NullPointerException if the given runnables are {@code null}.
200 */
201 public void execute(final Runnable... runnables) {
202 execute(getParallelism(), runnables);
203 }
204
205 @Override
206 public void close() {
207 if (_parallel) {
208 for (int i = _tasks.size(); --i >= 0;) {
209 _tasks.get(i).join();
210 }
211 }
212 }
213
214 }
|