View Javadoc

1   /* This file is part of COPAL (COntext Provisioning for All).
2    *
3    * COPAL is a part of SM4All (Smart hoMes for All) project.
4    *
5    * COPAL is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * COPAL is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public License
16   * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
17   */
18  package at.ac.tuwien.infosys.sm4all.copal.esper;
19  
20  import java.text.MessageFormat;
21  import org.apache.log4j.Level;
22  import org.apache.log4j.Logger;
23  import org.w3c.dom.Node;
24  import at.ac.tuwien.infosys.sm4all.copal.api.event.MalformedDocumentException;
25  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEvent;
26  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEventType;
27  import at.ac.tuwien.infosys.sm4all.copal.api.query.ActionQuery;
28  import at.ac.tuwien.infosys.sm4all.copal.api.query.ContextQuery;
29  import at.ac.tuwien.infosys.sm4all.copal.api.query.ProcessedEventQuery;
30  import at.ac.tuwien.infosys.sm4all.copal.api.query.QueryDestroyedException;
31  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistrationState;
32  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistryObserver;
33  import com.espertech.esper.client.EPAdministrator;
34  import com.espertech.esper.client.EPStatement;
35  
36  /**
37   * Esper subscriber which helps transform Esper events to
38   * {@link XMLContextEvent}s that are understood by {@link ContextQuery}. This
39   * class is bridge between Esper and COPAL.
40   * 
41   * @author fei
42   * @author sanjin
43   */
44  public class XMLEventNotifier implements RegistryObserver<XMLContextEventType> {
45  
46      private static final String PROCESSED_CRITERIA = MessageFormat.format(
47              "`{0}`", XMLContextEvent.Property.Processed);
48      private static final Logger LOGGER = Logger.getLogger(XMLEventNotifier.class);
49  
50      private final EPAdministrator administrator;
51      private final ContextQuery query;
52      private XMLContextEventType listenedType;
53      private EPStatement statement;
54  
55      /**
56       * Creates instance of Esper subscriber using specified
57       * {@link EPAdministrator} to create {@link EPStatement} from specified
58       * {@link ContextQuery}.
59       * 
60       * @param administrator the {@link EPAdministrator}.
61       * @param query the context query.
62       */
63      public XMLEventNotifier(final EPAdministrator administrator,
64              final ContextQuery query) {
65          super();
66  
67          this.administrator = administrator;
68          this.query = query;
69      }
70  
71      @Override
72      public void update(final RegistrationState state,
73              final XMLContextEventType eventType) {
74          if (this.query.getListenedType().equals(eventType.getName())) {
75              switch (state) {
76              case Registered:
77                  onRegister(eventType);
78                  break;
79              case Unregistered:
80                  onUnregister();
81                  break;
82              default:
83                  break;
84              }
85          }
86      }
87  
88      /**
89       * Called when Esper has new event that is represented by specified XML
90       * node. This method in return calls specified {@link ContextQuery}.
91       * 
92       * @param node the event.
93       */
94      public void update(final Node node) {
95          try {
96              this.query.onEvent(new XMLContextEvent(this.listenedType,
97                      node.getOwnerDocument()));
98          } catch (final QueryDestroyedException ex) {
99              if (LOGGER.isEnabledFor(Level.ERROR)) {
100                 LOGGER.error(
101                         "Query is unexpectedly destroyed! Destroying query's notifier.",
102                         ex);
103             }
104             if (null != this.statement) {
105                 this.statement.destroy();
106                 this.statement = null;
107             }
108         } catch (final MalformedDocumentException ex) {
109             if (LOGGER.isEnabledFor(Level.ERROR)) {
110                 LOGGER.error("Received XML event is malformed! Dropping it.",
111                         ex);
112             }
113         }
114     }
115 
116     private void onRegister(final XMLContextEventType eventType) {
117         if (LOGGER.isDebugEnabled()) {
118             LOGGER.debug(MessageFormat.format("Starting notifier ''{0}''.",
119                     this.query.getName()));
120         }
121 
122         this.listenedType = eventType;
123         this.statement = createStatement();
124         this.statement.setSubscriber(this);
125 
126         if (LOGGER.isInfoEnabled()) {
127             LOGGER.info(MessageFormat.format(
128                     "Successfully started notifier ''{0}''!",
129                     this.query.getName()));
130         }
131     }
132 
133     private void onUnregister() {
134         if (LOGGER.isDebugEnabled()) {
135             LOGGER.debug(MessageFormat.format("Stopping notifier ''{0}''.",
136                     this.query.getName()));
137         }
138 
139         if (null != this.listenedType) {
140             if (null != this.statement) {
141                 this.statement.destroy();
142                 this.statement = null;
143             }
144             this.listenedType = null;
145         }
146 
147         if (LOGGER.isInfoEnabled()) {
148             LOGGER.info(MessageFormat.format(
149                     "Successfully stopped notifier ''{0}''!",
150                     this.query.getName()));
151         }
152     }
153 
154     private EPStatement createStatement() {
155         final String name = this.query.getName();
156 
157         final String expression;
158         if (this.query instanceof ActionQuery) {
159             expression = getExpression((ActionQuery) this.query);
160         } else if (this.query instanceof ProcessedEventQuery) {
161             expression = getExpression((ProcessedEventQuery) this.query);
162         } else {
163             expression = getExpression(this.query);
164         }
165         final EPStatement result = this.administrator.createEPL(expression,
166                 name);
167 
168         if (LOGGER.isDebugEnabled()) {
169             LOGGER.debug(MessageFormat.format(
170                     "Esper statement created! name={0} expression={1}", name,
171                     expression));
172         }
173 
174         return result;
175     }
176 
177     private static String getExpression(final ActionQuery query) {
178         return MessageFormat.format("select * from {0}({1})",
179                 query.getListenedType(), query.getCriteria());
180     }
181 
182     private static String getExpression(final ProcessedEventQuery query) {
183         final String result;
184 
185         if (query.hasCriteria()) {
186             result = MessageFormat.format("select * from {0}({1}) where {2}",
187                     query.getListenedType(), PROCESSED_CRITERIA,
188                     query.getCriteria());
189         } else {
190             result = MessageFormat.format("select * from {0}({1})",
191                     query.getListenedType(), PROCESSED_CRITERIA);
192         }
193 
194         return result;
195     }
196 
197     private static String getExpression(final ContextQuery query) {
198         String result;
199 
200         if (query.hasCriteria()) {
201             result = MessageFormat.format("select * from {0}({1})",
202                     query.getListenedType(), query.getCriteria());
203         } else {
204             result = MessageFormat.format("select * from {0}",
205                     query.getListenedType());
206         }
207 
208         return result;
209     }
210 }