*Intro*
I am using Apache Flink to build a rather complex network of data streams. The idea is to implement a rule engine with Flink.
As a basic description of the application, this is how it is supposed to work:
Data is received by a Kafka consumer source and processed by a number of data streams until it is finally sent to a Kafka producer sink. The incoming data contains objects with a logical key ("object-id"), and several incoming messages may refer to the same object-id. For every given object-id, the order of its incoming messages must be retained throughout the application. The overall order of messages across different objects can be arbitrary.
This means that messages a, b and c of object1 must be processed in order, whereas message x of object2 may be processed before, between, or after a1/b1/c1; it does not matter.
As I currently understand it, this means I must keyBy(_.objectID), so that messages of the same object are processed in the order in which they arrived.
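To illustrate what keyBy does and does not guarantee, here is a small plain-Scala simulation (not Flink API code). Messages are routed to one of `parallelism` hypothetical subtasks by a hash of the object-id, each subtask handles its queue in arrival order, and the subtask outputs are then interleaved, standing in for arbitrary thread scheduling. The `Msg` type, the modulo routing and the round-robin interleaving are assumptions made for the illustration only.

```scala
object KeyByOrderSketch {
  // hypothetical message type: the object-id plus a label identifying the message
  final case class Msg(objectId: String, label: String)

  // route every message to a "subtask" by hashing its object-id (a simplification:
  // Flink actually maps keys to key groups using a murmur hash of hashCode)
  def partition(msgs: Seq[Msg], parallelism: Int): Vector[Vector[Msg]] =
    (0 until parallelism).toVector.map { p =>
      msgs.filter(m => math.abs(m.objectId.hashCode % parallelism) == p).toVector
    }

  // interleave the subtask outputs round-robin; each queue keeps its own order
  def interleave(queues: Vector[Vector[Msg]]): Vector[Msg] = {
    val maxLen = if (queues.isEmpty) 0 else queues.map(_.size).max
    (0 until maxLen).toVector.flatMap(i => queues.flatMap(_.lift(i)))
  }

  // labels of one object, in the order in which they appear in a sequence of messages
  def orderOf(objectId: String, msgs: Seq[Msg]): Seq[String] =
    msgs.filter(_.objectId == objectId).map(_.label)

  val input = Vector(Msg("object1", "a"), Msg("object2", "x"), Msg("object1", "b"), Msg("object1", "c"))
  val output = interleave(partition(input, parallelism = 2))
}
```

All messages of object1 land in the same queue, so `orderOf("object1", output)` stays `a, b, c`, while the position of `x` relative to them depends on the interleaving. The guarantee only holds as long as a message travels through one FIFO path; it says nothing about copies of a message that are split into several branches.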
*Current approach*
To implement the actual rule engine, a network of streams is created. The idea is the following:
- each rule has 1 to n conditions
- for every condition of every rule, create a sub-stream of the original stream with .filter(_.matches(rule.condition))
- combine all sub-streams that correspond to the same rule by using substream1.connect(substream2).flatMap(new CombineFunction[WorkingMemory](...))
- connect can only join two streams, so a rule with 3 conditions results in two consecutive joins
- rules using the same condition re-use the same sub-stream created in the second step
This results in n joined streams, where n is the number of rules. Each joined stream gets a map function appended, which marks the message so that we know which rule matched.
Each joined/result stream may publish its result ("rule xyz matched") to the Kafka producer independently of the other results, so at this point I can attach the sink to the streams.
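The shape of the network described above can be reasoned about without Flink. The following plain-Scala sketch models the topology only: a hypothetical `Rule` type whose conditions are plain string ids, one filter node per distinct condition, and one connect/flatMap node per distinct condition prefix of length two or more. It assumes conditions are joined left to right in a fixed order per rule, which is what makes prefixes shareable.

```scala
object NetworkShapeSketch {
  // hypothetical rule model: a rule id plus an ordered list of condition ids
  final case class Rule(id: String, conditions: List[String])

  // one filter sub-stream per distinct condition, shared by all rules that use it
  def filterNodes(rules: Seq[Rule]): Set[String] =
    rules.flatMap(_.conditions).toSet

  // one join node per distinct condition prefix of length >= 2;
  // a rule with k conditions contributes the prefixes of length 2..k (k - 1 joins)
  def joinNodes(rules: Seq[Rule]): Set[List[String]] =
    rules.flatMap(r => (2 to r.conditions.size).map(n => r.conditions.take(n))).toSet

  val rules = Seq(
    Rule("r1", List("A")),
    Rule("r2", List("A", "B")),
    Rule("r3", List("A", "B", "C")),
    Rule("r4", List("B", "C"))
  )
}
```

Here `r2` and `r3` share the `A:B` join, so the four joins needed without sharing shrink to three, and the three conditions need only three filter nodes. Each of the four rules still gets its own exit node (a map plus a sink).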
*Connect details*
Because the .connect of two streams ("condition" sub-streams) must only pass a message if it was received on both streams (i.e. both conditions matched), I need a RichCoFlatMapFunction with keyed state, which takes care of the "pass only if it was already received on the other side" logic.
However, the problem is that the stream is keyed by object-id. So what happens if two messages of the same object run through the network and reach the .connect().flatMap(new RichCoFlatMapFunction...)? It leads to wrong output: keyed state is scoped per object-id, so the join cannot tell the two messages apart and may pair the first condition of one message with the second condition of the other. I would need to assign each incoming message a unique ID (UUID) when it enters the network, so that I can use this key (instead of the object-id) in the .connect().flatMap() join. But at the same time, I need the stream to be keyed by object-id, so that messages of the same object are processed in order. What to do?
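The wrong-output case can be reproduced with a plain-Scala model of a two-input join (no Flink involved; `Msg`, `join` and the sample data are hypothetical). Two messages of the same object each satisfy only one of the two conditions. With the join buffers keyed by object-id, the second message finds the first one and the rule fires although no single message matched both conditions; keyed by a per-message uuid, nothing is emitted.

```scala
import scala.collection.mutable

object JoinKeySketch {
  // hypothetical message: logical object-id, per-message uuid, and the conditions it satisfies
  final case class Msg(objectId: String, uuid: String, matched: Set[String])

  // input 1 carries messages matching `left`, input 2 those matching `right`;
  // a message is emitted once both inputs have delivered something with the same join key
  def join(msgs: Seq[Msg], left: String, right: String, key: Msg => String): Seq[Msg] = {
    val waitingLeft = mutable.Map[String, Msg]()
    val waitingRight = mutable.Map[String, Msg]()
    val out = Vector.newBuilder[Msg]
    for (m <- msgs) {
      if (m.matched(left)) {
        waitingRight.remove(key(m)) match {
          case Some(_) => out += m
          case None    => waitingLeft(key(m)) = m
        }
      }
      if (m.matched(right)) {
        waitingLeft.remove(key(m)) match {
          case Some(_) => out += m
          case None    => waitingRight(key(m)) = m
        }
      }
    }
    out.result()
  }

  val msgs = Seq(
    Msg("object1", "u1", Set("A")), // first message of object1 satisfies only A
    Msg("object1", "u2", Set("B"))  // second message of object1 satisfies only B
  )
  val byObjectId: Seq[Msg] = join(msgs, "A", "B", _.objectId)
  val byUuid: Seq[Msg] = join(msgs, "A", "B", _.uuid)
}
```

`byObjectId` contains message `u2` (a false match), while `byUuid` is empty.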
To solve this, I kept the input stream with keyBy(_.objectID), but the RichCoFlatMapFunction in the stream join no longer uses keyed state. Instead, it uses plain operator state, which keeps the pending objects and implements the same logic, just with a manual key/value lookup by UUID.
This seems to work, however I don't know if this introduces more issues.
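One issue with this buffering logic can be seen in a plain-Scala model of it. The `UuidJoin` class below is a hypothetical stand-in for the two buffers of the combine function, not the actual StatefulCombineFunction. A message that satisfies only one of the two conditions is buffered and never removed, so the buffers grow with every such message unless some cleanup is added.

```scala
import scala.collection.mutable

object PendingBufferSketch {
  // hypothetical stand-in for the per-subtask buffers of the combine function:
  // uuid -> payload entries from each input that still wait for the other input
  final class UuidJoin[T] {
    private val pendingLeft = mutable.LinkedHashMap[String, T]()
    private val pendingRight = mutable.LinkedHashMap[String, T]()

    // input 1: emit if input 2 already delivered this uuid, otherwise remember it
    def onLeft(uuid: String, value: T): Option[T] = pendingRight.remove(uuid) match {
      case hit @ Some(_) => hit
      case None          => pendingLeft(uuid) = value; None
    }

    // input 2: the mirror image of onLeft
    def onRight(uuid: String, value: T): Option[T] = pendingLeft.remove(uuid) match {
      case hit @ Some(_) => hit
      case None          => pendingRight(uuid) = value; None
    }

    // entries that never found a partner; nothing in this logic ever removes them
    def pendingCount: Int = pendingLeft.size + pendingRight.size
  }

  // u1 satisfies both conditions, u2 only the left one, u3 only the right one
  val join = new UuidJoin[String]
  val emitted: Seq[String] = Seq(
    join.onLeft("u1", "msg1"),
    join.onLeft("u2", "msg2"),
    join.onRight("u1", "msg1"),
    join.onRight("u3", "msg3")
  ).flatten
}
```

Only `msg1` is emitted, and the entries for `u2` and `u3` stay buffered for good. A further point to check: operator `ListState` is split evenly across subtasks when a job is restored with a different parallelism, so a buffered entry may end up on a different subtask than the one that later receives its partner.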
*Visualization*
The Flink web UI renders the following job graph for a list of 14 rules with a total of 23 conditions (some rules have only one condition):
[Image: job graph rendered by the Flink web UI]
*Code*
The creation of the network is achieved using this code:
```
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.flink.streaming.api.scala._

// sub-streams, keyed by the hashCode of a condition or of a joined condition path
val streamCache = mutable.Map[Int, DataStream[WorkingMemory]]()
// exit nodes: streams whose elements have matched a complete rule
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()
if (rules.isEmpty)
  return

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)
for (rule <- rules if rule.checks.nonEmpty;
     cond <- rule.checks if !streamCache.contains(cond.hashCode))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)
for (rule <- rules if rule.checks.nonEmpty)
{
  // copy the rule metadata into local vals, so that the map closures below
  // capture only these values and not the whole rule object
  val ruleName = rule.ruleID
  val objectType = rule.objectType.mkString(":")
  val statement = rule.statement

  // for each rule ...
  if (rule.checks.size == 1)
  {
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the object's set of matched rules.
    outputNodesCache += streamCache(rule.checks.head.hashCode)
      .map(obj => { obj.matchedRule = ListBuffer((ruleName, objectType, statement)); obj })
  }
  else
  {
    // ... iterate all conditions, and join nodes into the full rule-path (reusing existing intermediate paths)
    var pathStream: DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString
    for (i <- rule.checks.indices)
    {
      if (i == rule.checks.size - 1)
      {
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the object's set of matched rules.
        outputNodesCache += pathStream
          .map(obj => { obj.matchedRule = ListBuffer((ruleName, objectType, statement)); obj })
      }
      else
      {
        // intermediate condition, create normal intermediate node
        val there = rule.checks(i + 1)
        val connectStream = streamCache(there.hashCode)
        idString += (":" + there.idString)
        // try to re-use existing tree-segments
        if (streamCache.contains(idString.hashCode))
          pathStream = streamCache(idString.hashCode)
        else
        {
          pathStream = pathStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
          // register the new segment; otherwise the lookup above can never find it
          streamCache += idString.hashCode -> pathStream
        }
      }
    }
  }
}

// connect each output-node to the sink
for (stream <- outputNodesCache)
{
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}
```
The StatefulCombineFunction used in the previous snippet:
```
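import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
class StatefulCombineFunction(id: String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
  // operator (non-keyed) state: only used to checkpoint the two buffers below
  @transient private var leftState: ListState[(String, WorkingMemory)] = _
  @transient private var rightState: ListState[(String, WorkingMemory)] = _
  // (uuid, message) pairs received on input 1 / input 2 that still wait for their counterpart
  private val pendingFromLeft = ListBuffer[(String, WorkingMemory)]()
  private val pendingFromRight = ListBuffer[(String, WorkingMemory)]()
  override def flatMap1(wm: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(pendingFromRight, pendingFromLeft, wm, out)
  override def flatMap2(wm: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(pendingFromLeft, pendingFromRight, wm, out)
  // emit the message if the other input already delivered the same uuid, otherwise buffer it on this side
  private def combine(otherSide: ListBuffer[(String, WorkingMemory)], thisSide: ListBuffer[(String, WorkingMemory)], wm: WorkingMemory, out: Collector[WorkingMemory]): Unit =
  {
    val otherIdx = otherSide.indexWhere(_._1 == wm.uuid)
    if (otherIdx > -1)
    {
      out.collect(otherSide(otherIdx)._2)
      otherSide.remove(otherIdx)
    }
    else
      thisSide += ((wm.uuid, wm))
  }
  override def initializeState(context: FunctionInitializationContext): Unit =
  {
    val tpe = createTypeInformation[(String, WorkingMemory)]
    leftState = context.getOperatorStateStore.getListState(new ListStateDescriptor[(String, WorkingMemory)](s"$id-left", tpe))
    rightState = context.getOperatorStateStore.getListState(new ListStateDescriptor[(String, WorkingMemory)](s"$id-right", tpe))
    if (context.isRestored)
    {
      pendingFromLeft ++= leftState.get().asScala
      pendingFromRight ++= rightState.get().asScala
    }
  }
  override def snapshotState(context: FunctionSnapshotContext): Unit =
  {
    leftState.clear()
    pendingFromLeft.foreach(leftState.add)
    rightState.clear()
    pendingFromRight.foreach(rightState.add)
  }
}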
```
*Background information*
This application is meant to implement the Rete algorithm for rule matching using Flink (https://en.wikipedia.org/wiki/Rete_algorithm).
A different approach would be to simply loop over all rules for every incoming message and attach the result. I already have a working Flink implementation of this approach, so please don't suggest it as a solution.
*Issues*
The problem is that the application messes up the order of incoming messages at the object-id level, so it does not achieve what I required in the intro. For each object-id, the incoming messages must keep their order, but this is not the case.
I don't know at which point in the code the order gets messed up, or how those operations are distributed among threads, so I don't know how to solve this issue.
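One plausible contributor, based only on the topology described above, can be shown with a plain-Scala model (no Flink; `Event` and the two branches are hypothetical). Every exit node is its own branch with its own sink, and a join branch emits a message only after its second input has delivered it. Each branch is FIFO on its own, but events of one object that leave through different branches can reach Kafka in any interleaving.

```scala
object BranchMergeSketch {
  // hypothetical output event: which message (by per-object sequence number) matched which rule
  final case class Event(objectId: String, seq: Int, rule: String)

  // exit node of a single-condition rule R1: messages 1 and 2 of object1 both match
  val r1Branch = Vector(Event("object1", 1, "R1"), Event("object1", 2, "R1"))
  // exit node of a two-condition rule R2: message 1 leaves only after the join has seen both inputs
  val r2Branch = Vector(Event("object1", 1, "R2"))

  // one possible arrival order at Kafka: the single-condition branch happens to be faster
  val atSink: Vector[Event] = r1Branch ++ r2Branch

  // true if the message sequence numbers of the given object never decrease
  def inOrder(events: Seq[Event], objectId: String): Boolean = {
    val seqs = events.filter(_.objectId == objectId).map(_.seq)
    seqs.zip(seqs.drop(1)).forall { case (a, b) => a <= b }
  }
}
```

Both branches are individually in order, but the merged sequence `1, 2, 1` is not, so keying the input alone does not seem sufficient once one message fans out into several independent output paths.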
Best regards
Patrick Fial