ItEr58S14RecalculosConexionEscenariosItEr57S15: Make available functionality for making updates asynchronous

This commit is contained in:
Óscar González Fernández 2010-05-19 20:00:03 +02:00
parent 5553599d0d
commit 618196a780
2 changed files with 140 additions and 88 deletions

View file

@ -19,8 +19,13 @@
*/
package org.zkoss.ganttz.util;
import java.util.concurrent.Executor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.Validate;
import org.apache.commons.logging.Log;
@ -100,7 +105,135 @@ public class LongOperationFeedback {
public void doOperation(IDesktopUpdatesEmitter<T> desktopUpdateEmitter);
}
private static final Executor executor = Executors.newCachedThreadPool();
private static final ExecutorService executor = Executors
.newCachedThreadPool();
public static <T> IBackGroundOperation<T> withAsyncUpates(
final IBackGroundOperation<T> backgroundOperation) {
return new IBackGroundOperation<T>() {
@Override
public void doOperation(
IDesktopUpdatesEmitter<T> desktopUpdateEmitter) {
NotBlockingDesktopUpdates<T> notBlockingDesktopUpdates = new NotBlockingDesktopUpdates<T>(
desktopUpdateEmitter);
Future<?> future = executor.submit(notBlockingDesktopUpdates);
try {
backgroundOperation.doOperation(notBlockingDesktopUpdates);
} finally {
notBlockingDesktopUpdates.finish();
}
waitUntilShowingAllUpdates(future);
}
private void waitUntilShowingAllUpdates(Future<?> future) {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
private static class NotBlockingDesktopUpdates<T> implements
IDesktopUpdatesEmitter<T>, Runnable {
private BlockingQueue<EndOrValue<T>> queue = new LinkedBlockingQueue<EndOrValue<T>>();
private final IDesktopUpdatesEmitter<T> original;
NotBlockingDesktopUpdates(IDesktopUpdatesEmitter<T> original) {
this.original = original;
}
@Override
public void doUpdate(T value) {
queue.add(EndOrValue.value(value));
}
void finish() {
queue.add(EndOrValue.<T> end());
}
@Override
public void run() {
List<T> batch = new ArrayList<T>();
while (true) {
batch.clear();
EndOrValue<T> current = null;
try {
current = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (current.isEnd()) {
return;
}
batch.add(current.getValue());
while ((current = queue.poll()) != null) {
if (current.isEnd()) {
break;
}
batch.add(current.getValue());
}
if (!batch.isEmpty()) {
for (T each : batch) {
original.doUpdate(each);
}
}
if (current != null && current.isEnd()) {
return;
}
}
}
}
private static abstract class EndOrValue<T> {
public static <T> EndOrValue<T> end() {
return new End<T>();
}
public static <T> EndOrValue<T> value(T value) {
return new Value<T>(value);
}
public abstract boolean isEnd();
public abstract T getValue() throws UnsupportedOperationException;
}
private static class Value<T> extends EndOrValue<T> {
private final T value;
Value(T value) {
Validate.notNull(value);
this.value = value;
}
public T getValue() {
return value;
}
@Override
public boolean isEnd() {
return false;
}
}
private static class End<T> extends EndOrValue<T> {
@Override
public T getValue() throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}
@Override
public boolean isEnd() {
return true;
}
}
/**
* Executes a long operation. The background operation can send

View file

@ -27,11 +27,6 @@ import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.Validate;
import org.navalplanner.business.common.IAdHocTransactionService;
@ -101,77 +96,15 @@ public class ReassignCommand implements IReassignCommand {
public void result(final ReassignConfiguration configuration) {
final List<WithAssociatedEntity> reassignations = getReassignations(
context, configuration);
IBackGroundOperation<IDesktopUpdate> reassignationsOperation = LongOperationFeedback
.withAsyncUpates(reassignations(context,
reassignations));
LongOperationFeedback.progressive(getDesktop(context),
reassignations(context, reassignations));
reassignationsOperation);
}
});
}
private class NotBlockingDesktopUpdates implements
IDesktopUpdatesEmitter<IDesktopUpdate>, Runnable {
private BlockingQueue<IDesktopUpdate> queue = new LinkedBlockingQueue<IDesktopUpdate>();
private final IDesktopUpdatesEmitter<IDesktopUpdate> original;
private final IDesktopUpdate END_MARK = new IDesktopUpdate() {
@Override
public void doUpdate() {
}
};
NotBlockingDesktopUpdates(
IDesktopUpdatesEmitter<IDesktopUpdate> original) {
this.original = original;
}
@Override
public void doUpdate(IDesktopUpdate value) {
queue.add(value);
}
void finish() {
queue.add(END_MARK);
}
@Override
public void run() {
List<IDesktopUpdate> batch = new ArrayList<IDesktopUpdate>();
while (true) {
batch.clear();
IDesktopUpdate current = null;
try {
current = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (current == END_MARK) {
return;
}
batch.add(current);
while ((current = queue.poll()) != null) {
if (current == END_MARK) {
break;
}
batch.add(current);
}
if (!batch.isEmpty()) {
original
.doUpdate(asOneUpdate(batch));
}
if (current == END_MARK) {
return;
}
}
}
private IDesktopUpdate asOneUpdate(List<IDesktopUpdate> batch) {
return and(batch.toArray(new IDesktopUpdate[0]));
}
}
private ExecutorService executorService = Executors.newCachedThreadPool();
private IBackGroundOperation<IDesktopUpdate> reassignations(
final IContext<TaskElement> context,
final List<WithAssociatedEntity> reassignations) {
@ -182,21 +115,14 @@ public class ReassignCommand implements IReassignCommand {
final IDesktopUpdatesEmitter<IDesktopUpdate> updater) {
updater.doUpdate(showStart(reassignations.size()));
DeferedNotifier notifications = null;
NotBlockingDesktopUpdates notBlockingDesktopUpdates = new NotBlockingDesktopUpdates(
updater);
Future<?> previousNotifications = executorService
.submit(notBlockingDesktopUpdates);
try {
GanttDiagramGraph ganttDiagramGraph = context.getGanttDiagramGraph();
notifications = ganttDiagramGraph
.manualNotificationOn(doReassignations(
ganttDiagramGraph, reassignations,
notBlockingDesktopUpdates));
ganttDiagramGraph, reassignations, updater));
} finally {
notBlockingDesktopUpdates.finish();
if (notifications != null) {
// null if error
waitUntilFinish(previousNotifications);
updater.doUpdate(and(doNotifications(notifications),
reloadCharts(context), showEnd()));
} else {
@ -205,13 +131,6 @@ public class ReassignCommand implements IReassignCommand {
}
}
private void waitUntilFinish(Future<?> showingProgress){
try {
showingProgress.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}