Create mechanism for autoupdating snapshots of data

FEA: ItEr60S04ValidacionEProbasFuncionaisItEr59S04
This commit is contained in:
Óscar González Fernández 2010-09-09 01:14:29 +02:00
parent 57a5190303
commit 35bf6c76b2
6 changed files with 454 additions and 0 deletions

View file

@ -0,0 +1,127 @@
/*
* This file is part of NavalPlan
*
* Copyright (C) 2009-2010 Fundación para o Fomento da Calidade Industrial e
* Desenvolvemento Tecnolóxico de Galicia
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.navalplanner.business.hibernate.notification;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hibernate.event.PostDeleteEvent;
import org.hibernate.event.PostDeleteEventListener;
import org.hibernate.event.PostInsertEvent;
import org.hibernate.event.PostInsertEventListener;
import org.hibernate.event.PostUpdateEvent;
import org.hibernate.event.PostUpdateEventListener;
import org.hibernate.proxy.HibernateProxy;
/**
* @author Óscar González Fernández
*
*/
public class HibernateDatabaseModificationsListener implements
PostInsertEventListener, PostUpdateEventListener,
PostDeleteEventListener, ISnapshotRefresherService {
private static final Log LOG = LogFactory
.getLog(HibernateDatabaseModificationsListener.class);
private final ExecutorService executor = Executors.newFixedThreadPool(3);
private final ConcurrentMap<Class<?>, BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>>> interested;
public HibernateDatabaseModificationsListener() {
interested = new ConcurrentHashMap<Class<?>, BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>>>();
}
@Override
public void onPostDelete(PostDeleteEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
}
@Override
public void onPostUpdate(PostUpdateEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
}
@Override
public void onPostInsert(PostInsertEvent event) {
modificationOn(inferEntityClass(getEntityObject(event)));
}
private Object getEntityObject(PostInsertEvent event) {
return event.getEntity();
}
private static Object getEntityObject(PostDeleteEvent event) {
return event.getEntity();
}
private static Object getEntityObject(PostUpdateEvent event) {
return event.getEntity();
}
private static Class<?> inferEntityClass(Object entity) {
if (entity instanceof HibernateProxy) {
HibernateProxy proxy = (HibernateProxy) entity;
return proxy.getHibernateLazyInitializer().getPersistentClass();
}
return entity.getClass();
}
private void modificationOn(Class<?> entityClass) {
LOG.debug("modification on " + entityClass);
BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>> queue = interested
.get(entityClass);
if (queue == null) {
LOG.debug("nobody interested on modification on: " + entityClass);
return;
}
LOG.debug("notifying modification on: " + entityClass + " to " + queue);
for (NotBlockingAutoUpdatedSnapshot<?> each : queue) {
each.reloadNeeded(executor);
}
}
@Override
public <T> IAutoUpdatedSnapshot<T> takeSnapshot(Callable<T> callable, ReloadOn reloadOn) {
final NotBlockingAutoUpdatedSnapshot<T> result;
result = new NotBlockingAutoUpdatedSnapshot<T>(callable);
for (Class<?> each : reloadOn.getClassesOnWhichToReload()) {
interested.putIfAbsent(each, emptyQueue());
BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>> queue = interested
.get(each);
boolean success = queue.add(result);
assert success : "the type of queue used must not have restricted capacity";
}
result.ensureFirstLoad(executor);
return result;
}
private BlockingQueue<NotBlockingAutoUpdatedSnapshot<?>> emptyQueue() {
return new LinkedBlockingQueue<NotBlockingAutoUpdatedSnapshot<?>>();
}
}

View file

@ -0,0 +1,29 @@
/*
* This file is part of NavalPlan
*
* Copyright (C) 2009-2010 Fundación para o Fomento da Calidade Industrial e
* Desenvolvemento Tecnolóxico de Galicia
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.navalplanner.business.hibernate.notification;
/**
* @author Óscar González Fernández
*
*/
public interface IAutoUpdatedSnapshot<T> {
T getValue();
}

View file

@ -0,0 +1,32 @@
/*
* This file is part of NavalPlan
*
* Copyright (C) 2009-2010 Fundación para o Fomento da Calidade Industrial e
* Desenvolvemento Tecnolóxico de Galicia
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.navalplanner.business.hibernate.notification;
import java.util.concurrent.Callable;
/**
* @author Óscar González Fernández
*
*/
public interface ISnapshotRefresherService {
public <T> IAutoUpdatedSnapshot<T> takeSnapshot(Callable<T> callable, ReloadOn reloadOn);
}

View file

