Processors Tutorial

In this tutorial we reuse the publisher and listener code from the Advance Event & Query Configuration tutorial and build two processors on top of it. The first processor adds a fahrenheit attribute to ETemperature events and sets its value to the temperature in fahrenheit, calculated from the celsius value. The second processor adds a kelvin attribute to the ETemperature event and sets its value to the temperature in kelvin.

This tutorial will explain:

  • Defining required and optional default actions for an event.
  • Creating a processor for an event action.

The preferred way to complete this tutorial is to reuse your implementation of the Advance Event & Query Configuration tutorial (or use our implementation) as the starting point. For the impatient, the whole source code of the completed tutorial can be downloaded from here.

Defining Default Actions

The TemperatureSensor publishes ETemperature events in celsius, and we would also like to have the published temperature in fahrenheit and kelvin. One solution is to change the publisher and calculate these values for each published ETemperature event. A better and more flexible solution is to create two processors that calculate the fahrenheit and kelvin temperature for each published ETemperature event. The publisher is not aware of the processors, and the processors are not aware of each other: reading the sensor and publishing the event is one task, and computing the temperature in fahrenheit and in kelvin from celsius are two further tasks, each with its own implementation. Only the listener is aware of the consequences of the processors' actions, though not of their existence, because it can now read the temperature in celsius, fahrenheit or kelvin.

The first task is to define the default actions for the ETemperature event. We do this in the publishers.cfg.xml file:

<Context xmlns="http://www.sm4all-project.eu/COPAL">
  <Event name="ETemperature"
    namespace="http://www.sm4all-project.eu/COPAL/Tutorial">
    <Actions>
      <Action name="AddFahrenheit" required="true" />
      <Action name="AddKelvin" required="false" />
    </Actions>
    <Schema>
      <ClassPath location="ETemperature.xsd" />
    </Schema>
  </Event>
</Context>

The difference between the ETemperature event definition in the Advance Event & Query Configuration tutorial and this definition is the Actions element. This element contains one or more Action elements, each of which defines one action. Every event instance carries its own actions, but in the event definition we can define which actions each event instance carries by default. In our case we define two actions: AddFahrenheit and AddKelvin. Whether an action is required or optional is set with the required attribute: true means the action is required and false means it is optional. Here the AddFahrenheit action is required and the AddKelvin action is optional.

When we run the example we should not see any output from the TemperatureListener; instead we get error messages telling us that there is no processor for the required AddFahrenheit action. If there is no processor for a required action of an event, COPAL drops the event because it is considered incomplete. To fix this error we have to create at least one processor that can handle the AddFahrenheit action.

The output of running the example should look something like this:

20:24:46.759 ERROR at.ac.tuwien.infosys.sm4all.copal.core.internal.DeadLetterChannelImpl:153 - Dropped event 'ETemperature'! No processor for the required 'AddFahrenheit' action. 
20:24:51.763 ERROR at.ac.tuwien.infosys.sm4all.copal.core.internal.DeadLetterChannelImpl:153 - Dropped event 'ETemperature'! No processor for the required 'AddFahrenheit' action. 
20:24:56.760 ERROR at.ac.tuwien.infosys.sm4all.copal.core.internal.DeadLetterChannelImpl:153 - Dropped event 'ETemperature'! No processor for the required 'AddFahrenheit' action.
...

Before we go any further we must also extend the XML Schema file for the ETemperature event. We have to define two additional attributes: fahrenheit and kelvin. These attributes will hold the value of the temperature in fahrenheit and in kelvin and the ETemperature.xsd file should now look like:

<?xml version="1.0" encoding="UTF-8"?>
<xsi:schema
  xmlns="http://www.sm4all-project.eu/COPAL/Tutorial"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema"
  targetNamespace="http://www.sm4all-project.eu/COPAL/Tutorial"
  elementFormDefault="qualified">

  <xsi:element name="ETemperature">
    <xsi:complexType>
      <xsi:attribute name="celsius" type="xsi:decimal" use="required" />
      <xsi:attribute name="fahrenheit" type="xsi:decimal" use="optional" />
      <xsi:attribute name="kelvin" type="xsi:decimal" use="optional" />
    </xsi:complexType>
  </xsi:element>
</xsi:schema>

Both attributes must be optional, because a newly published ETemperature event has neither of them. If they were required, validation of the event against this schema file would fail and COPAL would drop the event without ever publishing it.
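The effect of optional versus required attributes can be checked outside COPAL with the standard javax.xml.validation API. The following is only a sketch: the SchemaCheck class and its helper methods are made up for illustration, the schema is inlined as a string (with the conventional xs prefix) instead of being read from ETemperature.xsd, and it says nothing about how COPAL itself performs validation.

```java
import java.io.IOException;
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

// Hypothetical helper, not part of COPAL.
class SchemaCheck {

    static final String NS = "http://www.sm4all-project.eu/COPAL/Tutorial";

    // Builds the ETemperature schema; 'use' is applied to fahrenheit and kelvin.
    static String schema(final String use) {
        return "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
            + " targetNamespace='" + NS + "' elementFormDefault='qualified'>"
            + "<xs:element name='ETemperature'><xs:complexType>"
            + "<xs:attribute name='celsius' type='xs:decimal' use='required'/>"
            + "<xs:attribute name='fahrenheit' type='xs:decimal' use='" + use + "'/>"
            + "<xs:attribute name='kelvin' type='xs:decimal' use='" + use + "'/>"
            + "</xs:complexType></xs:element></xs:schema>";
    }

    // Returns true if the XML document validates against the given XSD text.
    static boolean isValid(final String xsd, final String xml) {
        try {
            final SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            final Schema schema = factory.newSchema(new StreamSource(new StringReader(xsd)));
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException | IOException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        // An event as the publisher emits it: only celsius is set.
        final String published = "<ETemperature xmlns='" + NS + "' celsius='21'/>";
        System.out.println(isValid(schema("optional"), published)); // true
        System.out.println(isValid(schema("required"), published)); // false
    }
}
```

With use="optional" the freshly published event passes validation; with use="required" the same event is rejected, which is the situation the schema above avoids.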

Calculate Fahrenheit

A processor is a combination of a publisher and a listener in one. Unlike a publisher, it has an input event and can publish new events only as a result of processing an action on that input event. It can publish the same event, modified or not, it can publish new events, or it can publish nothing at all. If it does not publish an event and it is the only processor for this action, then the input event is lost. Unlike a processor, a listener does not publish new events, and it receives only events for which at least all required actions have already been processed.

The actions are, as mentioned before, carried by the events. This makes it possible to route each event instance differently at runtime. The event definition only states which actions each event instance carries by default. It does not guarantee that an action will be executed on an event, because we can change or even remove actions at runtime for each event instance. Finally, multiple processors can be registered for the same action, and all of them are executed if an event instance requires this action.

The first step is to create a processor for the AddFahrenheit action. We start by creating a new processors subproject (you can copy processors from the generic skeleton project and modify its pom.xml file). A processor can extend the BaseProcessor class, implement the ContextProcessor interface, or use annotations. The simplest way to implement the AddFahrenheit processor is to use annotations and just relay the input ETemperature event unchanged to the output. This does not yet do what is expected, but for now we only need to see how to define and register a processor with COPAL.

The code for this simple AddFahrenheit processor looks like:

import at.ac.tuwien.infosys.sm4all.copal.api.processor.Action;
import at.ac.tuwien.infosys.sm4all.copal.api.util.Name;

@Name("CelsiusToFahrenheitCalculator")
public class FahrenheitCalculator {

    @Action(name = "AddFahrenheit", input = "ETemperature", output = "ETemperature")
    public XMLEvent calculate(final XMLEvent event) {
        return event;
    }
}

First we specify the name of the processor with the @Name annotation. As with annotated listeners, the processor must have at least one method annotated with the @Action or @Actions annotation. The @Action annotation specifies the name of the action the processor can handle, the type of the input event and, optionally, the types of the output events. In our case, the processor can process the AddFahrenheit action on an ETemperature event, and the output can only be an ETemperature event. We can also specify more than one output type, which tells COPAL that the output can be one or more events of any of the specified types. If we omit the output argument, the processor will never publish any event. If we do not want to produce any output for some specific input, we just return null or an empty array from the annotated method. The @Actions annotation is a container annotation for cases where a method needs multiple @Action annotations; it is not used in this example.

The rules for a method to be annotated with the @Action or @Actions annotation are:

  • the method must be public,
  • the method must have one or two parameters,
  • the required parameter must be ContextEvent or a subclass (in our example it is the XMLEvent),
  • the other (optional) parameter must be of type ProcessorAction (used when the method must know which action it has to perform on the event),
  • the method must be void or return ContextEvent (or a subclass), an array of ContextEvents (or an array of any ContextEvent subclass), or a java.util.Collection of ContextEvents (or a collection of any ContextEvent subclass).

The rules that determine which methods are invoked when an event is received are:

  • the class of the event must be equal to, or a subclass of, the class of the method parameter,
  • the name of the event's current action must be equal to the name of the processed action,
  • the type of the event must be equal to the input type,
  • if the method is annotated with the @Actions annotation, then the event must match at least one of its @Action annotations,
  • all methods for which the above rules hold will be invoked with the event.
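The invocation rules above can be read as a small matching function. The sketch below restates them in plain Java; it is not COPAL code. ContextEvent and XMLEvent are reduced stand-ins for the real classes, and ActionSpec, Handler, matches and select are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ActionMatching {

    // Stand-ins for COPAL's event classes (hypothetical, reduced to two fields).
    static class ContextEvent {
        final String type;
        final String currentAction;
        ContextEvent(final String type, final String currentAction) {
            this.type = type;
            this.currentAction = currentAction;
        }
    }

    static class XMLEvent extends ContextEvent {
        XMLEvent(final String type, final String currentAction) {
            super(type, currentAction);
        }
    }

    // Stand-in for one @Action annotation: action name plus input event type.
    static final class ActionSpec {
        final String name;
        final String input;
        ActionSpec(final String name, final String input) {
            this.name = name;
            this.input = input;
        }
    }

    // Stand-in for an annotated method: its event parameter class and its
    // @Action specs (more than one spec models the @Actions annotation).
    static final class Handler {
        final String id;
        final Class<? extends ContextEvent> parameterType;
        final List<ActionSpec> specs;
        Handler(final String id, final Class<? extends ContextEvent> parameterType,
                final ActionSpec... specs) {
            this.id = id;
            this.parameterType = parameterType;
            this.specs = Arrays.asList(specs);
        }
    }

    static boolean matches(final Handler handler, final ContextEvent event) {
        // The event's class must be the parameter class or a subclass of it.
        if (!handler.parameterType.isInstance(event)) {
            return false;
        }
        // One spec must match both the current action name and the event type.
        for (final ActionSpec spec : handler.specs) {
            if (spec.name.equals(event.currentAction) && spec.input.equals(event.type)) {
                return true;
            }
        }
        return false;
    }

    // Every matching handler is selected, not just the first one.
    static List<String> select(final List<Handler> handlers, final ContextEvent event) {
        final List<String> selected = new ArrayList<>();
        for (final Handler handler : handlers) {
            if (matches(handler, event)) {
                selected.add(handler.id);
            }
        }
        return selected;
    }

    public static void main(final String[] args) {
        final List<Handler> handlers = Arrays.asList(
            new Handler("fahrenheit", XMLEvent.class,
                new ActionSpec("AddFahrenheit", "ETemperature")),
            new Handler("kelvin", XMLEvent.class,
                new ActionSpec("AddKelvin", "ETemperature")),
            new Handler("audit", ContextEvent.class,
                new ActionSpec("AddFahrenheit", "ETemperature"),
                new ActionSpec("AddFahrenheit", "EHumidity")));
        // prints [fahrenheit, audit]
        System.out.println(select(handlers, new XMLEvent("ETemperature", "AddFahrenheit")));
    }
}
```

The "audit" handler is selected as well because its parameter class is a superclass of XMLEvent and one of its two specs matches, mirroring the @Actions rule.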

The final step is to create the OSGi Activator that registers this processor with COPAL. This activator looks almost identical to the activator for publishers; the only difference is that it registers processors instead of publishers. Notice that we must wrap the FahrenheitCalculator in an AnnotatedProcessor because it does not implement the ContextProcessor interface.

import at.ac.tuwien.infosys.sm4all.copal.api.processor.AnnotatedProcessor;
import at.ac.tuwien.infosys.sm4all.copal.api.processor.ProcessorsActivator;

public class Activator extends ProcessorsActivator {

    @Override
    protected void start() {
        register(new AnnotatedProcessor(new FahrenheitCalculator()));
    }
}

If we run the example, we should receive the expected output from the TemperatureListener telling us the temperature in celsius:

20:47:57.608 ERROR at.ac.tuwien.infosys.sm4all.copal.core.internal.DeadLetterChannelImpl:153 - Dropped event 'ETemperature'! No processor for the required 'AddFahrenheit' action. 
New temperature
Celsius: 21
New temperature
Celsius: 16
...

The first thing you should notice is that the TemperatureListener receives the event even though an AddKelvin processor was never created or registered. This is because the AddKelvin action is optional. If COPAL finds an event that needs an optional action and there is no processor registered for this action, it just passes the event unchanged to the next action. In our case there are no further actions, so the listeners receive the event.

The output also contains one error message. It appears because the OSGi system started the publishers before it started the processors, so when the publisher started publishing there was no processor for the AddFahrenheit action yet. By the time the second and third ETemperature events were published the processor was registered, and the listener therefore received those events. We cannot do much about this, except perhaps tell the OSGi system to start the publishers after the processors by giving the publishers a higher start level than the processors.
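The handling of required and optional actions described above can be summarised in a few lines of plain Java. This is a simplified model, not COPAL's implementation: the Action class, the route method and the use of a String payload are all hypothetical, and only one processor per action is modelled even though COPAL allows several.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

class ActionRouting {

    // Hypothetical stand-in for an action carried by an event instance.
    static final class Action {
        final String name;
        final boolean required;
        Action(final String name, final boolean required) {
            this.name = name;
            this.required = required;
        }
    }

    // Returns the payload delivered to listeners, or null if the event is dropped.
    static String route(final String payload, final List<Action> actions,
                        final Map<String, UnaryOperator<String>> processors) {
        String current = payload;
        for (final Action action : actions) {
            final UnaryOperator<String> processor = processors.get(action.name);
            if (processor != null) {
                current = processor.apply(current);
            } else if (action.required) {
                return null; // required action without a processor: event is dropped
            }
            // optional action without a processor: event passes on unchanged
        }
        return current;
    }

    public static void main(final String[] args) {
        final List<Action> defaults = Arrays.asList(
            new Action("AddFahrenheit", true),
            new Action("AddKelvin", false));
        final Map<String, UnaryOperator<String>> processors = new HashMap<>();

        // No processors registered yet: the event is dropped.
        System.out.println(route("celsius=21", defaults, processors)); // null

        // Only AddFahrenheit registered: AddKelvin is skipped because it is optional.
        processors.put("AddFahrenheit", payload -> payload + " fahrenheit=69.8");
        System.out.println(route("celsius=21", defaults, processors));
        // celsius=21 fahrenheit=69.8
    }
}
```

The first call corresponds to the dropped event in the log, the second to the events the listener receives once the AddFahrenheit processor is registered.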

Now is also a good time to extend the TemperatureListener to print out the temperature in fahrenheit and kelvin. The onTemperature(XMLEvent) method in the listener now looks like this:

@Event(type = "ETemperature")
public void onTemperature(final XMLEvent event) {
    final Document document = event.getDocument();
    final Element rootElement = document.getDocumentElement();

    System.out.println("New temperature");
    System.out.println("Celsius: " + rootElement.getAttribute("celsius"));
    // Element.getAttribute returns "" (never null) for a missing attribute,
    // so presence is tested with hasAttribute.
    if (rootElement.hasAttribute("fahrenheit")) {
        System.out.println("Fahrenheit: " + rootElement.getAttribute("fahrenheit"));
    }
    if (rootElement.hasAttribute("kelvin")) {
        System.out.println("Kelvin: " + rootElement.getAttribute("kelvin"));
    }
}

Notice that we print the values of the fahrenheit and kelvin attributes only if they are present, because they are defined as optional in the ETemperature XML Schema file. If we run the example now, we should get only the temperature in celsius, because the AddFahrenheit processor does not calculate the temperature in fahrenheit yet.
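One detail of the standard org.w3c.dom API matters for presence checks on optional attributes: Element.getAttribute returns an empty string, not null, when the attribute is absent, while Element.hasAttribute reports presence directly. The small sketch below demonstrates this on a hand-built element; the AttributePresence class and its temperature helper are made up for the illustration and do not involve COPAL.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

class AttributePresence {

    // Builds an ETemperature root element that carries only the celsius attribute.
    static Element temperature(final String celsius) {
        try {
            final Document document =
                DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
            final Element root = document.createElement("ETemperature");
            root.setAttribute("celsius", celsius);
            document.appendChild(root);
            return root;
        } catch (ParserConfigurationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final Element root = temperature("21");
        System.out.println(root.getAttribute("fahrenheit") == null);   // false
        System.out.println(root.getAttribute("fahrenheit").isEmpty()); // true
        System.out.println(root.hasAttribute("fahrenheit"));           // false
        System.out.println(root.hasAttribute("celsius"));              // true
    }
}
```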

The only missing part is to actually calculate the temperature in fahrenheit. For this we first extract the temperature in celsius and then use the formula fahrenheit = celsius * 9/5 + 32 to get the temperature in fahrenheit. After that, we add a new attribute named fahrenheit that has the temperature in fahrenheit as its value.

@Action(name = "AddFahrenheit", input = "ETemperature", output = "ETemperature")
public XMLEvent calculate(final XMLEvent event) throws MalformedDocumentException {
    final Document document = event.getDocument();
    final Element rootElement = document.getDocumentElement();

    // Read celsius with one decimal place; UNNECESSARY throws if rounding would be needed.
    final BigDecimal celsius = new BigDecimal(rootElement.getAttribute("celsius"))
            .setScale(1, RoundingMode.UNNECESSARY);
    // fahrenheit = celsius * 9 / 5 + 32
    final BigDecimal fahrenheit = celsius.multiply(BigDecimal.valueOf(9))
            .divide(BigDecimal.valueOf(5))
            .add(BigDecimal.valueOf(32));

    rootElement.setAttribute("fahrenheit", fahrenheit.toPlainString());
    return new XMLEvent(event.getType(), document);
}

Finally, the TemperatureListener should print the temperature in fahrenheit beside the temperature in celsius:

New temperature
Celsius: 4
Fahrenheit: 39.2
New temperature
Celsius: -8
Fahrenheit: 17.6
...
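The arithmetic behind these values can be tried in isolation. The sketch below repeats the BigDecimal chain from the processor in a standalone class; FahrenheitArithmetic and toFahrenheit are names chosen for this illustration only. It also shows a consequence of RoundingMode.UNNECESSARY: an input with more than one decimal place is rejected with an ArithmeticException rather than silently rounded.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class FahrenheitArithmetic {

    // fahrenheit = celsius * 9 / 5 + 32, computed exactly with BigDecimal.
    static BigDecimal toFahrenheit(final String celsiusText) {
        final BigDecimal celsius = new BigDecimal(celsiusText)
                .setScale(1, RoundingMode.UNNECESSARY);
        // Dividing by 5 always has a terminating decimal expansion,
        // so divide() without a rounding mode cannot fail here.
        return celsius.multiply(BigDecimal.valueOf(9))
                .divide(BigDecimal.valueOf(5))
                .add(BigDecimal.valueOf(32));
    }

    public static void main(final String[] args) {
        System.out.println(toFahrenheit("4").toPlainString());  // 39.2
        System.out.println(toFahrenheit("-8").toPlainString()); // 17.6
        try {
            toFahrenheit("21.55"); // two decimal places do not fit scale 1
        } catch (ArithmeticException e) {
            System.out.println("rejected: rounding would be necessary");
        }
    }
}
```

If the sensor could ever report more precise readings, a larger scale or an explicit rounding mode would be needed; the tutorial's sensor publishes whole degrees, so scale 1 is sufficient here.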

Calculate Kelvin

A processor should not care whether the event action is required or optional. This is easy to understand: some event instances will carry this action as required and some will not. As already mentioned, it is the individual event instance that carries the information about the actions to be processed on it. The processor should only care about how to process the events it receives.

The AddKelvin processor looks almost identical to the AddFahrenheit processor; the only differences are its name, the action it can process and the way it calculates the temperature in kelvin from the temperature in celsius:

@Name("CelsiusToKelvinCalculator")
public class KelvinCalculator {

    @Action(name = "AddKelvin", input = "ETemperature", output = "ETemperature")
    public XMLEvent calculate(final XMLEvent event) throws MalformedDocumentException {
        final Document document = event.getDocument();
        final Element rootElement = document.getDocumentElement();
        
        final BigDecimal celsius = new BigDecimal(rootElement.getAttribute("celsius")).setScale(2, RoundingMode.UNNECESSARY);
        // kelvin = celsius + 273.15
        final BigDecimal kelvin = celsius.add(new BigDecimal("273.15"));

        rootElement.setAttribute("kelvin", kelvin.toPlainString());
        return new XMLEvent(event.getType(), document);
    }
}

Next we register the AddKelvin processor in the bundle activator:

@Override
protected void start() {
    register(new AnnotatedProcessor(new FahrenheitCalculator()));
    register(new AnnotatedProcessor(new KelvinCalculator()));
}

When we run the example, the TemperatureListener should print the temperature in celsius, fahrenheit and kelvin:

New temperature
Celsius: -13
Fahrenheit: 8.6
Kelvin: 260.15
New temperature
Celsius: 6
Fahrenheit: 42.8
Kelvin: 279.15
...

This concludes the tutorial, which explained how to define the default actions for an event and how to create processors that can process these actions.