| /* |
| * Copyright 2000-2014 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.intellij.concurrency; |
| |
| import com.intellij.openapi.application.ApplicationManager; |
| import com.intellij.openapi.progress.ProcessCanceledException; |
| import com.intellij.openapi.progress.ProgressIndicator; |
| import com.intellij.openapi.progress.ProgressManager; |
| import com.intellij.openapi.progress.util.AbstractProgressIndicatorBase; |
| import com.intellij.util.Consumer; |
| import com.intellij.util.IncorrectOperationException; |
| import com.intellij.util.Processor; |
| import jsr166e.ForkJoinPool; |
| import jsr166e.ForkJoinTask; |
| import jsr166e.ForkJoinWorkerThread; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import java.util.List; |
| import java.util.concurrent.*; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| |
| /** |
| * @author cdr |
| */ |
| public class JobLauncherImpl extends JobLauncher { |
| private static final AtomicLong bits = new AtomicLong(); |
| private static final ForkJoinPool.ForkJoinWorkerThreadFactory FACTORY = new ForkJoinPool.ForkJoinWorkerThreadFactory() { |
| @Override |
| public ForkJoinWorkerThread newThread(ForkJoinPool pool) { |
| final int n = addThread(); |
| ForkJoinWorkerThread thread = new ForkJoinWorkerThread(pool) { |
| @Override |
| protected void onTermination(Throwable exception) { |
| finishThread(n); |
| super.onTermination(exception); |
| } |
| }; |
| thread.setName("JobScheduler FJ pool "+ n +"/"+ JobSchedulerImpl.CORES_COUNT); |
| return thread; |
| } |
| |
| private int addThread() { |
| boolean set; |
| int n; |
| do { |
| long l = bits.longValue(); |
| long next = (l + 1) | l; |
| n = Long.numberOfTrailingZeros(l + 1); |
| set = bits.compareAndSet(l, next); |
| } while (!set); |
| return n; |
| } |
| private void finishThread(int n) { |
| boolean set; |
| do { |
| long l = bits.get(); |
| long next = l & ~(1L << n); |
| set = bits.compareAndSet(l, next); |
| } while (!set); |
| } |
| }; |
| |
| private static final ForkJoinPool pool = new ForkJoinPool(JobSchedulerImpl.CORES_COUNT, FACTORY, null, false); |
| static final int CORES_FORK_THRESHOLD = 1; |
| |
| private static <T> boolean invokeConcurrentlyForAll(@NotNull final List<T> things, |
| boolean runInReadAction, |
| @NotNull final Processor<? super T> thingProcessor, |
| @NotNull ProgressIndicator wrapper) throws ProcessCanceledException { |
| ApplierCompleter applier = new ApplierCompleter(null, runInReadAction, wrapper, things, thingProcessor, 0, things.size(), null); |
| try { |
| pool.invoke(applier); |
| if (applier.throwable != null) throw applier.throwable; |
| } |
| catch (ApplierCompleter.ComputationAbortedException e) { |
| return false; |
| } |
| catch (RuntimeException e) { |
| throw e; |
| } |
| catch (Error e) { |
| throw e; |
| } |
| catch (Throwable e) { |
| throw new RuntimeException(e); |
| } |
| assert applier.isDone(); |
| return applier.completeTaskWhichFailToAcquireReadAction(); |
| } |
| |
| @Override |
| public <T> boolean invokeConcurrentlyUnderProgress(@NotNull List<? extends T>things, |
| ProgressIndicator progress, |
| boolean failFastOnAcquireReadAction, |
| @NotNull final Processor<T> thingProcessor) throws ProcessCanceledException { |
| return invokeConcurrentlyUnderProgress(things, progress, ApplicationManager.getApplication().isReadAccessAllowed(), |
| failFastOnAcquireReadAction, thingProcessor); |
| } |
| |
| @Override |
| public <T> boolean invokeConcurrentlyUnderProgress(@NotNull final List<? extends T> things, |
| ProgressIndicator progress, |
| boolean runInReadAction, |
| boolean failFastOnAcquireReadAction, |
| @NotNull final Processor<T> thingProcessor) throws ProcessCanceledException { |
| if (things.isEmpty()) return true; |
| // supply our own indicator even if we haven't given one - to support cancellation |
| final ProgressIndicator wrapper = progress == null ? new AbstractProgressIndicatorBase() : new SensitiveProgressWrapper(progress); |
| |
| if (things.size() <= 1 || JobSchedulerImpl.CORES_COUNT <= CORES_FORK_THRESHOLD) { |
| final AtomicBoolean result = new AtomicBoolean(true); |
| ProgressManager.getInstance().executeProcessUnderProgress(new Runnable() { |
| @Override |
| public void run() { |
| //noinspection ForLoopReplaceableByForEach |
| for (int i = 0; i < things.size(); i++) { |
| T thing = things.get(i); |
| if (!thingProcessor.process(thing)) { |
| result.set(false); |
| break; |
| } |
| } |
| } |
| }, wrapper); |
| return result.get(); |
| } |
| |
| return invokeConcurrentlyForAll(things, runInReadAction, thingProcessor, wrapper); |
| } |
| |
| // This implementation is not really async |
| @NotNull |
| @Override |
| public <T> AsyncFuture<Boolean> invokeConcurrentlyUnderProgressAsync(@NotNull List<? extends T> things, |
| ProgressIndicator progress, |
| boolean failFastOnAcquireReadAction, |
| @NotNull Processor<T> thingProcessor) { |
| return AsyncUtil.wrapBoolean(invokeConcurrentlyUnderProgress(things, progress, failFastOnAcquireReadAction, thingProcessor)); |
| } |
| |
| @NotNull |
| @Override |
| public Job<Void> submitToJobThread(int priority, @NotNull final Runnable action, final Consumer<Future> onDoneCallback) { |
| VoidForkJoinTask task = new VoidForkJoinTask(action, onDoneCallback); |
| pool.submit(task); |
| return task; |
| } |
| |
| private static class VoidForkJoinTask extends ForkJoinTask<Void> implements Job<Void> { |
| private final Runnable myAction; |
| private final Consumer<Future> myOnDoneCallback; |
| |
| public VoidForkJoinTask(@NotNull Runnable action, @Nullable Consumer<Future> onDoneCallback) { |
| myAction = action; |
| myOnDoneCallback = onDoneCallback; |
| } |
| |
| @Override |
| public Void getRawResult() { |
| return null; |
| } |
| |
| @Override |
| protected void setRawResult(Void value) { |
| } |
| |
| @Override |
| protected boolean exec() { |
| try { |
| myAction.run(); |
| complete(null); // complete manually before calling callback |
| } |
| catch (Throwable throwable) { |
| completeExceptionally(throwable); |
| } |
| finally { |
| if (myOnDoneCallback != null) { |
| myOnDoneCallback.consume(this); |
| } |
| } |
| return true; |
| } |
| |
| //////////////// Job |
| @Override |
| public String getTitle() { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public boolean isCanceled() { |
| return isCancelled(); |
| } |
| |
| @Override |
| public void addTask(@NotNull Callable<Void> task) { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public void addTask(@NotNull Runnable task, Void result) { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public void addTask(@NotNull Runnable task) { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public List<Void> scheduleAndWaitForResults() throws Throwable { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public void cancel() { |
| cancel(true); |
| } |
| |
| @Override |
| public void schedule() { |
| throw new IncorrectOperationException(); |
| } |
| |
| @Override |
| public void waitForCompletion(int millis) throws InterruptedException, ExecutionException, TimeoutException, CancellationException { |
| get(millis, TimeUnit.MILLISECONDS); |
| } |
| } |
| } |