View Javadoc

1   /* This file is part of COPAL (COntext Provisioning for All).
2    *
3    * COPAL is a part of SM4All (Smart hoMes for All) project.
4    *
5    * COPAL is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * COPAL is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public License
16   * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
17   */
18  package at.ac.tuwien.infosys.sm4all.copal.api.publisher;
19  
20  import java.text.MessageFormat;
21  import java.util.Arrays;
22  import java.util.Collections;
23  import java.util.HashSet;
24  import java.util.LinkedList;
25  import java.util.List;
26  import java.util.Set;
27  import java.util.concurrent.CopyOnWriteArraySet;
28  import java.util.concurrent.locks.ReadWriteLock;
29  import java.util.concurrent.locks.ReentrantReadWriteLock;
30  import org.apache.log4j.Level;
31  import org.apache.log4j.Logger;
32  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEvent;
33  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEventType;
34  import at.ac.tuwien.infosys.sm4all.copal.api.event.UnprocessedAction;
35  import at.ac.tuwien.infosys.sm4all.copal.api.osgi.GenericActivator;
36  import at.ac.tuwien.infosys.sm4all.copal.api.security.Authorization;
37  import at.ac.tuwien.infosys.sm4all.copal.api.security.AuthorizationMethod;
38  import at.ac.tuwien.infosys.sm4all.copal.api.security.Authorizations;
39  import at.ac.tuwien.infosys.sm4all.copal.api.service.ContextEventTypeRegistry;
40  import at.ac.tuwien.infosys.sm4all.copal.api.service.FailedPublishingException;
41  import at.ac.tuwien.infosys.sm4all.copal.api.service.PublishingService;
42  import at.ac.tuwien.infosys.sm4all.copal.api.util.Attribute;
43  import at.ac.tuwien.infosys.sm4all.copal.api.util.Attributes;
44  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistrationState;
45  import at.ac.tuwien.infosys.sm4all.copal.api.util.RegistryObservable.RegistryObserver;
46  
47  /**
48   * Base {@link ContextPublisher} that correctly activates itself with COPAL and
49   * is able to publish {@link ContextEvent}s.
50   * 
51   * @author sanjin
52   */
53  public abstract class BasePublisher extends GenericActivator implements
54          ContextPublisher, RegistryObserver<ContextEventType> {
55  
56      private static final Logger LOGGER = Logger.getLogger(BasePublisher.class);
57  
58      private final ReadWriteLock lock = new ReentrantReadWriteLock();
59      private final ReadWriteLock defaultsLock = new ReentrantReadWriteLock();
60      private final Set<String> publishableTypes = new CopyOnWriteArraySet<String>();
61      private final Attributes attributes = new Attributes();
62      private final Authorizations authorizations = new Authorizations();
63      private final List<UnprocessedAction> actions = new LinkedList<UnprocessedAction>();
64      private final String sourceID;
65      private final Set<String> publishedTypes;
66      private PublishingService publishingService;
67      private ContextEventTypeRegistry eventTypeRegistry;
68      private Long ttl;
69      private Integer priority;
70  
71      /**
72       * Creates an instance of base processor with specified source ID and
73       * published types as return values for {@link #getSourceID()} and
74       * {@link #getPublishedTypes()} methods respectively.
75       * 
76       * @param sourceID the source ID of this {@link ContextPublisher}.
77       * @param publishedTypes the names of published {@link ContextEventType}s.
78       * @throws NullPointerException if specified source ID or published types
79       *         array is <code>null</code>.
80       * @throws IllegalArgumentException if specified source ID is an empty or
81       *         blank string or published types array is empty.
82       */
83      protected BasePublisher(final String sourceID,
84              final String... publishedTypes) {
85          super(PublishingService.class.getName(),
86                  ContextEventTypeRegistry.class.getName());
87  
88          if (null == sourceID) {
89              throw new NullPointerException("Source ID cannot be null.");
90          }
91          if (sourceID.trim().isEmpty()) {
92              throw new IllegalArgumentException(
93                      "Source ID cannot be an empty blank string.");
94          }
95          if (null == publishedTypes) {
96              throw new NullPointerException("Published types cannot be null.");
97          }
98          if (0 == publishedTypes.length) {
99              throw new IllegalArgumentException(
100                     "There must be at least one published type.");
101         }
102 
103         this.sourceID = sourceID;
104         this.publishedTypes = Collections.unmodifiableSet(new HashSet<String>(
105                 Arrays.asList(publishedTypes)));
106     }
107 
108     /**
109      * This method is called when {@link PublishingService} becomes available
110      * for publishing @link ContextEvent}s of the specified
111      * {@link ContextEventType}. This method should be implemented by specific
112      * publishers and is meant as a notification that publisher can start
113      * publishing {@link ContextEvent}s of this {@link ContextEventType}.
114      * 
115      * @param type the {@link ContextEventType} of published events.
116      * @return if this {@link BasePublisher} has been successfully started for
117      *         specified {@link ContextEventType}.
118      */
119     protected abstract boolean start(ContextEventType type);
120 
121     /**
122      * This method is called when {@link PublishingService} becomes unavailable
123      * for publishing {@link ContextEvent}s of the specified
124      * {@link ContextEventType}. This method should be implemented by specific
125      * publishers and is meant as a notification that publisher should stop
126      * publishing {@link ContextEvent}s of this {@link ContextEventType}.
127      * 
128      * @param type the {@link ContextEventType} of published
129      *        {@link ContextEvent}s.
130      */
131     protected abstract void stop(ContextEventType type);
132 
133     @Override
134     public String getSourceID() {
135         return this.sourceID;
136     }
137 
138     @Override
139     public String[] getPublishedTypes() {
140         return this.publishedTypes.toArray(new String[this.publishedTypes.size()]);
141     }
142 
143     /**
144      * Returns if this {@link BasePublisher} is started and can publish
145      * {@link ContextEvent}s of specified type.
146      * 
147      * @param eventType the type of {@link ContextEvent}s.
148      * @return if this {@link BasePublisher} is started and can publish
149      *         {@link ContextEvent}s.
150      */
151     protected boolean isStarted(final String eventType) {
152         return this.publishableTypes.contains(eventType);
153     }
154 
155     @Override
156     protected void start() {
157         this.lock.writeLock().lock();
158         try {
159             this.publishingService = getDependency(PublishingService.class.getName());
160             this.eventTypeRegistry = getDependency(ContextEventTypeRegistry.class.getName());
161 
162             if (null != this.eventTypeRegistry) {
163                 this.eventTypeRegistry.attach(this);
164             }
165         } finally {
166             this.lock.writeLock().unlock();
167         }
168     }
169 
170     @Override
171     protected void stop() {
172         this.lock.writeLock().lock();
173         try {
174             if (null != this.eventTypeRegistry) {
175                 this.eventTypeRegistry.detach(this);
176             }
177 
178             this.eventTypeRegistry = null;
179             this.publishingService = null;
180 
181             this.publishableTypes.clear();
182         } finally {
183             this.lock.writeLock().unlock();
184         }
185     }
186 
187     @Override
188     public void update(final RegistrationState state,
189             final ContextEventType eventType) {
190         if (this.publishedTypes.contains(eventType.getName())) {
191             switch (state) {
192             case Registered:
193                 onRegister(eventType);
194                 break;
195             case Unregistered:
196                 onUnregister(eventType);
197                 break;
198             default:
199                 break;
200             }
201         }
202     }
203 
204     /**
205      * Sets the default time-to-live used for all {@link ContextEvent}s
206      * published by this {@link BasePublisher}.
207      * 
208      * @param ttl the default time-to-live.
209      */
210     protected void setTTL(final long ttl) {
211         this.defaultsLock.writeLock().lock();
212         try {
213             this.ttl = ttl;
214         } finally {
215             this.defaultsLock.writeLock().unlock();
216         }
217     }
218 
219     /**
220      * Unsets the default time-to-live.
221      * 
222      * @see #setTTL(long)
223      */
224     protected void unsetTTL() {
225         this.defaultsLock.writeLock().lock();
226         try {
227             this.ttl = null;
228         } finally {
229             this.defaultsLock.writeLock().unlock();
230         }
231     }
232 
233     /**
234      * Sets the default priority used for all {@link ContextEvent}s published by
235      * this {@link BasePublisher}.
236      * 
237      * @param priority the default priority.
238      */
239     protected void setPriority(final int priority) {
240         this.defaultsLock.writeLock().lock();
241         try {
242             this.priority = priority;
243         } finally {
244             this.defaultsLock.writeLock().unlock();
245         }
246     }
247 
248     /**
249      * Unsets the default priority.
250      * 
251      * @see #setPriority(int)
252      */
253     protected void unsetPriority() {
254         this.defaultsLock.writeLock().lock();
255         try {
256             this.priority = null;
257         } finally {
258             this.defaultsLock.writeLock().unlock();
259         }
260     }
261 
262     /**
263      * Sets the {@link Attribute} with specified name to specified value in all
264      * {@link ContextEvent}s published by this {@link BasePublisher}. If the
265      * value is <code>null</code> the {@link Attribute} will be removed.
266      * 
267      * @param name the name of {@link Attribute}.
268      * @param value the value of {@link Attribute}.
269      */
270     /**
271      * Add specified {@link Attribute} in all {@link ContextEvent}s published by
272      * this {@link BasePublisher}.
273      * 
274      * @param attribute the {@link Attribute}.
275      */
276     protected void add(final Attribute attribute) {
277         this.defaultsLock.writeLock().lock();
278         try {
279             this.attributes.register(attribute);
280         } finally {
281             this.defaultsLock.writeLock().unlock();
282         }
283     }
284 
285     /**
286      * Adds specified {@link Authorization} to all {@link ContextEvent}s
287      * published by this {@link BasePublisher}. If there is already an
288      * {@link Authorization} with same name of the {@link AuthorizationMethod}
289      * as specified {@link Authorization}, their attributes will be merged with
290      * attributes from specified {@link Authorization} overwriting ones in
291      * already added {@link Authorization}.
292      * 
293      * @param authorization the {@link Authorization}.
294      */
295     protected void add(final Authorization authorization) {
296         this.defaultsLock.writeLock().lock();
297         try {
298             this.authorizations.register(authorization);
299         } finally {
300             this.defaultsLock.writeLock().unlock();
301         }
302     }
303 
304     /**
305      * Removes an {@link Authorization} that has specified name of
306      * {@link AuthorizationMethod} from being added to all {@link ContextEvent}s
307      * published by this {@link BasePublisher}.
308      * 
309      * @param method the name of {@link AuthorizationMethod}.
310      */
311     protected void removeAuthorization(final String method) {
312         this.defaultsLock.writeLock().lock();
313         try {
314             this.authorizations.unregister(method);
315         } finally {
316             this.defaultsLock.writeLock().unlock();
317         }
318     }
319 
320     /**
321      * Appends the specified {@link UnprocessedAction} to all
322      * {@link ContextEvent}s published by this {@link BasePublisher}.
323      * 
324      * @param action the {@link UnprocessedAction}.
325      */
326     protected void appendAction(final UnprocessedAction action) {
327         if (null == action) {
328             throw new NullPointerException("Action cannot be null.");
329         }
330 
331         this.defaultsLock.writeLock().lock();
332         try {
333             this.actions.add(action);
334         } finally {
335             this.defaultsLock.writeLock().unlock();
336         }
337     }
338 
339     /**
340      * Publish specified {@link ContextEvent}.
341      * 
342      * @param event the {@link ContextEvent}.
343      * @throws FailedPublishingException if publishing was unsuccessful.
344      */
345     protected void publish(final ContextEvent event)
346             throws FailedPublishingException {
347         final String eventType = event.getType().getName();
348 
349         if (this.publishedTypes.contains(eventType)) {
350             this.lock.readLock().lock();
351             try {
352                 if (this.publishableTypes.contains(eventType)) {
353                     if (null == this.publishingService) {
354                         if (LOGGER.isEnabledFor(Level.ERROR)) {
355                             LOGGER.error(MessageFormat.format(
356                                     "Failed to publish event ''{0}'' from ''{1}''! No publishing service.",
357                                     eventType, this.sourceID));
358                         }
359                         throw new FailedPublishingException(
360                                 "No publishing service.", event);
361                     }
362 
363                     beforePublish(event);
364                     this.publishingService.publish(event);
365                     if (LOGGER.isInfoEnabled()) {
366                         LOGGER.info(MessageFormat.format(
367                                 "Successfully publish event from ''{0}''!",
368                                 this.sourceID));
369                     }
370                 } else {
371                     if (LOGGER.isEnabledFor(Level.ERROR)) {
372                         LOGGER.error(MessageFormat.format(
373                                 "Failed to publish event ''{0}'' from ''{1}''! Publisher was not started for this event.",
374                                 eventType, this.sourceID));
375                     }
376                     throw new FailedPublishingException(
377                             "Publisher was not started for this event.", event);
378                 }
379             } finally {
380                 this.lock.readLock().unlock();
381             }
382         } else {
383             if (LOGGER.isEnabledFor(Level.ERROR)) {
384                 LOGGER.error(MessageFormat.format(
385                         "Failed to publish event ''{0}'' from ''{1}''! Publisher does not publish this event.",
386                         eventType, this.sourceID));
387             }
388             throw new FailedPublishingException(
389                     "Publisher does not publish this event.", event);
390         }
391     }
392 
393     private void onRegister(final ContextEventType eventType) {
394         final String eventName = eventType.getName();
395         if (LOGGER.isDebugEnabled()) {
396             LOGGER.debug(MessageFormat.format(
397                     "Starting publisher ''{0}'' for ''{1}'' events.",
398                     this.sourceID, eventName));
399         }
400 
401         if (start(eventType)) {
402             this.publishableTypes.add(eventName);
403             if (LOGGER.isInfoEnabled()) {
404                 LOGGER.info(MessageFormat.format(
405                         "Successfully started publisher ''{0}'' for ''{1}'' events!",
406                         this.sourceID, eventName));
407             }
408         } else {
409             if (LOGGER.isEnabledFor(Level.ERROR)) {
410                 LOGGER.error(MessageFormat.format(
411                         "Publisher ''{0}'' could not be started for ''{1}'' events!",
412                         this.sourceID, eventName));
413             }
414         }
415     }
416 
417     private void onUnregister(final ContextEventType eventType) {
418         final String eventName = eventType.getName();
419 
420         if (this.publishableTypes.contains(eventName)) {
421             if (LOGGER.isDebugEnabled()) {
422                 LOGGER.debug(MessageFormat.format(
423                         "Stopping publisher ''{0}'' for ''{1}'' events.",
424                         this.sourceID, eventName));
425             }
426 
427             stop(eventType);
428             this.publishableTypes.remove(eventName);
429 
430             if (LOGGER.isInfoEnabled()) {
431                 LOGGER.info(MessageFormat.format(
432                         "Successfully stopped publisher ''{0}''  for ''{1}'' events!",
433                         this.sourceID, eventName));
434             }
435         }
436     }
437 
438     private void beforePublish(final ContextEvent event) {
439         this.defaultsLock.readLock().lock();
440         try {
441             if (null != this.ttl) {
442                 event.setTTL(this.ttl);
443             }
444             if (null != this.priority) {
445                 event.setPriority(this.priority);
446             }
447             for (final Attribute attribute : this.attributes.getAll()) {
448                 event.add(attribute);
449             }
450             for (final Authorization authorization : this.authorizations.getAll()) {
451                 event.add(authorization);
452             }
453             event.append(this.actions.toArray(new UnprocessedAction[this.actions.size()]));
454         } finally {
455             this.defaultsLock.readLock().unlock();
456         }
457     }
458 }