001 /*
002 * Java Genetic Algorithm Library (jenetics-2.0.2).
003 * Copyright (c) 2007-2014 Franz Wilhelmstötter
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 * Author:
018 * Franz Wilhelmstötter (franz.wilhelmstoetter@gmx.at)
019 */
020 package org.jenetics.internal.util;
021
022 import static java.lang.Math.max;
023 import static java.util.Objects.requireNonNull;
024 import static org.jenetics.util.arrays.partition;
025
026 import java.util.List;
027 import java.util.concurrent.CancellationException;
028 import java.util.concurrent.ExecutionException;
029 import java.util.concurrent.Executor;
030 import java.util.concurrent.ExecutorService;
031 import java.util.concurrent.ForkJoinPool;
032 import java.util.concurrent.ForkJoinTask;
033 import java.util.concurrent.Future;
034 import java.util.concurrent.FutureTask;
035
036 /**
037 * @author <a href="mailto:franz.wilhelmstoetter@gmx.at">Franz Wilhelmstötter</a>
038 * @version 2.0 — <em>$Date: 2014-04-05 $</em>
039 * @since 2.0
040 */
041 public abstract class Concurrency implements Executor, AutoCloseable {
042
043 public static final int CORES = Runtime.getRuntime().availableProcessors();
044
045 public static final Concurrency SERIAL_EXECUTOR = new SerialConcurrency();
046
047 private static final class LazyPoolHolder {
048 public static final ForkJoinPool FORK_JOIN_POOL =
049 new ForkJoinPool(max(CORES - 1, 1));
050 }
051
052 public static ForkJoinPool commonPool() {
053 return LazyPoolHolder.FORK_JOIN_POOL;
054 }
055
056 public abstract void execute(final List<? extends Runnable> runnables);
057
058 @Override
059 public abstract void close();
060
061 /**
062 * Return an new Concurrency object from the given executor.
063 *
064 * @param executor the underlying Executor
065 * @return a new Concurrency object
066 */
067 public static Concurrency with(final Executor executor) {
068 if (executor instanceof ForkJoinPool) {
069 return new ForkJoinPoolConcurrency((ForkJoinPool)executor);
070 } else if (executor instanceof ExecutorService) {
071 return new ExecutorServiceConcurrency((ExecutorService)executor);
072 } else if (executor == SERIAL_EXECUTOR) {
073 return SERIAL_EXECUTOR;
074 } else {
075 return new ExecutorConcurrency(executor);
076 }
077 }
078
079 /**
080 * Return a new Concurrency object using the common ForkJoinPool.
081 *
082 * @return a new Concurrency object using the new ForkJoinPool
083 */
084 public static Concurrency withCommonPool() {
085 return with(commonPool());
086 }
087
088
089 /**
090 * This Concurrency uses a ForkJoinPool.
091 */
092 private static final class ForkJoinPoolConcurrency extends Concurrency {
093 private final Stack<ForkJoinTask<?>> _tasks = new Stack<>();
094 private final ForkJoinPool _pool;
095
096 ForkJoinPoolConcurrency(final ForkJoinPool pool) {
097 _pool = requireNonNull(pool);
098 }
099
100 @Override
101 public void execute(final Runnable runnable) {
102 _tasks.push(_pool.submit(runnable));
103 }
104
105 @Override
106 public void execute(final List<? extends Runnable> runnables) {
107 _tasks.push(_pool.submit(new RunnablesAction(runnables)));
108 }
109
110 @Override
111 public void close() {
112 for (ForkJoinTask<?> t = _tasks.pop(); t != null; t = _tasks.pop()) {
113 t.join();
114 }
115 }
116 }
117
118 /**
119 * This Concurrency uses an ExecutorService.
120 */
121 private static final class ExecutorServiceConcurrency extends Concurrency {
122 private final Stack<Future<?>> _futures = new Stack<>();
123 private final ExecutorService _service;
124
125 ExecutorServiceConcurrency(final ExecutorService service) {
126 _service = requireNonNull(service);
127 }
128
129 @Override
130 public void execute(final Runnable command) {
131 _futures.push(_service.submit(command));
132 }
133
134 @Override
135 public void execute(final List<? extends Runnable> runnables) {
136 final int[] parts = partition(runnables.size(), CORES + 1);
137 for (int i = 0; i < parts.length - 1; ++i) {
138 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
139 }
140 }
141
142 @Override
143 public void close() {
144 try {
145 for (Future<?> f = _futures.pop(); f != null; f = _futures.pop()) {
146 f.get();
147 }
148 } catch (InterruptedException|ExecutionException e) {
149 throw new CancellationException(e.getMessage());
150 }
151 }
152 }
153
154 /**
155 * This Concurrency uses an Executor.
156 */
157 private static final class ExecutorConcurrency extends Concurrency {
158 private final Stack<FutureTask<?>> _tasks = new Stack<>();
159 private final Executor _executor;
160
161 ExecutorConcurrency(final Executor executor) {
162 _executor = requireNonNull(executor);
163 }
164
165 @Override
166 public void execute(final Runnable command) {
167 final FutureTask<?> task = new FutureTask<>(command, null);
168 _tasks.push(task);
169 _executor.execute(task);
170 }
171
172 @Override
173 public void execute(final List<? extends Runnable> runnables) {
174 final int[] parts = partition(runnables.size(), CORES + 1);
175 for (int i = 0; i < parts.length - 1; ++i) {
176 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
177 }
178 }
179
180 @Override
181 public void close() {
182 try {
183 for (FutureTask<?> t = _tasks.pop(); t != null; t = _tasks.pop()) {
184 t.get();
185 }
186 } catch (InterruptedException|ExecutionException e) {
187 throw new CancellationException(e.getMessage());
188 }
189 }
190 }
191
192 /**
193 * This Concurrency executes the runnables within the main thread.
194 */
195 private static final class SerialConcurrency extends Concurrency {
196
197 @Override
198 public void execute(final Runnable command) {
199 command.run();
200 }
201
202 @Override
203 public void execute(final List<? extends Runnable> runnables) {
204 for (final Runnable runnable : runnables) {
205 runnable.run();
206 }
207 }
208
209 @Override
210 public void close() {
211 }
212 }
213
214 }
|