View Javadoc

1   /* This file is part of COPAL (COntext Provisioning for All).
2    *
3    * COPAL is a part of SM4All (Smart hoMes for All) project.
4    *
5    * COPAL is free software: you can redistribute it and/or modify
6    * it under the terms of the GNU Lesser General Public License as published by
7    * the Free Software Foundation, either version 3 of the License, or
8    * (at your option) any later version.
9    *
10   * COPAL is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   * GNU Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public License
16   * along with COPAL. If not, see <http://www.gnu.org/licenses/>.
17   */
18  package at.ac.tuwien.infosys.sm4all.copal.core.internal;
19  
20  import java.text.MessageFormat;
21  import java.util.HashMap;
22  import java.util.Map;
23  import java.util.concurrent.atomic.AtomicBoolean;
24  import java.util.concurrent.locks.Lock;
25  import java.util.concurrent.locks.ReentrantLock;
26  import org.apache.log4j.Level;
27  import org.apache.log4j.Logger;
28  import at.ac.tuwien.infosys.sm4all.copal.api.ContextException;
29  import at.ac.tuwien.infosys.sm4all.copal.api.event.ContextEvent;
30  import at.ac.tuwien.infosys.sm4all.copal.api.event.CurrentAction;
31  import at.ac.tuwien.infosys.sm4all.copal.api.listener.ContextListener;
32  import at.ac.tuwien.infosys.sm4all.copal.api.osgi.GenericActivator;
33  import at.ac.tuwien.infosys.sm4all.copal.api.service.DeadLetterChannel;
34  import at.ac.tuwien.infosys.sm4all.copal.api.service.PublishingService;
35  import at.ac.tuwien.infosys.sm4all.copal.api.util.ConcreteObservable;
36  
37  /**
38   * Implementation of the {@link DeadLetterChannel} interface.
39   * 
40   * @author sanjin
41   */
42  public class DeadLetterChannelImpl extends GenericActivator implements
43          DeadLetterChannel {
44  
45      private static final Logger LOGGER = Logger.getLogger(DeadLetterChannel.class);
46  
47      private final Map<String, ListenerObserverProxy> proxies = new HashMap<String, ListenerObserverProxy>();
48      private final Lock lock = new ReentrantLock();
49      private final ConcreteObservable<Reason, ContextEvent, ChannelObserver> observable = new ConcreteObservable<Reason, ContextEvent, ChannelObserver>();
50      private final AtomicBoolean started = new AtomicBoolean(false);
51      private PublishingService publishingService;
52  
53      /**
54       * Creates an instance of the {@link DeadLetterChannel} implementation.
55       */
56      public DeadLetterChannelImpl() {
57          super(PublishingService.class.getName());
58      }
59  
60      @Override
61      public void attach(final ContextListener listener) {
62          final String listenerName = listener.getName();
63  
64          this.lock.lock();
65          try {
66              if (this.proxies.containsKey(listenerName)) {
67                  if (LOGGER.isEnabledFor(Level.WARN)) {
68                      LOGGER.warn(MessageFormat.format(
69                              "Listener ''{0}'' already attached! Ignoring.",
70                              listenerName));
71                  }
72              } else {
73                  final ListenerObserverProxy proxy = new ListenerObserverProxy(
74                          listener);
75                  attach(proxy);
76                  this.proxies.put(listenerName, proxy);
77              }
78          } finally {
79              this.lock.unlock();
80          }
81  
82      }
83  
84      @Override
85      public void attach(final ChannelObserver observer) {
86          this.observable.attach(observer);
87      }
88  
89      @Override
90      public void detach(final ContextListener listener) {
91          final String listenerName = listener.getName();
92  
93          this.lock.lock();
94          try {
95              if (this.proxies.containsKey(listenerName)) {
96                  detach(this.proxies.remove(listenerName));
97              } else {
98                  if (LOGGER.isEnabledFor(Level.WARN)) {
99                      LOGGER.warn(MessageFormat.format(
100                             "Listener ''{0}'' is not attached! Ignoring.",
101                             listenerName));
102                 }
103             }
104         } finally {
105             this.lock.unlock();
106         }
107     }
108 
109     @Override
110     public void detach(final ChannelObserver observer) {
111         this.observable.detach(observer);
112     }
113 
114     /**
115      * Returns if this {@link DeadLetterChannel} has been started by an OSGi
116      * framework.
117      * 
118      * @return if this {@link DeadLetterChannel} has been started by an OSGi
119      *         framework.
120      */
121     public boolean isStarted() {
122         return this.started.get();
123     }
124 
125     @Override
126     protected void start() {
127         if (!this.started.getAndSet(true)) {
128             this.publishingService = getDependency(PublishingService.class.getName());
129 
130             if (register(DeadLetterChannel.class, this)) {
131                 if (LOGGER.isInfoEnabled()) {
132                     LOGGER.info("Successfully registered DeadLetterChannel service.");
133                 }
134             } else {
135                 if (LOGGER.isEnabledFor(Level.ERROR)) {
136                     LOGGER.error("Failed to register DeadLetterChannel service!");
137                 }
138             }
139         }
140     }
141 
142     @Override
143     protected void stop() {
144         if (this.started.getAndSet(false)) {
145             if (unregister(DeadLetterChannel.class)) {
146                 if (LOGGER.isInfoEnabled()) {
147                     LOGGER.info("Successfully unregistered DeadLetterChannel service.");
148                 }
149             }
150 
151             this.publishingService = null;
152         }
153     }
154 
155     @Override
156     public String getName() {
157         return "UnmatchedEventProcessor";
158     }
159 
160     @Override
161     public void onEvent(final ContextEvent event) {
162         if (event.getCurrentActionIndex() < event.getNumberOfActions()) {
163             final CurrentAction currentAction = event.getCurrentAction();
164             if (currentAction.isRequired()) {
165                 this.observable.notifyAll(Reason.NoRequiredProcessor, event);
166 
167                 if (LOGGER.isEnabledFor(Level.ERROR)) {
168                     LOGGER.error(MessageFormat.format(
169                             "Dropped event ''{0}''! No processor for the required ''{1}'' action.",
170                             event.getType().getName(), currentAction.getName()));
171                 }
172             } else {
173                 try {
174                     this.publishingService.publish(event);
175                 } catch (final ContextException ex) {
176                     if (LOGGER.isEnabledFor(Level.ERROR)) {
177                         LOGGER.error(
178                                 MessageFormat.format(
179                                         "Could not republish event ''{0}''! Dropping the event.",
180                                         event.getType().getName()), ex);
181                     }
182                 }
183             }
184         } else {
185             this.observable.notifyAll(Reason.NoQueries, event);
186 
187             if (LOGGER.isEnabledFor(Level.WARN)) {
188                 LOGGER.warn(MessageFormat.format(
189                         "Dropped event ''{0}''! No queries.",
190                         event.getType().getName()));
191             }
192         }
193     }
194 
195     private static class ListenerObserverProxy implements ChannelObserver {
196 
197         private final ContextListener listener;
198 
199         /**
200          * Create {@link ListenerObserverProxy} for specified
201          * {@link ContextListener}.
202          * 
203          * @param listener the {@link ContextListener}.
204          */
205         protected ListenerObserverProxy(final ContextListener listener) {
206             super();
207 
208             this.listener = listener;
209         }
210 
211         @Override
212         public void update(final Reason reason, final ContextEvent event) {
213             this.listener.onEvent(event);
214 
215         }
216     }
217 }