diff --git a/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/HibernateDatabaseModificationsListener.java b/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/HibernateDatabaseModificationsListener.java index 175a600c6..f59c050b2 100644 --- a/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/HibernateDatabaseModificationsListener.java +++ b/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/HibernateDatabaseModificationsListener.java @@ -19,6 +19,11 @@ */ package org.navalplanner.business.hibernate.notification; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; @@ -27,8 +32,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import javax.transaction.Status; +import javax.transaction.Synchronization; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.hibernate.Transaction; +import org.hibernate.event.AbstractEvent; import org.hibernate.event.PostDeleteEvent; import org.hibernate.event.PostDeleteEventListener; import org.hibernate.event.PostInsertEvent; @@ -52,23 +62,93 @@ public class HibernateDatabaseModificationsListener implements private final ConcurrentMap, BlockingQueue>> interested; + private ConcurrentMap pending = new ConcurrentHashMap(); + + private Set> snapshotsInterestedOn( + Class entityClass) { + List> list = new ArrayList>(1); + list.add(entityClass); + return snapshotsInterestedOn(list); + } + + private Set> snapshotsInterestedOn( + Collection> classesList) { + Set> result = new HashSet>(); + for (Class each : new HashSet>(classesList)) { + BlockingQueue> queue = interested + .get(each); + if (queue != null) { + result.addAll(queue); + } + } + return result; + } + + private final class Dispatcher implements Synchronization { + + private BlockingQueue> classes = new LinkedBlockingQueue>(); + private final Transaction transaction; + + public Dispatcher(Transaction transaction, Class entityClass) { + classes.offer(entityClass); + this.transaction = transaction; + } + + public void add(Class entityClass) { + classes.offer(entityClass); + } + + @Override + public void beforeCompletion() { + } + + @Override + public void afterCompletion(int status) { + LOG.info("transaction completed with status: " + status); + pending.remove(transaction); + if (isProbablySucessful(status)) { + List> list = new ArrayList>(); + classes.drainTo(list); + LOG.info(list.size() + " modification events recorded"); + Set> toDispatch = snapshotsInterestedOn(list); + LOG.info("dispatching " + + toDispatch + + " snapshots to reload due to transaction successful completion"); + dispatch(toDispatch); + } + } + + private boolean isProbablySucessful(int status) { + return status != Status.STATUS_ROLLEDBACK + && status != Status.STATUS_ROLLING_BACK; + } + + } + public HibernateDatabaseModificationsListener() { interested = new ConcurrentHashMap, BlockingQueue>>(); } @Override public void onPostDelete(PostDeleteEvent event) { - modificationOn(inferEntityClass(getEntityObject(event))); + modificationOn(inferTransaction(event), + inferEntityClass(getEntityObject(event))); } @Override public void onPostUpdate(PostUpdateEvent event) { - modificationOn(inferEntityClass(getEntityObject(event))); + modificationOn(inferTransaction(event), + inferEntityClass(getEntityObject(event))); } @Override public void onPostInsert(PostInsertEvent event) { - modificationOn(inferEntityClass(getEntityObject(event))); + modificationOn(inferTransaction(event), + inferEntityClass(getEntityObject(event))); + } + + private Transaction inferTransaction(AbstractEvent event) { + return event.getSession().getTransaction(); } private Object getEntityObject(PostInsertEvent event) { @@ -91,20 +171,33 @@ public class HibernateDatabaseModificationsListener implements return entity.getClass(); } - private void modificationOn(Class entityClass) { - LOG.debug("modification on " + entityClass); - BlockingQueue> queue = interested - .get(entityClass); - if (queue == null) { - LOG.debug("nobody interested on modification on: " + entityClass); + void modificationOn(Transaction transaction, Class entityClass) { + if (transaction == null) { + dispatch(snapshotsInterestedOn(entityClass)); return; } - LOG.debug("notifying modification on: " + entityClass + " to " + queue); - for (NotBlockingAutoUpdatedSnapshot each : queue) { - each.reloadNeeded(executor); + Dispatcher newDispatcher = new Dispatcher(transaction, entityClass); + Dispatcher previous = null; + previous = pending.putIfAbsent(transaction, newDispatcher); + + boolean dispatcherAlreadyExisted = previous != null; + if (dispatcherAlreadyExisted) { + previous.add(entityClass); + } else { + transaction.registerSynchronization(newDispatcher); } } + private void dispatch(Set> toBeDispatched) { + for (NotBlockingAutoUpdatedSnapshot each : toBeDispatched) { + dispatch(each); + } + } + + private void dispatch(NotBlockingAutoUpdatedSnapshot each) { + each.reloadNeeded(executor); + } + @Override public IAutoUpdatedSnapshot takeSnapshot(String name, Callable callable, ReloadOn reloadOn) { diff --git a/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/NotBlockingAutoUpdatedSnapshot.java b/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/NotBlockingAutoUpdatedSnapshot.java index ce246c755..c5fc8a1df 100644 --- a/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/NotBlockingAutoUpdatedSnapshot.java +++ b/navalplanner-business/src/main/java/org/navalplanner/business/hibernate/notification/NotBlockingAutoUpdatedSnapshot.java @@ -209,6 +209,11 @@ class NotBlockingAutoUpdatedSnapshot implements IAutoUpdatedSnapshot { } } + @Override + public String toString() { + return name; + } + private boolean hasBeenInitialized() { return currentState.get().hasBeenInitialized(); }