/* This file is part of COPAL (COntext Provisioning for All).
 *
 * COPAL is a part of SM4All (Smart hoMes for All) project.
 *
 * COPAL is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * COPAL is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
 */
18  package at.ac.tuwien.infosys.sm4all.copal.esper.internal;
19  
20  import java.text.MessageFormat;
21  import java.util.HashMap;
22  import java.util.Map;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.locks.ReadWriteLock;
25  import java.util.concurrent.locks.ReentrantReadWriteLock;
26  import javax.xml.namespace.QName;
27  import javax.xml.xpath.XPathConstants;
28  import org.apache.log4j.Level;
29  import org.apache.log4j.Logger;
30  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEvent;
31  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEventType;
32  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEvent;
33  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEvent.Property;
34  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEventType;
35  import at.ac.tuwien.infosys.sm4all.copal.api.osgi.GenericActivator;
36  import at.ac.tuwien.infosys.sm4all.copal.api.service.ContextEventTypeRegistry;
37  import at.ac.tuwien.infosys.sm4all.copal.api.service.ContextQueryFactory;
38  import at.ac.tuwien.infosys.sm4all.copal.api.service.FailedPublishingException;
39  import at.ac.tuwien.infosys.sm4all.copal.api.service.PublishingService;
40  import at.ac.tuwien.infosys.sm4all.copal.api.util.ConcreteObservable;
41  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable;
42  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistryObserver;
43  import at.ac.tuwien.infosys.sm4all.copal.api.xml.Constants;
44  import com.espertech.esper.client.ConfigurationEventTypeXMLDOM;
45  import com.espertech.esper.client.ConfigurationException;
46  import com.espertech.esper.client.ConfigurationOperations;
47  import com.espertech.esper.client.EPRuntime;
48  import com.espertech.esper.client.EPServiceProvider;
49  import com.espertech.esper.client.EventSender;
50  
51  /**
52   * Implementation of the {@link PublishingService} interface which uses Esper as
53   * actual event system.
54   * 
55   * @author fei
56   * @author sanjin
57   */
58  public class EsperPublishing extends GenericActivator implements
59          PublishingService, RegistryObserver<ContextEventType>,
60          RegistryObservable<XMLContextEventType> {
61  
62      private static final Logger LOGGER = Logger.getLogger(EsperPublishing.class);
63  
64      private final ConcreteObservable<RegistrationState, XMLContextEventType, RegistryObserver<XMLContextEventType>> observable = new ConcreteObservable<RegistrationState, XMLContextEventType, RegistryObserver<XMLContextEventType>>();
65      private final AtomicBoolean started = new AtomicBoolean(false);
66      private final Map<String, XMLContextEventType> eventTypes = new HashMap<String, XMLContextEventType>();
67      private final Map<String, EventSender> senders = new HashMap<String, EventSender>();
68      private final ReadWriteLock lock = new ReentrantReadWriteLock();
69      private final ConfigurationOperations configuration;
70      private final EPRuntime runtime;
71      private ContextEventTypeRegistry eventTypeRegistry;
72  
73      /**
74       * Creates an instance of the {@link PublishingService} implementation which
75       * uses specified {@link EPServiceProvider}.
76       * 
77       * @param serviceProvider the {@link EPServiceProvider}.
78       */
79      public EsperPublishing(final EPServiceProvider serviceProvider) {
80          super(ContextEventTypeRegistry.class.getName(),
81                  ContextQueryFactory.class.getName());
82  
83          this.configuration = serviceProvider.getEPAdministrator().getConfiguration();
84          this.runtime = serviceProvider.getEPRuntime();
85      }
86  
87      /**
88       * Returns if this {@link PublishingService} has been started by an OSGi
89       * framework.
90       * 
91       * @return if this {@link PublishingService} has been started by an OSGi
92       *         framework.
93       */
94      public boolean isStarted() {
95          return this.started.get();
96      }
97  
98      @Override
99      public void attach(final RegistryObserver<XMLContextEventType> observer) {
100         this.lock.readLock().lock();
101         try {
102             this.observable.attach(observer);
103             for (final XMLContextEventType eventType : this.eventTypes.values()) {
104                 observer.update(RegistrationState.Registered, eventType);
105             }
106         } finally {
107             this.lock.readLock().unlock();
108         }
109     }
110 
111     @Override
112     public void detach(final RegistryObserver<XMLContextEventType> observer) {
113         this.lock.readLock().lock();
114         try {
115             this.observable.detach(observer);
116             for (final XMLContextEventType eventType : this.eventTypes.values()) {
117                 observer.update(RegistrationState.Unregistered, eventType);
118             }
119         } finally {
120             this.lock.readLock().unlock();
121         }
122     }
123 
124     @Override
125     protected void start() {
126         this.eventTypeRegistry = getDependency(ContextEventTypeRegistry.class.getName());
127         this.eventTypeRegistry.attach(this);
128 
129         if (!this.started.getAndSet(true)) {
130             if (register(PublishingService.class, this)) {
131                 if (LOGGER.isInfoEnabled()) {
132                     LOGGER.info("Successfully registered PublishingService.");
133                 }
134             } else {
135                 if (LOGGER.isEnabledFor(Level.ERROR)) {
136                     LOGGER.error("Failed to register PublishingService!");
137                 }
138             }
139         }
140     }
141 
142     @Override
143     protected void stop() {
144         this.eventTypeRegistry.detach(this);
145         this.eventTypeRegistry = null;
146 
147         if (this.started.getAndSet(false)) {
148             if (unregister(PublishingService.class)) {
149                 if (LOGGER.isInfoEnabled()) {
150                     LOGGER.info("Successfully unregistered PublishingService.");
151                 }
152             }
153 
154             unregisterAll();
155         }
156     }
157 
158     @Override
159     public void publish(final ContextEvent... events)
160             throws FailedPublishingException {
161         this.lock.readLock().lock();
162         try {
163             validate(events);
164             for (final ContextEvent event : events) {
165                 event.nextAction();
166                 this.senders.get(event.getType().getName()).sendEvent(
167                         ((XMLContextEvent) event).getDocument());
168             }
169         } finally {
170             this.lock.readLock().unlock();
171         }
172     }
173 
174     @Override
175     public void update(final RegistrationState state,
176             final ContextEventType eventType) {
177         if (eventType instanceof XMLContextEventType) {
178             switch (state) {
179             case Registered:
180                 register((XMLContextEventType) eventType);
181                 break;
182             case Unregistered:
183                 unregister((XMLContextEventType) eventType);
184                 break;
185             default:
186                 break;
187             }
188         }
189     }
190 
191     private void validate(final ContextEvent... events)
192             throws FailedPublishingException {
193         this.lock.readLock().lock();
194         try {
195             for (final ContextEvent event : events) {
196                 final String eventType = event.getType().getName();
197                 if (event.getCurrentActionIndex() >= event.getNumberOfActions()) {
198                     if (LOGGER.isEnabledFor(Level.ERROR)) {
199                         LOGGER.error(MessageFormat.format(
200                                 "Failed to publish event ''{0}''! Event is already processed.",
201                                 eventType));
202                     }
203                     throw new FailedPublishingException(
204                             "Event is already processed.", event);
205                 }
206                 if (!(event instanceof XMLContextEvent)) {
207                     if (LOGGER.isEnabledFor(Level.ERROR)) {
208                         LOGGER.error(MessageFormat.format(
209                                 "Failed to publish event ''{0}''! Only XML events are supported.",
210                                 eventType));
211                     }
212                     throw new FailedPublishingException(
213                             "Only XML events are supported.", event);
214                 }
215                 if (!this.senders.containsKey(eventType)) {
216                     if (LOGGER.isEnabledFor(Level.ERROR)) {
217                         LOGGER.error(MessageFormat.format(
218                                 "Failed to publish event ''{0}''! Unknown type.",
219                                 eventType));
220                     }
221                     throw new FailedPublishingException("Unknown type.", event);
222                 }
223             }
224         } finally {
225             this.lock.readLock().unlock();
226         }
227     }
228 
229     private void register(final XMLContextEventType eventType) {
230         final String eventName = eventType.getName();
231 
232         this.lock.writeLock().lock();
233         try {
234             if (this.senders.containsKey(eventName)) {
235                 if (LOGGER.isEnabledFor(Level.ERROR)) {
236                     LOGGER.error(MessageFormat.format(
237                             "Failed to register event type ''{0}''! Already registered.",
238                             eventName));
239                 }
240             } else {
241                 final ConfigurationEventTypeXMLDOM config = new ConfigurationEventTypeXMLDOM();
242                 // makes properties namespace aware
243                 config.setXPathPropertyExpr(true);
244                 config.setRootElementName(eventType.getRootElementName());
245                 config.addXPathProperty("*", "/", XPathConstants.NODE);
246                 config.addNamespacePrefix(Constants.COPAL_PREFIX,
247                         Constants.COPAL_NAMESPACE);
248                 for (final Property property : XMLContextEvent.Property.values()) {
249                     final QName xmlType;
250                     final Class<?> type = property.getType();
251                     if (type.isArray()) {
252                         xmlType = XPathConstants.NODESET;
253                     } else {
254                         xmlType = XPathConstants.STRING;
255                     }
256                     config.addXPathProperty(property.toString(),
257                             property.getXPath(), xmlType, type.getSimpleName());
258                 }
259                 if (eventType.hasNamespace()) {
260                     final String eventNamespaceURI = eventType.getNamespaceURI().toString();
261                     config.setDefaultNamespace(eventNamespaceURI);
262                     config.setRootElementNamespace(eventNamespaceURI);
263                 }
264                 if (eventType.hasSchema()) {
265                     config.setSchemaResource(eventType.getSchemaURL().toString());
266                 }
267                 this.configuration.addEventType(eventName, config);
268 
269                 this.eventTypes.put(eventName, eventType);
270                 this.senders.put(eventName,
271                         this.runtime.getEventSender(eventName));
272 
273                 this.observable.notifyAll(RegistrationState.Registered,
274                         eventType);
275                 if (LOGGER.isInfoEnabled()) {
276                     LOGGER.info(MessageFormat.format(
277                             "Successfully registered event type ''{0}''.",
278                             eventName));
279                 }
280             }
281         } finally {
282             this.lock.writeLock().unlock();
283         }
284     }
285 
286     private void unregister(final XMLContextEventType eventType) {
287         final String eventName = eventType.getName();
288 
289         this.observable.notifyAll(RegistrationState.Unregistered, eventType);
290 
291         this.lock.writeLock().lock();
292         try {
293             try {
294                 if (this.senders.containsKey(eventName)) {
295                     if (this.configuration.removeEventType(eventName, false)) {
296                         if (this.started.get()) {
297                             this.eventTypes.remove(eventName);
298                             this.senders.remove(eventName);
299                         }
300 
301                         if (LOGGER.isInfoEnabled()) {
302                             LOGGER.info(MessageFormat.format(
303                                     "Successfully unregistered event type ''{0}''.",
304                                     eventName));
305                         }
306                     } else {
307                         if (LOGGER.isEnabledFor(Level.ERROR)) {
308                             LOGGER.error(MessageFormat.format(
309                                     "Failed to unregister event type ''{0}''! Removal from Esper configuration failed.",
310                                     eventName));
311                         }
312                     }
313                 } else {
314                     if (LOGGER.isEnabledFor(Level.ERROR)) {
315                         LOGGER.error(MessageFormat.format(
316                                 "Failed to unregister event type ''{0}''! Event is not registered.",
317                                 eventName));
318                     }
319                 }
320             } catch (final ConfigurationException ex) {
321                 if (LOGGER.isEnabledFor(Level.ERROR)) {
322                     LOGGER.error(MessageFormat.format(
323                             "Failed to unregister event type ''{0}''!",
324                             eventName), ex);
325                 }
326             }
327         } finally {
328             this.lock.writeLock().unlock();
329         }
330     }
331 
332     private void unregisterAll() {
333         if (LOGGER.isDebugEnabled()) {
334             LOGGER.debug("Unregistering all event types.");
335         }
336 
337         this.lock.writeLock().lock();
338         try {
339             for (final XMLContextEventType eventType : this.eventTypes.values()) {
340                 unregister(eventType);
341             }
342 
343             this.senders.clear();
344         } finally {
345             this.lock.writeLock().unlock();
346         }
347 
348         if (LOGGER.isInfoEnabled()) {
349             LOGGER.info("All event types unregistered!");
350         }
351     }
352 }