View Javadoc

1   /* This file is part of COPAL (COntext Provisioning for All).
2    *
3    * COPAL is a part of SM4All (Smart hoMes for All) project.
4    *
5    * COPAL is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * COPAL is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public License
16   * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
17   */
18  package at.ac.tuwien.infosys.sm4all.copal.api.query;
19  
20  import java.text.MessageFormat;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashMap;
24  import java.util.HashSet;
25  import java.util.LinkedList;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.CopyOnWriteArraySet;
30  import java.util.concurrent.locks.ReadWriteLock;
31  import java.util.concurrent.locks.ReentrantReadWriteLock;
32  import org.apache.log4j.Level;
33  import org.apache.log4j.Logger;
34  import org.osgi.framework.BundleContext;
35  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEvent;
36  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEventType;
37  import at.ac.tuwien.infosys.sm4all.copal.api.event.CurrentAction;
38  import at.ac.tuwien.infosys.sm4all.copal.api.event.XMLContextEvent;
39  import at.ac.tuwien.infosys.sm4all.copal.api.osgi.GenericActivator;
40  import at.ac.tuwien.infosys.sm4all.copal.api.osgi.Tracker;
41  import at.ac.tuwien.infosys.sm4all.copal.api.processor.ContextProcessor;
42  import at.ac.tuwien.infosys.sm4all.copal.api.processor.ProcessorAction;
43  import at.ac.tuwien.infosys.sm4all.copal.api.service.ContextEventTypeRegistry;
44  import at.ac.tuwien.infosys.sm4all.copal.api.service.DeadLetterChannel;
45  import at.ac.tuwien.infosys.sm4all.copal.api.service.FailedPublishingException;
46  import at.ac.tuwien.infosys.sm4all.copal.api.service.PublishingService;
47  import at.ac.tuwien.infosys.sm4all.copal.api.util.AlreadyRegisteredException;
48  import at.ac.tuwien.infosys.sm4all.copal.api.util.NotRegisteredException;
49  import at.ac.tuwien.infosys.sm4all.copal.api.util.Registry;
50  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistrationState;
51  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistryObserver;
52  
53  /**
54   * This class is used with the {@link ContextProcessor}s to create
55   * {@link ContextQuery}s which listen on occurrence of {@link ContextEvent}s
56   * with a specific {@link ContextEventType} that require handling of their
57   * {@link CurrentAction}.
58   * 
59   * @author sanjin
60   */
61  public class ActionQuery extends ContextQuery implements
62          Registry<String, ContextProcessor> {
63  
64      private static final Logger LOGGER = Logger.getLogger(ActionQuery.class);
65  
66      private final ReadWriteLock lock = new ReentrantReadWriteLock();
67      private final Map<String, ProcessorInvoker> invokers = new HashMap<String, ProcessorInvoker>();
68      private final Tracker<PublishingService> pubslishingServiceTracker = new Tracker<PublishingService>(
69              PublishingService.class.getName());
70      private final Tracker<DeadLetterChannel> deadLetterChannelTracker = new Tracker<DeadLetterChannel>(
71              DeadLetterChannel.class.getName());
72      private final String actionName;
73      private BundleContext bundleContext;
74  
75      /**
76       * Create instance of {@link ContextQuery} for {@link ContextProcessor}s. It
77       * will catch only {@link ContextEvent}s that have name of their
78       * {@link ContextEventType} equal to specified listened type and have name
79       * of their {@link CurrentAction} equal to specified action name.
80       * 
81       * @param listenedType the name of listened {@link ContextEventType}.
82       * @param actionName the name of handled {@link CurrentAction}.
83       * @throws NullPointerException if specified of listened
84       *         {@link ContextEventType} or name of handled {@link CurrentAction}
85       *         is <code>null</code>.
86       * @throws IllegalArgumentException if specified name of listened
87       *         {@link ContextEventType} or name of handled {@link CurrentAction}
88       *         is an empty or blank string.
89       */
90      public ActionQuery(final String listenedType, final String actionName) {
91          super(MessageFormat.format("{0}.{1}", listenedType, actionName),
92                  listenedType, MessageFormat.format(
93                          "`{0}` = \"{1}\" and `{2}` = \"{3}\"",
94                          XMLContextEvent.Property.Type, listenedType,
95                          XMLContextEvent.Property.CurrentAction, actionName));
96  
97          if (null == actionName) {
98              throw new NullPointerException("Action name cannot be null.");
99          }
100         if (actionName.trim().isEmpty()) {
101             throw new IllegalArgumentException(
102                     "Action name cannot be an empty or blank string.");
103         }
104 
105         this.actionName = actionName;
106     }
107 
108     /**
109      * Returns the name of handled {@link CurrentAction}.
110      * 
111      * @return the name of handled {@link CurrentAction}.
112      */
113     public String getActionName() {
114         return this.actionName;
115     }
116 
117     @Override
118     public void start(final BundleContext context) {
119         this.lock.readLock().lock();
120         try {
121             this.bundleContext = context;
122 
123             this.pubslishingServiceTracker.start(this.bundleContext);
124             this.deadLetterChannelTracker.start(this.bundleContext);
125 
126             for (final ProcessorInvoker invoker : this.invokers.values()) {
127                 invoker.start(this.bundleContext);
128             }
129         } finally {
130             this.lock.readLock().unlock();
131         }
132     }
133 
134     @Override
135     public void stop(final BundleContext context) {
136         this.lock.readLock().lock();
137         try {
138             for (final ProcessorInvoker invoker : this.invokers.values()) {
139                 invoker.stop(this.bundleContext);
140             }
141 
142             this.deadLetterChannelTracker.stop(this.bundleContext);
143             this.pubslishingServiceTracker.stop(this.bundleContext);
144 
145             this.bundleContext = null;
146         } finally {
147             this.lock.readLock().unlock();
148         }
149     }
150 
151     @Override
152     public void destroy() throws QueryDestroyedException {
153         super.destroy();
154 
155         unregisterAll();
156     }
157 
158     @Override
159     public void onEvent(final ContextEvent event)
160             throws QueryDestroyedException {
161         if (null == event) {
162             throw new NullPointerException("Event cannot be null.");
163         }
164 
165         this.lock.readLock().lock();
166         try {
167             if (isDestroyed()) {
168                 throw new QueryDestroyedException(this);
169             }
170 
171             ContextEvent result = event;
172             for (final ProcessorInvoker invoker : this.invokers.values()) {
173                 result = invoker.onEvent(result);
174             }
175 
176             final CurrentAction currentAction = result.getCurrentAction();
177             if (currentAction.isRequired() && (!currentAction.isProcessed())) {
178                 if (LOGGER.isEnabledFor(Level.ERROR)) {
179                     LOGGER.error(MessageFormat.format(
180                             "Failed processing of required action ''{0}'' on event ''{1}''! Not processed.",
181                             this.actionName, getListenedType()));
182                 }
183                 final DeadLetterChannel deadLetterChannel = this.deadLetterChannelTracker.getService();
184                 if (null != deadLetterChannel) {
185                     deadLetterChannel.onEvent(result);
186                 }
187             } else {
188                 final PublishingService publishingService = this.pubslishingServiceTracker.getService();
189                 if (null == publishingService) {
190                     if (LOGGER.isEnabledFor(Level.ERROR)) {
191                         LOGGER.error(MessageFormat.format(
192                                 "Failed to publish processed event ''{0}'' for action ''{1}''! No publishing service.",
193                                 getListenedType(), this.actionName));
194                     }
195                 } else {
196                     try {
197                         publishingService.publish(result);
198                     } catch (final FailedPublishingException ex) {
199                         if (LOGGER.isEnabledFor(Level.ERROR)) {
200                             LOGGER.error(
201                                     MessageFormat.format(
202                                             "Failed to publish processed event ''{0}'' for action ''{1}''!",
203                                             getListenedType(), this.actionName),
204                                     ex);
205                         }
206                     }
207                 }
208             }
209         } finally {
210             this.lock.readLock().unlock();
211         }
212     }
213 
214     /**
215      * Register specified {@link ContextProcessor} with this {@link ActionQuery}
216      * . If {@link ContextEvent} is caught, which requires the
217      * {@link ProcessorAction}, the specified {@link ContextProcessor} will be
218      * invoked.
219      * 
220      * @param processor the {@link ContextProcessor}.
221      * @throws AlreadyRegisteredException if a {@link ContextProcessor} with
222      *         same name is already registered.
223      * @throws NullPointerException if specified {@link ContextProcessor} is
224      *         <code>null</code>.
225      * @throws IllegalArgumentException if specified {@link ContextProcessor}
226      *         cannot handle the {@link CurrentAction} on the listened
227      *         {@link ContextEventType}
228      * @throws QueryDestroyedException if this {@link ActionQuery} has been
229      *         previously destroyed.
230      */
231     @Override
232     public void register(final ContextProcessor processor)
233             throws AlreadyRegisteredException, QueryDestroyedException {
234         if (null == processor) {
235             throw new NullPointerException("Processor cannot be null.");
236         }
237 
238         final String processorName = processor.getName();
239         final String listenedType = getListenedType();
240 
241         final Set<ProcessorAction> actions = new HashSet<ProcessorAction>();
242         for (final ProcessorAction action : processor.getActions()) {
243             if (listenedType.equals(action.getInput())
244                     && this.actionName.equals(action.getName())) {
245                 actions.add(action);
246             }
247         }
248 
249         if (actions.isEmpty()) {
250             throw new IllegalArgumentException(
251                     MessageFormat.format(
252                             "Processor ''{0}'' cannot handle action ''{1}'' on events ''{2}''",
253                             processorName, this.actionName, listenedType));
254         }
255 
256         this.lock.writeLock().lock();
257         try {
258             if (isDestroyed()) {
259                 throw new QueryDestroyedException(this);
260             }
261 
262             if (this.invokers.containsKey(processorName)) {
263                 throw new AlreadyRegisteredException(processorName);
264             }
265 
266             final ProcessorInvoker invoker = new ProcessorInvoker(processor,
267                     actions.toArray(new ProcessorAction[actions.size()]));
268             this.invokers.put(processorName, invoker);
269             if (null != this.bundleContext) {
270                 invoker.start(this.bundleContext);
271             }
272 
273             if (LOGGER.isInfoEnabled()) {
274                 LOGGER.info(MessageFormat.format(
275                         "Successfully registered processor ''{0}'' with query ''{1}''.",
276                         processorName, getName()));
277             }
278         } finally {
279             this.lock.writeLock().unlock();
280         }
281     }
282 
283     /**
284      * Unregister {@link ContextProcessor} with specified name from this
285      * {@link ActionQuery}. The {@link ContextProcessor} will not receive any
286      * further {@link ContextEvent}s from this {@link ActionQuery}.
287      * 
288      * @param processorName the name of the {@link ContextProcessor}.
289      * @throws NotRegisteredException if a {@link ContextProcessor} with
290      *         specified name is not registered.
291      * @throws NullPointerException if specified name of the
292      *         {@link ContextProcessor} is <code>null</code>.
293      */
294     @Override
295     public void unregister(final String processorName)
296             throws NotRegisteredException {
297         if (null == processorName) {
298             throw new NullPointerException("Processor name cannot be null.");
299         }
300 
301         this.lock.writeLock().lock();
302         try {
303             if (!this.invokers.containsKey(processorName)) {
304                 throw new NotRegisteredException(processorName);
305             }
306 
307             if (!isDestroyed()) {
308                 final ProcessorInvoker invoker = this.invokers.remove(processorName);
309                 if (null != this.bundleContext) {
310                     invoker.stop(this.bundleContext);
311                 }
312 
313                 if (LOGGER.isInfoEnabled()) {
314                     LOGGER.info(MessageFormat.format(
315                             "Successfully unregistered processor ''{0}'' from query ''{1}''.",
316                             processorName, getName()));
317                 }
318             }
319         } finally {
320             this.lock.writeLock().unlock();
321         }
322     }
323 
324     /**
325      * Returns if a {@link ContextProcessor} with specified name is currently
326      * registered.
327      * 
328      * @param processorName the name of the {@link ContextProcessor}.
329      * @return if a {@link ContextProcessor} with specified name is currently
330      *         registered.
331      * @throws NullPointerException if specified name of
332      *         {@link ContextProcessor} is <code>null</code>.
333      */
334     @Override
335     public boolean isRegistered(final String processorName) {
336         if (null == processorName) {
337             throw new NullPointerException("Processor name cannot be null.");
338         }
339 
340         final boolean result;
341 
342         this.lock.readLock().lock();
343         try {
344             result = this.invokers.containsKey(processorName);
345         } finally {
346             this.lock.readLock().unlock();
347         }
348 
349         return result;
350     }
351 
352     /**
353      * Returns {@link ContextProcessor} with specified name or <code>null</code>
354      * if there is no such {@link ContextProcessor} registered.
355      * 
356      * @param processorName the name of the {@link ContextProcessor}.
357      * @return the {@link ContextProcessor} or <code>null</code> if there is no
358      *         such {@link ContextProcessor} registered.
359      * @throws NullPointerException if specified name of
360      *         {@link ContextProcessor} is <code>null</code>.
361      */
362     @Override
363     public ContextProcessor get(final String processorName) {
364         if (null == processorName) {
365             throw new NullPointerException("Processor name cannot be null.");
366         }
367 
368         ContextProcessor result = null;
369 
370         this.lock.readLock().lock();
371         try {
372             if (this.invokers.containsKey(processorName)) {
373                 result = this.invokers.get(processorName).getProcessor();
374             }
375         } finally {
376             this.lock.readLock().unlock();
377         }
378 
379         return result;
380     }
381 
382     /**
383      * Returns all currently registered {@link ContextProcessor}s.
384      * 
385      * @return all currently registered {@link ContextProcessor}s.
386      */
387     @Override
388     public ContextProcessor[] getAll() {
389         final List<ContextProcessor> result = new LinkedList<ContextProcessor>();
390 
391         this.lock.readLock().lock();
392         try {
393             for (final ProcessorInvoker invoker : this.invokers.values()) {
394                 result.add(invoker.getProcessor());
395             }
396         } finally {
397             this.lock.readLock().unlock();
398         }
399 
400         return result.toArray(new ContextProcessor[result.size()]);
401     }
402 
403     private void unregisterAll() {
404         if (LOGGER.isDebugEnabled()) {
405             LOGGER.debug(MessageFormat.format(
406                     "Unregistering all processors from query ''{0}''.",
407                     getName()));
408         }
409 
410         this.lock.writeLock().lock();
411         try {
412             for (final String processorName : this.invokers.keySet()) {
413                 try {
414                     unregister(processorName);
415                 } catch (final NotRegisteredException ex) {
416                     if (LOGGER.isEnabledFor(Level.WARN)) {
417                         LOGGER.warn(
418                                 MessageFormat.format(
419                                         "Failed to unregister processor ''{0}'' from query ''{1}''! Ignoring.",
420                                         processorName, getName()), ex);
421                     }
422                 }
423             }
424 
425             this.invokers.clear();
426         } finally {
427             this.lock.writeLock().unlock();
428         }
429 
430         if (LOGGER.isInfoEnabled()) {
431             LOGGER.info(MessageFormat.format(
432                     "All processors from query ''{0}'' unregistered!",
433                     getName()));
434         }
435     }
436 
437     private static class ProcessorInvoker extends GenericActivator implements
438             RegistryObserver<ContextEventType> {
439 
440         private final ReadWriteLock lock = new ReentrantReadWriteLock();
441         private final Set<String> canPublishTypes = new HashSet<String>();
442         private final Set<ProcessorAction> processableActions = new CopyOnWriteArraySet<ProcessorAction>();
443         private final ContextProcessor processor;
444         private final ProcessorAction[] actions;
445         private final Set<String> publishedTypes;
446         private PublishingService publishingService;
447         private ContextEventTypeRegistry registry;
448 
449         /**
450          * Create {@link ProcessorInvoker} of specified {@link ContextProcessor}
451          * for specified {@link ProcessorAction}s.
452          * 
453          * @param processor the {@link ContextProcessor}.
454          * @param actions the {@link ProcessorAction}s.
455          */
456         protected ProcessorInvoker(final ContextProcessor processor,
457                 final ProcessorAction... actions) {
458             super(hasAnyOutput(actions) ? new String[]{
459                     PublishingService.class.getName(),
460                     ContextEventTypeRegistry.class.getName() } : new String[]{});
461 
462             this.processor = processor;
463             this.actions = Arrays.copyOf(actions, actions.length);
464 
465             final Set<String> outputs = new HashSet<String>();
466             for (final ProcessorAction action : this.actions) {
467                 if (action.hasOutput()) {
468                     outputs.addAll(Arrays.asList(action.getOutput()));
469                 }
470             }
471             this.publishedTypes = Collections.unmodifiableSet(outputs);
472         }
473 
474         /**
475          * Returns the {@link ContextProcessor}.
476          * 
477          * @return the {@link ContextProcessor}.
478          */
479         public ContextProcessor getProcessor() {
480             return this.processor;
481         }
482 
483         @Override
484         protected void start() {
485             this.lock.writeLock().lock();
486             try {
487                 if (!this.publishedTypes.isEmpty()) {
488                     this.publishingService = getDependency(PublishingService.class.getName());
489                     this.registry = getDependency(ContextEventTypeRegistry.class.getName());
490 
491                     this.registry.attach(this);
492                 }
493 
494                 for (final ProcessorAction action : this.actions) {
495                     if (!action.hasOutput()) {
496                         this.processableActions.add(action);
497                     }
498                 }
499             } finally {
500                 this.lock.writeLock().unlock();
501             }
502         }
503 
504         @Override
505         protected void stop() {
506             this.lock.writeLock().lock();
507             try {
508                 if (!this.publishedTypes.isEmpty()) {
509                     this.registry.detach(this);
510 
511                     this.registry = null;
512                     this.publishingService = null;
513 
514                     this.canPublishTypes.clear();
515                 }
516 
517                 this.processableActions.clear();
518             } finally {
519                 this.lock.writeLock().unlock();
520             }
521         }
522 
523         @Override
524         public void update(final RegistrationState state,
525                 final ContextEventType eventType) {
526             final String eventName = eventType.getName();
527             if (this.publishedTypes.contains(eventName)) {
528                 switch (state) {
529                 case Registered:
530                     this.canPublishTypes.add(eventName);
531                     for (final ProcessorAction action : this.actions) {
532                         if (this.canPublishTypes.containsAll(Arrays.asList(action.getOutput()))) {
533                             this.processableActions.add(action);
534                         }
535                     }
536                     break;
537                 case Unregistered:
538                     this.canPublishTypes.remove(eventName);
539                     for (final ProcessorAction action : this.actions) {
540                         if (!this.canPublishTypes.containsAll(Arrays.asList(action.getOutput()))) {
541                             this.processableActions.remove(action);
542                         }
543                     }
544                     break;
545                 default:
546                     break;
547                 }
548             }
549         }
550 
551         /**
552          * Invoke the {@link ContextProcessor} with specified input
553          * {@link ContextEvent}.
554          * 
555          * @param input the {@link ContextEvent}.
556          * @return the input {@link ContextEvent} (possibly modified by the
557          *         {@link ContextProcessor}).
558          */
559         @SuppressWarnings("synthetic-access")
560         public ContextEvent onEvent(final ContextEvent input) {
561             ContextEvent result = input;
562             final String processorName = this.processor.getName();
563             final String inputType = input.getType().getName();
564 
565             this.lock.readLock().lock();
566             try {
567                 for (final ProcessorAction action : this.processableActions) {
568                     final String actionName = action.getName();
569                     try {
570                         final Set<ContextEvent> output = new HashSet<ContextEvent>(
571                                 Arrays.asList(invoke(action, result)));
572 
573                         if (output.size() > 0) {
574                             for (final ContextEvent event : output) {
575                                 if (input.equals(event)) {
576                                     result = event;
577                                     break;
578                                 }
579                             }
580                             if (result != input) {
581                                 output.remove(result);
582                             }
583 
584                             this.publishingService.publish(output.toArray(new ContextEvent[output.size()]));
585                         }
586 
587                         if (LOGGER.isInfoEnabled()) {
588                             LOGGER.info(MessageFormat.format(
589                                     "Succesfully processed action ''{0}'' for event ''{1}'' in processor ''{2}''!",
590                                     actionName, inputType, processorName));
591                         }
592                     } catch (final FailedPublishingException ex) {
593                         if (LOGGER.isEnabledFor(Level.ERROR)) {
594                             LOGGER.error(
595                                     MessageFormat.format(
596                                             "Failed processing of action ''{0}'' on event ''{1}'' in processor ''{2}''!",
597                                             actionName, inputType,
598                                             processorName), ex);
599                         }
600                     }
601                 }
602 
603                 if (!this.processableActions.isEmpty()) {
604                     result.getCurrentAction().processedBy(processorName);
605                 }
606             } finally {
607                 this.lock.readLock().unlock();
608             }
609 
610             return result;
611         }
612 
613         @SuppressWarnings("synthetic-access")
614         private ContextEvent[] invoke(final ProcessorAction action,
615                 final ContextEvent input) throws FailedPublishingException {
616             final String actionName = action.getName();
617             final String inputType = input.getType().getName();
618             final String processorName = this.processor.getName();
619             final ContextEvent[] result = this.processor.process(action, input);
620 
621             if ((null != result) && (result.length > 0)) {
622                 if (action.hasOutput()) {
623                     final Set<String> output = new HashSet<String>(
624                             Arrays.asList(action.getOutput()));
625                     for (final ContextEvent event : result) {
626                         final String eventType = event.getType().getName();
627                         if (!output.contains(eventType)) {
628                             if (LOGGER.isEnabledFor(Level.ERROR)) {
629                                 LOGGER.error(MessageFormat.format(
630                                         "Event ''{0}'' cannot be result of action ''{1}'' on event ''{2}'' in processor ''{3}''!",
631                                         eventType, actionName, inputType,
632                                         processorName));
633                             }
634                             throw new FailedPublishingException(
635                                     "Event cannot be result of action.", event);
636                         }
637                     }
638                 } else {
639                     if (LOGGER.isEnabledFor(Level.ERROR)) {
640                         LOGGER.error(MessageFormat.format(
641                                 "Action ''{1}'' on event ''{2}'' in processor ''{3}'' should not have output!",
642                                 actionName, inputType, processorName));
643                     }
644                     throw new FailedPublishingException(
645                             "Action should not have output.", result[0]);
646                 }
647             }
648 
649             return result;
650         }
651 
652         private static boolean hasAnyOutput(final ProcessorAction[] actions) {
653             boolean result = false;
654 
655             for (final ProcessorAction action : actions) {
656                 if (action.hasOutput()) {
657                     result = true;
658                     break;
659                 }
660             }
661 
662             return result;
663         }
664     }
665 }