Concurrency.java
001 /*
002  * Java Genetic Algorithm Library (jenetics-2.0.2).
003  * Copyright (c) 2007-2014 Franz Wilhelmstötter
004  *
005  * Licensed under the Apache License, Version 2.0 (the "License");
006  * you may not use this file except in compliance with the License.
007  * You may obtain a copy of the License at
008  *
009  *      http://www.apache.org/licenses/LICENSE-2.0
010  *
011  * Unless required by applicable law or agreed to in writing, software
012  * distributed under the License is distributed on an "AS IS" BASIS,
013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014  * See the License for the specific language governing permissions and
015  * limitations under the License.
016  *
017  * Author:
018  *    Franz Wilhelmstötter (franz.wilhelmstoetter@gmx.at)
019  */
020 package org.jenetics.internal.util;
021 
022 import static java.lang.Math.max;
023 import static java.util.Objects.requireNonNull;
024 import static org.jenetics.util.arrays.partition;
025 
026 import java.util.List;
027 import java.util.concurrent.CancellationException;
028 import java.util.concurrent.ExecutionException;
029 import java.util.concurrent.Executor;
030 import java.util.concurrent.ExecutorService;
031 import java.util.concurrent.ForkJoinPool;
032 import java.util.concurrent.ForkJoinTask;
033 import java.util.concurrent.Future;
034 import java.util.concurrent.FutureTask;
035 
036 /**
037  @author <a href="mailto:franz.wilhelmstoetter@gmx.at">Franz Wilhelmstötter</a>
038  @version 2.0 &mdash; <em>$Date: 2014-04-05 $</em>
039  @since 2.0
040  */
041 public abstract class Concurrency implements Executor, AutoCloseable {
042 
043     public static final int CORES = Runtime.getRuntime().availableProcessors();
044 
045     public static final Concurrency SERIAL_EXECUTOR = new SerialConcurrency();
046 
047     private static final class LazyPoolHolder {
048         public static final ForkJoinPool FORK_JOIN_POOL =
049             new ForkJoinPool(max(CORES - 11));
050     }
051 
052     public static ForkJoinPool commonPool() {
053         return LazyPoolHolder.FORK_JOIN_POOL;
054     }
055 
056     public abstract void execute(final List<? extends Runnable> runnables);
057 
058     @Override
059     public abstract void close();
060 
061     /**
062      * Return an new Concurrency object from the given executor.
063      *
064      @param executor the underlying Executor
065      @return a new Concurrency object
066      */
067     public static Concurrency with(final Executor executor) {
068         if (executor instanceof ForkJoinPool) {
069             return new ForkJoinPoolConcurrency((ForkJoinPool)executor);
070         else if (executor instanceof ExecutorService) {
071             return new ExecutorServiceConcurrency((ExecutorService)executor);
072         else if (executor == SERIAL_EXECUTOR) {
073             return SERIAL_EXECUTOR;
074         else {
075             return new ExecutorConcurrency(executor);
076         }
077     }
078 
079     /**
080      * Return a new Concurrency object using the common ForkJoinPool.
081      *
082      @return a new Concurrency object using the new ForkJoinPool
083      */
084     public static Concurrency withCommonPool() {
085         return with(commonPool());
086     }
087 
088 
089     /**
090      * This Concurrency uses a ForkJoinPool.
091      */
092     private static final class ForkJoinPoolConcurrency extends Concurrency {
093         private final Stack<ForkJoinTask<?>> _tasks = new Stack<>();
094         private final ForkJoinPool _pool;
095 
096         ForkJoinPoolConcurrency(final ForkJoinPool pool) {
097             _pool = requireNonNull(pool);
098         }
099 
100         @Override
101         public void execute(final Runnable runnable) {
102             _tasks.push(_pool.submit(runnable));
103         }
104 
105         @Override
106         public void execute(final List<? extends Runnable> runnables) {
107             _tasks.push(_pool.submit(new RunnablesAction(runnables)));
108         }
109 
110         @Override
111         public void close() {
112             for (ForkJoinTask<?> t = _tasks.pop(); t != null; t = _tasks.pop()) {
113                 t.join();
114             }
115         }
116     }
117 
118     /**
119      * This Concurrency uses an ExecutorService.
120      */
121     private static final class ExecutorServiceConcurrency extends Concurrency {
122         private final Stack<Future<?>> _futures = new Stack<>();
123         private final ExecutorService _service;
124 
125         ExecutorServiceConcurrency(final ExecutorService service) {
126             _service = requireNonNull(service);
127         }
128 
129         @Override
130         public void execute(final Runnable command) {
131             _futures.push(_service.submit(command));
132         }
133 
134         @Override
135         public void execute(final List<? extends Runnable> runnables) {
136             final int[] parts = partition(runnables.size(), CORES + 1);
137             for (int i = 0; i < parts.length - 1; ++i) {
138                 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
139             }
140         }
141 
142         @Override
143         public void close() {
144             try {
145                 for (Future<?> f = _futures.pop(); f != null; f = _futures.pop()) {
146                     f.get();
147                 }
148             catch (InterruptedException|ExecutionException e) {
149                 throw new CancellationException(e.getMessage());
150             }
151         }
152     }
153 
154     /**
155      * This Concurrency uses an Executor.
156      */
157     private static final class ExecutorConcurrency extends Concurrency {
158         private final Stack<FutureTask<?>> _tasks = new Stack<>();
159         private final Executor _executor;
160 
161         ExecutorConcurrency(final Executor executor) {
162             _executor = requireNonNull(executor);
163         }
164 
165         @Override
166         public void execute(final Runnable command) {
167             final FutureTask<?> task = new FutureTask<>(command, null);
168             _tasks.push(task);
169             _executor.execute(task);
170         }
171 
172         @Override
173         public void execute(final List<? extends Runnable> runnables) {
174             final int[] parts = partition(runnables.size(), CORES + 1);
175             for (int i = 0; i < parts.length - 1; ++i) {
176                 execute(new RunnablesRunnable(runnables, parts[i], parts[i + 1]));
177             }
178         }
179 
180         @Override
181         public void close() {
182             try {
183                 for (FutureTask<?> t = _tasks.pop(); t != null; t = _tasks.pop()) {
184                     t.get();
185                 }
186             catch (InterruptedException|ExecutionException e) {
187                 throw new CancellationException(e.getMessage());
188             }
189         }
190     }
191 
192     /**
193      * This Concurrency executes the runnables within the main thread.
194      */
195     private static final class SerialConcurrency extends Concurrency {
196 
197         @Override
198         public void execute(final Runnable command) {
199             command.run();
200         }
201 
202         @Override
203         public void execute(final List<? extends Runnable> runnables) {
204             for (final Runnable runnable : runnables) {
205                 runnable.run();
206             }
207         }
208 
209         @Override
210         public void close() {
211         }
212     }
213 
214 }