Reload the snapshots only once the transaction has been completed successfully

FEA: ItEr61S05BugFixing
This commit is contained in:
Óscar González Fernández 2010-10-03 19:35:59 +02:00
parent 2858d29547
commit 9d072fda1f
2 changed files with 110 additions and 12 deletions

View file

@ -19,6 +19,11 @@
*/
package org.navalplanner.business.hibernate.notification;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -27,8 +32,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.Transaction;
import org.hibernate.event.AbstractEvent;
import org.hibernate.event.PostDeleteEvent;
import org.hibernate.event.PostDeleteEventListener;
import org.hibernate.event.PostInsertEvent;
@ -52,23 +62,93 @@ public class HibernateDatabaseModificationsListener implements
private final ConcurrentMap<Class<?>, BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>>> interested;
private ConcurrentMap<Transaction, Dispatcher> pending = new ConcurrentHashMap<Transaction, Dispatcher>();
private Set<NotBlockingAutoUpdatedSnapshot<?>> snapshotsInterestedOn(
Class<?> entityClass) {
List<Class<?>> list = new ArrayList<Class<?>>(1);
list.add(entityClass);
return snapshotsInterestedOn(list);
}
private Set<NotBlockingAutoUpdatedSnapshot<?>> snapshotsInterestedOn(
Collection<? extends Class<?>> classesList) {
Set<NotBlockingAutoUpdatedSnapshot<?>> result = new HashSet<NotBlockingAutoUpdatedSnapshot<?>>();
for (Class<?> each : new HashSet<Class<?>>(classesList)) {
BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>> queue = interested
.get(each);
if (queue != null) {
result.addAll(queue);
}
}
return result;
}
private final class Dispatcher implements Synchronization {
private BlockingQueue<Class<?>> classes = new LinkedBlockingQueue<Class<?>>();
private final Transaction transaction;
public Dispatcher(Transaction transaction, Class<?> entityClass) {
classes.offer(entityClass);
this.transaction = transaction;
}
public void add(Class<?> entityClass) {
classes.offer(entityClass);
}
@Override
public void beforeCompletion() {
}
@Override
public void afterCompletion(int status) {
LOG.info("transaction completed with status: " + status);
pending.remove(transaction);
if (isProbablySucessful(status)) {
List<Class<?>> list = new ArrayList<Class<?>>();
classes.drainTo(list);
LOG.info(list.size() + " modification events recorded");
Set<NotBlockingAutoUpdatedSnapshot<?>> toDispatch = snapshotsInterestedOn(list);
LOG.info("dispatching "
+ toDispatch
+ " snapshots to reload due to transaction successful completion");
dispatch(toDispatch);
}
}
private boolean isProbablySucessful(int status) {
return status != Status.STATUS_ROLLEDBACK
&& status != Status.STATUS_ROLLING_BACK;
}
}
public HibernateDatabaseModificationsListener() {
interested = new ConcurrentHashMap<Class<?>, BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>>>();
}
@Override
public void onPostDelete(PostDeleteEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
modificationOn(inferTransaction(event),
inferEntityClass(getEntityObject(event)));
}
@Override
public void onPostUpdate(PostUpdateEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
modificationOn(inferTransaction(event),
inferEntityClass(getEntityObject(event)));
}
@Override
public void onPostInsert(PostInsertEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
modificationOn(inferTransaction(event),
inferEntityClass(getEntityObject(event)));
}
private Transaction inferTransaction(AbstractEvent event) {
return event.getSession().getTransaction();
}
private Object getEntityObject(PostInsertEvent event) {
@ -91,20 +171,33 @@ public class HibernateDatabaseModificationsListener implements
return entity.getClass();
}
private void modificationOn(Class<?> entityClass) {
LOG.debug("modification on " + entityClass);
BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>> queue = interested
.get(entityClass);
if (queue == null) {
LOG.debug("nobody interested on modification on: " + entityClass);
void modificationOn(Transaction transaction, Class<?> entityClass) {
if (transaction == null) {
dispatch(snapshotsInterestedOn(entityClass));
return;
}
LOG.debug("notifying modification on: " + entityClass + " to " + queue);
for (NotBlockingAutoUpdatedSnapshot<?> each : queue) {
each.reloadNeeded(executor);
Dispatcher newDispatcher = new Dispatcher(transaction, entityClass);
Dispatcher previous = null;
previous = pending.putIfAbsent(transaction, newDispatcher);
boolean dispatcherAlreadyExisted = previous != null;
if (dispatcherAlreadyExisted) {
previous.add(entityClass);
} else {
transaction.registerSynchronization(newDispatcher);
}
}
private void dispatch(Set<NotBlockingAutoUpdatedSnapshot<?>> toBeDispatched) {
for (NotBlockingAutoUpdatedSnapshot<?> each : toBeDispatched) {
dispatch(each);
}
}
private void dispatch(NotBlockingAutoUpdatedSnapshot<?> each) {
each.reloadNeeded(executor);
}
@Override
public <T> IAutoUpdatedSnapshot<T> takeSnapshot(String name,
Callable<T> callable, ReloadOn reloadOn) {

View file

@ -209,6 +209,11 @@ class NotBlockingAutoUpdatedSnapshot<T> implements IAutoUpdatedSnapshot<T> {
}
}
@Override
public String toString() {
return name;
}
private boolean hasBeenInitialized() {
return currentState.get().hasBeenInitialized();
}