@ -0,0 +1,201 @@
/*
* This file is part of NavalPlan
*
* Copyright (C) 2009-2010 Fundación para o Fomento da Calidade Industrial e
* Desenvolvemento Tecnolóxico de Galicia
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.navalplanner.business.hibernate.notification;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.Validate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author Óscar González Fernández
*
*/
class NotBlockingAutoUpdatedSnapshot<T> implements IAutoUpdatedSnapshot<T> {
private static final Log LOG = LogFactory
.getLog(NotBlockingAutoUpdatedSnapshot.class);
private final Callable<T> callable;
private final AtomicReference<State> currentState;
private abstract class State {
abstract T getValue();
void cancel() {
}
State nextState(Future<T> future) {
return new PreviousValueAndOngoingCalculation(this, future);
}
boolean hasBeenInitialized() {
return true;
}
}
private class NotLaunchState extends State {
@Override
T getValue() {
throw new UnsupportedOperationException();
}
@Override
State nextState(Future<T> future) {
return new FirstCalculation(future);
}
@Override
boolean hasBeenInitialized() {
return false;
}
}
private class NoOngoingCalculation extends State {
private final T value;
NoOngoingCalculation(T value) {
this.value = value;
}
@Override
T getValue() {
return value;
}
}
private class PreviousValueAndOngoingCalculation extends State {
private final State previousValue;
private final Future<T> ongoingCalculation;
private PreviousValueAndOngoingCalculation(State value,
Future<T> ongoingCalculation) {
Validate.notNull(value);
Validate.notNull(ongoingCalculation);
this.previousValue = value;
this.ongoingCalculation = ongoingCalculation;
}
@Override
T getValue() {
if (!ongoingCalculation.isCancelled()
&& ongoingCalculation.isDone()) {
T newValue = getValueFromFuture();
currentState.compareAndSet(this, new NoOngoingCalculation(
newValue));
return newValue;
}
return previousValue.getValue();
}
private T getValueFromFuture() {
try {
return ongoingCalculation.get();
} catch (Exception e) {
LOG.error("error creating new snapshot, keeping old value",
e);
return previousValue.getValue();
}
}
@Override
void cancel() {
try {
ongoingCalculation.cancel(true);
} catch (Exception e) {
LOG.error("error cancelling future", e);
}
}
}
private class FirstCalculation extends State {
private final Future<T> ongoingCalculation;
private FirstCalculation(Future<T> ongoingCalculation) {
this.ongoingCalculation = ongoingCalculation;
}
@Override
T getValue() {
try {
return ongoingCalculation.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
void cancel() {
ongoingCalculation.cancel(true);
}
}
public NotBlockingAutoUpdatedSnapshot(Callable<T> callable) {
Validate.notNull(callable);
this.callable = callable;
currentState = new AtomicReference<State>(new NotLaunchState());
}
@Override
public T getValue() {
return currentState.get().getValue();
}
public void reloadNeeded(ExecutorService executorService) {
Future<T> future = executorService.submit(callable);
State previousState;
State newState = null;
do {
if (newState != null) {
newState.cancel();
}
previousState = currentState.get();
newState = previousState.nextState(future);
} while (!currentState.compareAndSet(previousState, newState));
previousState.cancel();
}
public void ensureFirstLoad(ExecutorService executorService) {
if (hasBeenInitialized()) {
return;
}
Future<T> future = executorService.submit(callable);
State previous = currentState.get();
State newState = previous.nextState(future);
boolean compareAndSet = currentState.compareAndSet(previous, newState);
if (!compareAndSet) {
newState.cancel();
}
}
private boolean hasBeenInitialized() {
return currentState.get().hasBeenInitialized();
}
}

View file

@ -0,0 +1,50 @@
/*
* This file is part of NavalPlan
*
* Copyright (C) 2009-2010 Fundación para o Fomento da Calidade Industrial e
* Desenvolvemento Tecnolóxico de Galicia
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.navalplanner.business.hibernate.notification;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* @author Óscar González Fernández
*
*/
public class ReloadOn {
public static ReloadOn onChangeOf(Class<?>... klasses) {
return onChangeOf(Arrays.asList(klasses));
}
public static ReloadOn onChangeOf(Collection<Class<?>> klasses) {
return new ReloadOn(klasses);
}
private final List<Class<?>> classes;
private ReloadOn(Collection<? extends Class<?>> classes) {
this.classes = new ArrayList<Class<?>>(classes);
}
public List<Class<?>> getClassesOnWhichToReload() {
return classes;
}
}

View file

@ -19,6 +19,8 @@
<!-- Letting Spring do automatically exception translation -->
<bean class="org.springframework.dao.annotation.PersistenceExceptionTranslationPostProcessor"/>
<bean id="hibernateDatabaseModificationsListener" class="org.navalplanner.business.hibernate.notification.HibernateDatabaseModificationsListener" scope="singleton"/>
<!-- Hibernate Session Factory. -->
<bean id="sessionFactory"
class="org.springframework.orm.hibernate3.LocalSessionFactoryBean"
@ -84,6 +86,19 @@
</value>
</list>
</property>
<property name="eventListeners">
<map>
<entry key="post-insert">
<ref bean="hibernateDatabaseModificationsListener"/>
</entry>
<entry key="post-delete">
<ref bean="hibernateDatabaseModificationsListener"/>
</entry>
<entry key="post-update">
<ref bean="hibernateDatabaseModificationsListener"/>
</entry>
</map>
</property>
</bean>
<!-- Spring Transaction manager -->