View Javadoc

1   /* This file is part of COPAL (COntext Provisioning for All).
2    *
3    * COPAL is a part of SM4All (Smart hoMes for All) project.
4    *
5    * COPAL is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * COPAL is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public License
16   * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
17   */
18  package at.ac.tuwien.infosys.sm4all.copal.esper.internal;
19  
20  import java.text.MessageFormat;
21  import java.util.HashMap;
22  import java.util.Map;
23  import java.util.concurrent.locks.ReadWriteLock;
24  import java.util.concurrent.locks.ReentrantReadWriteLock;
25  import javax.xml.namespace.QName;
26  import javax.xml.xpath.XPathConstants;
27  import org.apache.log4j.Logger;
28  import org.w3c.dom.Document;
29  import at.ac.tuwien.infosys.sm4all.copal.api.ContextException;
30  import at.ac.tuwien.infosys.sm4all.copal.api.ContextListener;
31  import at.ac.tuwien.infosys.sm4all.copal.api.ContextQuery;
32  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEvent;
33  import at.ac.tuwien.infosys.sm4all.copal.api.event.xml.Constants;
34  import at.ac.tuwien.infosys.sm4all.copal.api.event.xml.XMLContextEvent;
35  import at.ac.tuwien.infosys.sm4all.copal.api.event.xml.XMLContextEventType;
36  import at.ac.tuwien.infosys.sm4all.copal.api.event.xml.XMLElement;
37  import at.ac.tuwien.infosys.sm4all.copal.esper.EventNotifier;
38  import at.ac.tuwien.infosys.sm4all.copal.service.event.ContextQueryRegistry;
39  import at.ac.tuwien.infosys.sm4all.copal.service.event.EventSystem;
40  import at.ac.tuwien.infosys.sm4all.copal.service.event.EventTypeRegistry;
41  import at.ac.tuwien.infosys.sm4all.copal.service.event.PublishingService;
42  import com.espertech.esper.client.Configuration;
43  import com.espertech.esper.client.ConfigurationEventTypeXMLDOM;
44  import com.espertech.esper.client.ConfigurationException;
45  import com.espertech.esper.client.ConfigurationOperations;
46  import com.espertech.esper.client.EPAdministrator;
47  import com.espertech.esper.client.EPException;
48  import com.espertech.esper.client.EPRuntime;
49  import com.espertech.esper.client.EPServiceProvider;
50  import com.espertech.esper.client.EPServiceProviderManager;
51  import com.espertech.esper.client.EventSender;
52  
53  /**
54   * Implementation of the {@link EventTypeRegistry}, {@link PublishingService},
55   * and {@link ContextQueryRegistry} interfaces which uses Esper as actual event
56   * system. These interfaces are used by COPAL core services.
57   * 
58   * @author fei
59   * @author sanjin
60   */
61  public class EsperEngine implements PublishingService, EventSystem,
62          EventTypeRegistry, ContextQueryRegistry {
63  
64      private static final Logger LOGGER = Logger.getLogger(EsperEngine.class);
65  
66      private final Map<String, EventNotifier> notifiers = new HashMap<String, EventNotifier>();
67      private final Map<String, EventSender> events = new HashMap<String, EventSender>();
68      private final ReadWriteLock notifiersLock = new ReentrantReadWriteLock();
69      private final ReadWriteLock eventsLock = new ReentrantReadWriteLock();
70      private final EPServiceProvider serviceProvider;
71      private final EPAdministrator administrator;
72      private final ConfigurationOperations configuration;
73      private final EPRuntime runtime;
74      private boolean shutdown = false;
75  
76      /**
77       * Creates instance that uses Esper provider with <code>EsperService</code>
78       * name.
79       */
80      public EsperEngine() {
81          super();
82  
83          final Configuration config = new Configuration();
84          // turn off view sharing
85          config.getEngineDefaults().getViewResources().setShareViews(false);
86          // turn off delivery order guarantees
87          config.getEngineDefaults().getThreading().setListenerDispatchPreserveOrder(
88                  false);
89          config.getEngineDefaults().getThreading().setInsertIntoDispatchPreserveOrder(
90                  false);
91  
92          this.serviceProvider = EPServiceProviderManager.getDefaultProvider(config);
93          this.serviceProvider.initialize();
94          this.administrator = this.serviceProvider.getEPAdministrator();
95          this.configuration = this.administrator.getConfiguration();
96          this.runtime = this.serviceProvider.getEPRuntime();
97  
98          LOGGER.info("Esper engine started!");
99      }
100 
101     @Override
102     public void publish(final ContextEvent event) throws ContextException {
103         if (event instanceof XMLContextEvent) {
104             final Document document = ((XMLContextEvent) event).getDocument();
105             this.eventsLock.readLock().lock();
106             try {
107                 this.events.get(event.getType().getName()).sendEvent(document);
108             } finally {
109                 this.eventsLock.readLock().unlock();
110             }
111         } else
112             throw new ContextException(
113                     "Only publishing of XML context events is supported!");
114     }
115 
116     @Override
117     public void setUnmatchedEventListener(final ContextListener listener) {
118         this.runtime.setUnmatchedListener(new UnmatchedEventListener(listener));
119     }
120 
121     @Override
122     public boolean register(final ContextQuery query) {
123         final String queryName = query.getName();
124         boolean result = false;
125 
126         this.notifiersLock.writeLock().lock();
127         try {
128             if (this.notifiers.containsKey(queryName))
129                 LOGGER.error(MessageFormat.format(
130                         "Failed to register query ''{0}''! Query already registered.",
131                         queryName));
132             else {
133                 final EventNotifier notifier = new EventNotifier(
134                         this.administrator, query);
135                 this.notifiers.put(queryName, notifier);
136 
137                 if (LOGGER.isInfoEnabled())
138                     LOGGER.info(MessageFormat.format(
139                             "Successfully registered query ''{0}''.", queryName));
140                 result = true;
141             }
142         } catch (final EPException ex) {
143             LOGGER.error(
144                     MessageFormat.format(
145                             "Failed to register query ''{0}''! Could not create Esper statement.",
146                             queryName), ex);
147         } finally {
148             this.notifiersLock.writeLock().unlock();
149         }
150 
151         return result;
152     }
153 
154     @Override
155     public boolean unregister(final ContextQuery query) {
156         final String queryName = query.getName();
157         boolean result = false;
158 
159         this.notifiersLock.writeLock().lock();
160         try {
161             if (this.notifiers.containsKey(queryName)) {
162                 final EventNotifier notifier = this.notifiers.get(queryName);
163 
164                 notifier.destroy();
165                 if (!this.shutdown)
166                     this.notifiers.remove(queryName);
167 
168                 if (LOGGER.isInfoEnabled())
169                     LOGGER.info(MessageFormat.format(
170                             "Successfully unregistered query ''{0}''.",
171                             queryName));
172                 result = true;
173             } else
174                 LOGGER.error(MessageFormat.format(
175                         "Failed to unregister query ''{0}''! Query is not registered.",
176                         queryName));
177         } finally {
178             this.notifiersLock.writeLock().unlock();
179         }
180 
181         return result;
182     }
183 
184     @Override
185     public boolean register(final XMLContextEventType eventType) {
186         final String eventName = eventType.getName();
187         boolean result = false;
188 
189         this.eventsLock.writeLock().lock();
190         try {
191             if (this.events.containsKey(eventName))
192                 LOGGER.error(MessageFormat.format(
193                         "Failed to register event ''{0}''! Already registered.",
194                         eventName));
195             else {
196                 final ConfigurationEventTypeXMLDOM config = new ConfigurationEventTypeXMLDOM();
197                 final XMLContextEventType xmlEventType = eventType;
198                 // makes properties namespace aware
199                 config.setXPathPropertyExpr(true);
200                 config.setRootElementName(xmlEventType.getRootElementName());
201                 config.addXPathProperty("*", "/", XPathConstants.NODE);
202                 config.addNamespacePrefix(Constants.COPAL_PREFIX,
203                         Constants.COPAL_NAMESPACE_URI);
204                 for (final XMLElement<?> element : XMLContextEvent.XML_ELEMENTS) {
205                     final QName xmlType;
206                     final Class<?> type = element.getXMLType();
207                     if (type.isArray())
208                         xmlType = XPathConstants.NODESET;
209                     else
210                         xmlType = XPathConstants.STRING;
211                     config.addXPathProperty(element.getQualifiedName(),
212                             element.getXPath(), xmlType, type.getSimpleName());
213                 }
214                 if (xmlEventType.hasNamespace()) {
215                     final String eventNamespaceURI = xmlEventType.getNamespaceURI().toString();
216                     config.setDefaultNamespace(eventNamespaceURI);
217                     config.setRootElementNamespace(eventNamespaceURI);
218                 }
219                 if (xmlEventType.hasSchema())
220                     config.setSchemaResource(xmlEventType.getSchemaURL().toString());
221                 this.configuration.addEventType(eventName, config);
222 
223                 this.events.put(eventName,
224                         this.runtime.getEventSender(eventName));
225                 if (LOGGER.isInfoEnabled())
226                     LOGGER.info(MessageFormat.format(
227                             "Successfully registered event ''{0}''.", eventName));
228                 result = true;
229             }
230         } finally {
231             this.eventsLock.writeLock().unlock();
232         }
233 
234         return result;
235     }
236 
237     @Override
238     public boolean unregister(final String eventName) {
239         boolean result = false;
240 
241         this.eventsLock.writeLock().lock();
242         try {
243             try {
244                 if (this.events.containsKey(eventName)) {
245                     result = this.configuration.removeEventType(eventName,
246                             false);
247 
248                     if (result) {
249                         if (!this.shutdown)
250                             this.events.remove(eventName);
251 
252                         if (LOGGER.isInfoEnabled())
253                             LOGGER.info(MessageFormat.format(
254                                     "Successfully unregistered event ''{0}''.",
255                                     eventName));
256                     } else
257                         LOGGER.error(MessageFormat.format(
258                                 "Failed to unregister event ''{0}''! Removal from Esper configuration failed.",
259                                 eventName));
260                 } else
261                     LOGGER.error(MessageFormat.format(
262                             "Failed to unregister event ''{0}''! Event is not registered.",
263                             eventName));
264             } catch (final ConfigurationException ex) {
265                 LOGGER.error(MessageFormat.format(
266                         "Failed to unregister event ''{0}''!", eventName), ex);
267             }
268         } finally {
269             this.eventsLock.writeLock().unlock();
270         }
271 
272         return result;
273     }
274 
275     /**
276      * Unregister all context listeners and all context event types.
277      */
278     public void shutdown() {
279         LOGGER.debug("Shutting down.");
280 
281         this.notifiersLock.writeLock().lock();
282         this.eventsLock.writeLock().lock();
283         try {
284             this.shutdown = true;
285             destroyAllNotifiers();
286             unregisterAllEventTypes();
287         } finally {
288             this.eventsLock.writeLock().unlock();
289             this.notifiersLock.writeLock().unlock();
290         }
291 
292         LOGGER.info("Shutdown!");
293     }
294 
295     private void destroyAllNotifiers() {
296         LOGGER.debug("Unregistering all context listeners.");
297 
298         this.notifiersLock.writeLock().lock();
299         try {
300             for (final EventNotifier notifier : this.notifiers.values())
301                 notifier.destroy();
302 
303             this.administrator.stopAllStatements();
304             this.administrator.destroyAllStatements();
305             this.notifiers.clear();
306         } finally {
307             this.notifiersLock.writeLock().unlock();
308         }
309 
310         LOGGER.info("All context listeners unregistered!");
311     }
312 
313     private void unregisterAllEventTypes() {
314         LOGGER.debug("Unregistering all context events.");
315 
316         this.eventsLock.writeLock().lock();
317         try {
318             for (final String eventName : this.events.keySet())
319                 unregister(eventName);
320 
321             this.events.clear();
322         } finally {
323             this.eventsLock.writeLock().unlock();
324         }
325 
326         LOGGER.info("All context events unregistered!");
327     }
328 }