
Kogito Serverless Workflow event formats

Blog: Drools & jBPM Blog

The Serverless workflow specification relies on CloudEvents for event publishing and consumption.

CloudEvents are designed to work with any event format. That goal is achieved by declaring the data property, the one containing the event information, as an array of bytes.

Kogito Serverless Workflow expects that incoming and outgoing events represent a CloudEvent and that the content of its data property is convertible to a JSON object. The process of converting a CloudEvent into an object that an external event broker understands is called marshaling. The inverse procedure, which converts the external event broker object into a CloudEvent, is called unmarshaling. They are usually, but not always, coupled, so I will refer to both of them as the (un)marshal procedure for brevity.
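To make the terminology concrete, here is a minimal sketch of a marshal/unmarshal round trip. It does not use the Kogito or CloudEvents APIs: SimpleEvent and RoundTripSketch are hypothetical names, and the length-prefixed binary layout is an assumption chosen only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a CloudEvent: two attributes plus data as raw bytes.
final class SimpleEvent {
    final String id;
    final String type;
    final byte[] data;

    SimpleEvent(String id, String type, byte[] data) {
        this.id = id;
        this.type = type;
        this.data = data;
    }
}

class RoundTripSketch {

    // Marshal: turn the event into the payload handed to the broker.
    static byte[] marshal(SimpleEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.id);
            out.writeUTF(event.type);
            out.writeInt(event.data.length);
            out.write(event.data);
        }
        return bytes.toByteArray();
    }

    // Unmarshal: rebuild the event from the payload received from the broker.
    static SimpleEvent unmarshal(byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            String id = in.readUTF();
            String type = in.readUTF();
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            return new SimpleEvent(id, type, data);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] json = "{\"name\":\"Jane\"}".getBytes(StandardCharsets.UTF_8);
        SimpleEvent copy = unmarshal(marshal(new SimpleEvent("1", "applicants", json)));
        System.out.println(copy.type + " " + new String(copy.data, StandardCharsets.UTF_8));
    }
}
```

Note that the data property stays an opaque array of bytes throughout; only the consumer decides how to interpret it, which is what lets the same envelope carry JSON, Avro or any other format.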

When Kogito is running on the Quarkus platform, integration with external event brokers is performed through Smallrye connectors, as described in this post.

The main abstraction provided by a Smallrye connector is the channel. By default, Kogito assumes that all channels within a Workflow application use the same logic for marshaling and unmarshaling. However, in complex applications, the event format used by one channel might differ from the one used by other channels within the same application. Therefore, Kogito provides a way to specify which (un)marshal procedure should be used for each channel.

This post describes the (un)marshal procedures provided by Kogito Serverless Workflow out of the box and focuses on how to set up a workflow application to use them, either globally or per channel. It also discusses how to add new (un)marshal procedures to Kogito Serverless Workflow programmatically, if the included ones are not suitable. The only things you will need are basic Java knowledge and some familiarity with J2EE CDI functionality, specifically the Produces annotation.

Application (un)marshaler

As mentioned previously, if nothing is configured, Kogito Serverless Workflow assumes that all channels within the same application use the same (un)marshal procedure, based on the Jackson JSON library. This means that the Jackson parser should be able to convert the Smallrye message payload into a JSON object, and vice versa, without errors.

Let’s assume you want your application to use a different event format. We are going to describe how to do that for Avro, taking advantage of the fact that Kogito Serverless Workflow provides, out of the box, an (un)marshaler based on the CloudEvent specification Avro schema. However, you should be aware that the same procedure can be used to configure any other (un)marshaler.

These are the steps you need to follow:

  1. Add the kogito-addons-quarkus-marshallers-avro dependency to your pom.xml
<dependency>
  <groupId>org.kie.kogito</groupId>
  <artifactId>kogito-addons-quarkus-marshallers-avro</artifactId>
</dependency>
  2. Define, under /src/main/java, a bean factory class that produces the desired beans for the CloudEventMarshaller and CloudEventUnmarshallerFactory interfaces. In the class below, we are using the implementations provided by the Kogito marshaller addon included as a dependency in the previous step.
@ApplicationScoped
public class ApplicationMarshallerProducer {

    private AvroIO avroIO;

    @PostConstruct
    void init() throws IOException {
        avroIO = new AvroIO();
    }

    // Application-level unmarshaller for incoming events
    @Produces
    public CloudEventUnmarshallerFactory<byte[]> getAvroCloudEventUnmarshallerFactory() {
        return new AvroCloudEventUnmarshallerFactory(avroIO);
    }

    // Application-level marshaller for outgoing events
    @Produces
    public CloudEventMarshaller<byte[]> getAvroCloudEventMarshaller() {
        return new AvroCloudEventMarshaller(avroIO);
    }
}

The previous setup assumes that the payload of every message is an array of bytes. When using Kafka, this is achieved by using the proper serializer and deserializer. To do that, set the deserializer property for every incoming channel and the serializer property for every outgoing channel:

mp.messaging.incoming.<channelName>.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
mp.messaging.outgoing.<channelName>.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

You might be wondering why you need to add a Java class to set up the global marshaller. This is intentional, to allow flexibility. For example, you might want your application to consume events in Avro format and republish them in JSON format. To do that, you just need to set up the marshaller to be the Jackson one and the unmarshaller to be the Avro one, using a Java class like the one below.

@ApplicationScoped
public class ApplicationMarshallerProducer {

    @Inject
    ObjectMapper objectMapper;

    private AvroIO avroIO;

    @PostConstruct
    void init() throws IOException {
        avroIO = new AvroIO();
    }

    // Incoming events are read as Avro
    @Produces
    public CloudEventUnmarshallerFactory<byte[]> getAvroCloudEventUnmarshallerFactory() {
        return new AvroCloudEventUnmarshallerFactory(avroIO);
    }

    // Outgoing events are written as JSON through Jackson
    @Produces
    public CloudEventMarshaller<byte[]> getJacksonCloudEventMarshaller() {
        return new ByteArrayEventMarshaller(objectMapper);
    }
}

Per channel (un)marshaler

You have learned how to set up the application-level (un)marshal procedure, but what happens if your application defines several incoming channels, one of which expects events to arrive in Avro format while another expects JSON format? The answer is pretty easy: since the default (un)marshal procedure is based on Jackson, you just need to configure the channel that consumes Avro events to use the Avro unmarshaller.

In order to do that, you need to perform the following steps:

  1. Add the kogito-addons-quarkus-marshallers-avro dependency to your pom.xml
<dependency>
  <groupId>org.kie.kogito</groupId>
  <artifactId>kogito-addons-quarkus-marshallers-avro</artifactId>
</dependency>
  2. Define a named bean for the CloudEventUnmarshallerFactory interface, annotated with the ChannelFormat annotation. In the class below, the bean name is “avro” and the implementation is the one provided by the Kogito marshaller addon.
@ApplicationScoped
public class AvroMarshallerProducer {

    private AvroIO avroIO;

    @PostConstruct
    void init() throws IOException {
        avroIO = new AvroIO();
    }

    @Produces
    @Named("avro")
    @ChannelFormat
    public CloudEventUnmarshallerFactory<byte[]> getAvroCloudEventUnmarshallerFactory() {
        return new AvroCloudEventUnmarshallerFactory(avroIO);
    }
}

  3. Add a property that establishes the mapping between the channel and the bean name.

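The property format for incoming channels, as used in the examples that follow, is:

kogito.addon.messaging.unmarshaller.<channelName>=<beanName>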

Notice that you can map several channels to the same bean. For example, if the channel name is applicants, since it is an incoming channel, you need to add this line to application.properties:

kogito.addon.messaging.unmarshaller.applicants=avro

If your application had two incoming channels using Avro, you would need to add:

kogito.addon.messaging.unmarshaller.newApplicants=avro
kogito.addon.messaging.unmarshaller.legacyApplicant=avro

You can find a serverless workflow application using Avro and JSON for incoming channels in the Kogito examples repository.
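The per-channel mapping can be pictured as a lookup with a fallback. The sketch below is not Kogito's implementation: ChannelFormatResolver is a hypothetical class, and the "jackson" label for the default is an assumption. It only relies on what is described above, namely that a property names a bean for a channel and that channels without such a property use the default procedure.

```java
import java.util.Properties;

// Hypothetical illustration of per-channel format selection; not Kogito code.
class ChannelFormatResolver {

    static final String PREFIX = "kogito.addon.messaging.unmarshaller.";
    static final String DEFAULT_FORMAT = "jackson"; // assumed label for the default procedure

    private final Properties config;

    ChannelFormatResolver(Properties config) {
        this.config = config;
    }

    // Returns the bean name mapped to the channel, or the default when unmapped.
    String unmarshallerFor(String channelName) {
        return config.getProperty(PREFIX + channelName, DEFAULT_FORMAT);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(PREFIX + "applicants", "avro");
        config.setProperty(PREFIX + "newApplicants", "avro");

        ChannelFormatResolver resolver = new ChannelFormatResolver(config);
        System.out.println(resolver.unmarshallerFor("applicants"));
        System.out.println(resolver.unmarshallerFor("decisions"));
    }
}
```

Here both mapped channels resolve to the same bean name, while the unmapped decisions channel falls back to the default.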

Adding custom marshallers

You already know how to set up application-level and channel-level (un)marshal procedures using the ones predefined by Kogito, Jackson and Avro, but what happens if your channels use a different format? In that case, you need to provide your own implementation of the (un)marshal procedure.

As you have probably guessed, setting up a custom (un)marshal procedure works the same way as using a predefined one. The difference is that, rather than including an existing Kogito addon as a dependency in your pom and using the classes defined there to produce the required CDI beans, you need to develop your own classes and use them as CDI beans instead. Therefore, this section explains which Kogito interfaces need to be implemented to do so. It is assumed that you are fluent in Java.

There are three interfaces to implement: CloudEventMarshaller, CloudEventUnmarshaller and CloudEventUnmarshallerFactory. You can use the Avro implementation as a reference to follow the explanation in the paragraphs below.

Unmarshaller implementation

CloudEventUnmarshallerFactory is responsible for creating CloudEventUnmarshaller instances suitable for the provided class parameter, which in the case of Serverless Workflow is always JsonNode (remember that Kogito is also intended to work with BPMN, which uses POJOs).

Therefore, let’s focus on CloudEventUnmarshaller, which is responsible for converting the message payload into a CloudEvent and its data property into a JsonNode. 

public interface CloudEventUnmarshaller<I, O> {

    /**
     * Create Cloud Event from structured event payload
     *
     * @return Cloud Event
     */
    Converter<I, CloudEvent> cloudEvent();

    /**
     * Create Cloud Event from binary event payload
     *
     * @return Cloud Event Data
     */
    Converter<I, CloudEventData> binaryCloudEvent();

    /**
     * Creates Kogito business object from Cloud Event data
     *
     * @return Kogito Business Object
     */
    Converter<CloudEventData, O> data();
}

Let’s start with the generic type I, which represents the possible message payloads. Currently, there are three of them: String, byte[] and Object. Note that the Avro (un)marshal procedure assumes that it is a byte[].

Generic type O is the target object type, which, as mentioned, is always JsonNode for Serverless Workflow. 

Finally, there is the Converter interface, which is a checked version of Function. This interface is responsible for converting from source type to target type and throws an IOException if there is any conversion issue. An unmarshaller implementation should provide three converters:

  1. The cloudEvent method converter is used when the CloudEvent is delivered as a structured mode message.
  2. The binaryCloudEvent method converter is used when the CloudEvent is delivered as a binary mode message.
  3. The data method converter is used, regardless of the message mode, to convert CloudEventData into the target object type.
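To see how the three converters divide the work, consider the sketch below. It deliberately avoids the Kogito and CloudEvents types so that it runs on a plain JDK: CheckedConverter, ToyEvent and ToyUnmarshaller are hypothetical stand-ins, and the "type|data" text layout is an invented format used for illustration only. A real implementation would return Converter, CloudEvent and CloudEventData instances, as declared in the interface above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Stand-in for Kogito's Converter: like Function, but allowed to throw IOException.
@FunctionalInterface
interface CheckedConverter<I, O> {
    O convert(I input) throws IOException;
}

// Stand-in for a CloudEvent: one attribute plus the data as raw bytes.
final class ToyEvent {
    final String type;
    final byte[] data;

    ToyEvent(String type, byte[] data) {
        this.type = type;
        this.data = data;
    }
}

// Toy unmarshaller for an invented "type|data" text payload.
final class ToyUnmarshaller {

    // Structured mode: attributes and data both travel in the payload.
    CheckedConverter<byte[], ToyEvent> cloudEvent() {
        return payload -> {
            String text = new String(payload, StandardCharsets.UTF_8);
            int separator = text.indexOf('|');
            if (separator < 0) {
                throw new IOException("Missing '|' separator in payload");
            }
            byte[] data = text.substring(separator + 1).getBytes(StandardCharsets.UTF_8);
            return new ToyEvent(text.substring(0, separator), data);
        };
    }

    // Binary mode: attributes travel as message metadata, the payload is only the data.
    CheckedConverter<byte[], byte[]> binaryCloudEvent() {
        return payload -> payload;
    }

    // Both modes: turn the event data into the business object (a String here).
    CheckedConverter<byte[], String> data() {
        return bytes -> new String(bytes, StandardCharsets.UTF_8);
    }
}
```

A malformed payload surfaces as an IOException from convert, which mirrors the contract described above: conversion problems are reported through the checked exception rather than through a runtime one.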

Marshaller implementation 

The CloudEventMarshaller interface is responsible for transforming the CloudEvent into a message payload. Note that Kogito assumes structured mode for publishing.

public interface CloudEventMarshaller<R> {

    /**
     * Convert cloud event into the type expected by the external service
     *
     * @param event Cloud event to be converted
     * @return object to be sent to the external service
     * @throws IOException if there is a conversion problem. This method must NOT report event
     *         formatting issues through a runtime exception, it must use IOException. This way
     *         the caller can differentiate between unexpected issues and event formatting ones.
     */
    R marshall(CloudEvent event) throws IOException;

    /**
     * Convert Kogito business object into a CloudEventData for marshaling
     *
     * @param <T> the Kogito business object type
     * @return A CloudEventData that will be marshaled.
     */
    <T> Function<T, CloudEventData> cloudEventDataFactory();
}

Generic type R represents the target message payload type. Possible types are byte[], String and Object.

There are two methods to implement: 

  1. The marshall method converts the CloudEvent into the target message payload.
  2. The cloudEventDataFactory method is used internally by Kogito when a CloudEvent is built for publishing, to fill its data property. It converts the Kogito business object (JsonNode in the case of a Serverless Workflow), represented by generic type T, into a CloudEventData instance. This method belongs to the marshaller interface because the CloudEventData implementation that becomes part of the CloudEvent usually depends on the marshaling procedure.
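The two methods can be sketched in the same spirit. Again, this is not the Kogito API: SketchEvent and ToyMarshaller are hypothetical stand-ins that run on a plain JDK, and the "type|data" text payload is an invented format. A real implementation would accept a CloudEvent and produce CloudEventData, as declared in the interface above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Stand-in for a CloudEvent being published: one attribute plus its data bytes.
final class SketchEvent {
    final String type;
    final byte[] data;

    SketchEvent(String type, byte[] data) {
        this.type = type;
        this.data = data;
    }
}

// Toy marshaller producing an invented "type|data" text payload (structured mode).
final class ToyMarshaller {

    // Counterpart of marshall(CloudEvent): the whole event becomes the payload.
    byte[] marshall(SketchEvent event) throws IOException {
        if (event.type.indexOf('|') >= 0) {
            // Formatting problems are reported as IOException, not as a runtime exception.
            throw new IOException("Event type must not contain '|'");
        }
        String text = event.type + "|" + new String(event.data, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Counterpart of cloudEventDataFactory(): business object to event data bytes.
    <T> Function<T, byte[]> cloudEventDataFactory() {
        return businessObject -> String.valueOf(businessObject).getBytes(StandardCharsets.UTF_8);
    }
}
```

The factory is applied first, when the event is built, and marshall is applied afterwards, when the event is published. Keeping both in one class lets the data representation match whatever the marshaling format needs.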

The post Kogito Serverless Workflow event formats appeared first on KIE Community.
