Drools Reactive Messaging processing

Blog: Drools & jBPM Blog

The latest Drools 8.31.0.Final ships with a Reactive Messaging example that demonstrates reactively consuming messages from a Kafka topic, firing rules, and then sending result messages to another Kafka topic. You can find it at https://github.com/kiegroup/drools/tree/main/drools-drl-quarkus-extension/drools-drl-quarkus-examples/drools-drl-quarkus-examples-reactive. This blog post explains how it works.

How To Run the Example

Clone the drools repository.

$ git clone https://github.com/kiegroup/drools.git

Go to the example directory inside the cloned repository.

$ cd drools/drools-drl-quarkus-extension/drools-drl-quarkus-examples/drools-drl-quarkus-examples-reactive/

A docker-compose.yml is provided for a quick test with Kafka and Kafdrop.

$ docker-compose up -d

Build and start the application in dev mode.

$ mvn clean compile quarkus:dev

Open another terminal and send a message to the Kafka topic events.

$ echo '{"type":"temperature","value":35}' | kafka-console-producer.sh --broker-list localhost:9092 --topic events

You will see the following log on STDOUT in the terminal where the application is running. It means the message was consumed and the rule fired.

rule IncomingEvent fired : Event [type=temperature, value=35]

Kafdrop, which docker-compose has already started, is available at http://localhost:9000. Open it in a browser and you will find the alerts topic, where you can confirm the result message sent by the rule.

{"severity":"warning","message":"Event [type=temperature, value=35]"}

To shut down the app, press Ctrl+C in the terminal.

To shut down Kafka and Kafdrop:

$ docker-compose down

How it works

Here is the architecture of this example.

[Figure: Reactive Messaging example architecture]

The important part is how to connect Drools DataSources with Kafka topics. Thanks to Quarkus' reactive messaging support, we can achieve this with very little code.

To get reactive messaging support with Kafka, you only need to add the quarkus-smallrye-reactive-messaging-kafka dependency.

    <dependency>
      <groupId>io.quarkus</groupId>
      <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
    </dependency>

Take a look at the Adaptor class.

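@Startup
@ApplicationScoped
public class Adaptor {

    @Inject
    RuleUnit<AlertingUnit> ruleUnit;

    AlertingUnit alertingUnit;
    RuleUnitInstance<AlertingUnit> ruleUnitInstance;

    // Outgoing channel "alerts", mapped to a Kafka topic in application.properties
    @Inject
    @Channel("alerts")
    Emitter<Alert> emitter;

    @PostConstruct
    void init() {
        this.alertingUnit = new AlertingUnit();
        this.ruleUnitInstance = ruleUnit.createInstance(alertingUnit);
        // Forward every Alert appended to alertData to the "alerts" channel
        alertingUnit.getAlertData().subscribe(DataObserver.of(emitter::send));
    }

    // Called for each message arriving on the incoming channel "events"
    @Incoming("events")
    public void receive(Event event) throws InterruptedException {
        // Insert the event into the rule unit's data source, then fire the rules
        alertingUnit.getEventData().append(event);
        ruleUnitInstance.fire();
    }
}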

With @Incoming("events"), the receive method gets Event objects from the Kafka topic events. Likewise, @Channel("alerts") binds the Emitter to the Kafka topic alerts. These associations are configured in application.properties.

mp.messaging.incoming.events.connector=smallrye-kafka
mp.messaging.incoming.events.topic=events
mp.messaging.incoming.events.value.deserializer=org.drools.quarkus.ruleunit.examples.reactive.EventDeserializer

mp.messaging.outgoing.alerts.connector=smallrye-kafka
mp.messaging.outgoing.alerts.topic=alerts
mp.messaging.outgoing.alerts.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
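
The property keys above follow the pattern mp.messaging.<incoming|outgoing>.<channel>.<attribute>. As a rough illustration of how those keys decompose, here is a plain-Java sketch that groups them by channel. The class ChannelConfigSketch and its methods are hypothetical and exist only for this illustration; Quarkus reads application.properties itself, so nothing like this is needed in the example application.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper, for illustration only: shows how the key names decompose.
class ChannelConfigSketch {

    static final String PREFIX = "mp.messaging.";

    // Groups keys as "<direction>.<channel>" -> {attribute -> value}
    static Map<String, Map<String, String>> groupByChannel(Properties props) {
        Map<String, Map<String, String>> channels = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (!key.startsWith(PREFIX)) {
                continue;
            }
            // direction.channel.attribute; the attribute itself may contain dots
            String[] parts = key.substring(PREFIX.length()).split("\\.", 3);
            if (parts.length < 3) {
                continue;
            }
            channels.computeIfAbsent(parts[0] + "." + parts[1], k -> new TreeMap<>())
                    .put(parts[2], props.getProperty(key));
        }
        return channels;
    }

    // The same entries as in the example's application.properties
    static Properties exampleProperties() {
        String text =
              "mp.messaging.incoming.events.connector=smallrye-kafka\n"
            + "mp.messaging.incoming.events.topic=events\n"
            + "mp.messaging.incoming.events.value.deserializer="
            + "org.drools.quarkus.ruleunit.examples.reactive.EventDeserializer\n"
            + "mp.messaging.outgoing.alerts.connector=smallrye-kafka\n"
            + "mp.messaging.outgoing.alerts.topic=alerts\n"
            + "mp.messaging.outgoing.alerts.value.serializer="
            + "io.quarkus.kafka.client.serialization.ObjectMapperSerializer\n";
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        groupByChannel(exampleProperties())
            .forEach((channel, attrs) -> System.out.println(channel + " -> " + attrs));
    }
}
```

The channel name in the key (events, alerts) is what @Incoming and @Channel refer to, while the topic attribute names the actual Kafka topic. In this example the two happen to be identical.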

When the rule "IncomingEvent" fires, an Alert object is appended to the DataStream alertData.

rule IncomingEvent
  when
    $e : /eventData [ type == "temperature", value >= 30 ]
  then
    System.out.println("rule IncomingEvent fired : "+ $e);
    Alert alert = new Alert( "warning", $e.toString() );
    alertData.append( alert );
end
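
For readers less familiar with DRL, the condition and consequence of this rule can be approximated in plain Java. This is only a sketch of the matching logic, not how the Drools engine evaluates rules. The Event and Alert classes here are simplified stand-ins whose fields are assumed from the JSON payloads shown earlier, and the class IncomingEventRuleSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java approximation of the IncomingEvent rule.
class IncomingEventRuleSketch {

    // Simplified stand-in for the example's Event class (fields assumed from the JSON)
    static final class Event {
        final String type;
        final int value;

        Event(String type, int value) {
            this.type = type;
            this.value = value;
        }

        @Override
        public String toString() {
            return "Event [type=" + type + ", value=" + value + "]";
        }
    }

    // Simplified stand-in for the example's Alert class
    static final class Alert {
        final String severity;
        final String message;

        Alert(String severity, String message) {
            this.severity = severity;
            this.message = message;
        }
    }

    // Mirrors: $e : /eventData [ type == "temperature", value >= 30 ]
    static List<Alert> fire(List<Event> eventData) {
        List<Alert> alertData = new ArrayList<>();
        for (Event e : eventData) {
            if ("temperature".equals(e.type) && e.value >= 30) {
                // Mirrors the "then" part of the rule
                alertData.add(new Alert("warning", e.toString()));
            }
        }
        return alertData;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("temperature", 35),
            new Event("temperature", 20),
            new Event("humidity", 80));
        for (Alert a : fire(events)) {
            System.out.println(a.severity + ": " + a.message);
        }
        // prints: warning: Event [type=temperature, value=35]
    }
}
```

Only the first event matches, because the other two fail either the value threshold or the type check.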

As you can see in Adaptor.init(), alertData is subscribed with a DataObserver that calls emitter::send, so every Alert appended by the rule is sent to the Kafka topic alerts.

Now you can develop a rule service that consumes messages reactively in a microservice environment.
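
The forwarding set up in Adaptor.init() is essentially an observer pattern: whatever is appended to the stream is pushed to its subscribers. The following plain-Java sketch mimics that wiring under simplifying assumptions. SimpleStream and RecordingEmitter are hypothetical stand-ins for the Drools DataStream and the reactive messaging Emitter, not the real APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the subscribe-and-forward wiring in Adaptor.init().
class StreamForwardingSketch {

    // Simplified stand-in for a DataStream: each appended value goes to all subscribers
    static final class SimpleStream<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void append(T value) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    // Simplified stand-in for an Emitter: records what would be sent to Kafka
    static final class RecordingEmitter<T> {
        final List<T> sent = new ArrayList<>();

        void send(T payload) {
            sent.add(payload);
        }
    }

    public static void main(String[] args) {
        SimpleStream<String> alertData = new SimpleStream<>();
        RecordingEmitter<String> emitter = new RecordingEmitter<>();

        // Same shape as: alertData.subscribe(DataObserver.of(emitter::send))
        alertData.subscribe(emitter::send);

        // What the rule's consequence does when it fires
        alertData.append("warning: Event [type=temperature, value=35]");

        System.out.println(emitter.sent);
    }
}
```

Because the subscription is registered once at startup, the rule itself stays unaware of Kafka: it only appends to alertData.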

Going forward

We plan to develop specialized DataSources that connect Drools rule units with external frameworks and tools out of the box, for example reading from and writing to a Kafka topic without glue code like the Adaptor class. Stay tuned!

The post Drools Reactive Messaging processing appeared first on KIE Community.
