Maintaining message input order in streams with keyBy/filter/connect

Patrick Fial
*Intro*
I am using Apache Flink to build a rather complex network of data streams. The idea is to implement a rule engine with Flink.

As a basic description of the application, this is how it is supposed to work:

Data is received by a Kafka consumer source and processed by a number of data streams until it is finally sent to a Kafka producer sink. The incoming data contains objects with a logical key ("object-id"), and several incoming messages may refer to the same object-id. For every given object-id, the order of its incoming messages must be retained throughout the application. The relative order of messages belonging to different object-ids can be arbitrary.

This means that messages a, b and c of object1 must be processed in order, whereas message x of object2 may be processed before, after, or in between a/b/c; it does not matter.

As far as I understand, this means I must use keyBy(_.objectID), so that messages of the same object are processed in the order in which they arrived.
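The ordering requirement can be written down as a small check, which may be useful as a test oracle when comparing what goes into the application with what comes out. The following is only a sketch in plain Scala without Flink; `Msg`, `label` and `preservesPerKeyOrder` are hypothetical names, and labels are assumed to be unique per object.

```scala
object PerKeyOrder {
  // Hypothetical message model: `objectId` is the logical key, `label` identifies the message.
  final case class Msg(objectId: String, label: String)

  // True if `sub` can be obtained from `full` by dropping elements without reordering.
  private def isSubsequence(sub: Seq[String], full: Seq[String]): Boolean = {
    var rest = full
    sub.forall { x =>
      val idx = rest.indexOf(x)
      if (idx < 0) false else { rest = rest.drop(idx + 1); true }
    }
  }

  // True if, for every objectId, the messages in `output` appear in the same
  // relative order as in `input`. Interleaving across different objects is ignored,
  // and messages may be missing from `output` (e.g. because no rule matched).
  def preservesPerKeyOrder(input: Seq[Msg], output: Seq[Msg]): Boolean = {
    val inputByKey = input.groupBy(_.objectId)
    output.groupBy(_.objectId).forall { case (key, msgs) =>
      isSubsequence(msgs.map(_.label), inputByKey.getOrElse(key, Seq.empty).map(_.label))
    }
  }
}
```

With input a, x, b, c (x belonging to object2), an output of x, a, b, c passes the check, while an output of b, a does not.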

*Current approach*
To implement the actual rule engine, a network of streams is created. The idea is the following:

- each rule will have 1-n conditions
- for every condition of every rule create a sub-stream of the original stream with .filter(_.matches(rule.condition))
- combine all sub-streams which correspond to the same rule by using substream1.connect(substream2).flatMap(new CombineFunction[WorkingMemory](...))
- connect can only join 2 streams, so a rule with 3 conditions will result in 2 subsequent joins
- rules using the same condition will re-use the same sub-stream created in the second step.
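To get a feeling for the size of the resulting network, the node count can be estimated outside Flink. This sketch assumes that a rule is just an ordered list of condition ids and that a join is shared whenever two rules start with the same sequence of conditions; `Rule`, `filterNodes` and `joinNodes` are names invented for illustration.

```scala
object NetworkPlan {
  // Hypothetical rule model: an id plus an ordered list of condition ids.
  final case class Rule(id: String, conditions: List[String])

  // One filter node per distinct condition, shared by all rules that use it.
  def filterNodes(rules: Seq[Rule]): Set[String] =
    rules.flatMap(_.conditions).toSet

  // One two-input join node per distinct condition prefix of length >= 2.
  // A rule with n conditions needs n - 1 chained joins; equal prefixes are shared.
  def joinNodes(rules: Seq[Rule]): Set[List[String]] =
    rules.flatMap(r => (2 to r.conditions.size).map(n => r.conditions.take(n))).toSet
}
```

For example, three rules with the conditions (c1), (c1, c2, c3) and (c1, c2, c4) would need 4 filter nodes and 3 join nodes under these assumptions, because the join of c1 and c2 is shared.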

This will result in n joined streams, where n corresponds to the number of rules. The joined streams will have a map function appended to them, which marks the message, so that we know that a rule matched.

Each joined/result stream may publish its result ("rule xyz matched") to the Kafka producer independently of the other results, so at this point I can attach the sink to the streams.

*Connect details*
Because the .connect of two streams ("condition" sub-streams) must only pass a message if it was received on both streams (i.e. both conditions matched), I need a RichCoFlatMapFunction with keyed state, which takes care of the "pass only if it was already received on the other side" logic.

However, the problem is that the stream is keyed by object-id. So what happens if 2 messages of the same object run through the network and reach the .connect().flatMap(new RichCoFlatMapFunction...)? It will lead to wrong output, because state keyed by object-id cannot tell the two messages apart. I would need to assign each incoming message a unique ID (UUID) upon entering the network, so I can use this key (instead of the object-id) in the .connect().flatMap() join. But at the same time, I need the stream to be keyed by object-id, so that messages of the same object are processed in order. What to do?
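The difference between the two key choices can be reproduced without Flink. In the sketch below, `Gate` is a hypothetical stand-in for the two-input join ("pass only if seen on both sides") and `Msg` a stand-in for the message type; the `uuid` field is assumed to be assigned once, when the message enters the network.

```scala
import java.util.UUID
import scala.collection.mutable

object JoinKeyChoice {
  // `objectId` is the ordering key, `uuid` identifies one individual message.
  final case class Msg(objectId: String, payload: String,
                       uuid: String = UUID.randomUUID().toString)

  // Two-sided gate: a message is emitted when its key has already been seen
  // on the other input; otherwise it is remembered and nothing is emitted.
  final class Gate(key: Msg => String) {
    private val left  = mutable.Map.empty[String, Msg]
    private val right = mutable.Map.empty[String, Msg]

    def onLeft(m: Msg): Option[Msg] =
      if (right.remove(key(m)).isDefined) Some(m) else { left(key(m)) = m; None }

    def onRight(m: Msg): Option[Msg] =
      if (left.remove(key(m)).isDefined) Some(m) else { right(key(m)) = m; None }
  }
}
```

If message a of object1 matches only the first condition and message b of object1 matches only the second, a gate built with `new Gate(_.objectId)` emits b although neither message satisfied both conditions, whereas `new Gate(_.uuid)` emits nothing.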

To solve this, I kept the input stream with keyBy(_.objectID), but the RichCoFlatMapFunction in the stream join no longer uses keyed state. Instead, I am using simple operator state, which keeps (uuid, object) pairs of passed objects and implements the same logic with a manual key/value lookup.

This seems to work, however I don't know if this introduces more issues.
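One property of such a manually managed buffer that can be examined in isolation is what happens to messages that only ever arrive on one input, for example because they match one condition but not the other. The sketch below is a plain Scala model of the lookup-or-store logic, not the Flink function itself; `ManualBuffer` and its members are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer

object BufferGrowth {
  final case class Msg(uuid: String)

  final class ManualBuffer {
    val waitingLeft  = ListBuffer.empty[Msg] // arrived on input 1, counterpart pending
    val waitingRight = ListBuffer.empty[Msg] // arrived on input 2, counterpart pending

    def onLeft(m: Msg): Option[Msg]  = lookupOrStore(m, waitingRight, waitingLeft)
    def onRight(m: Msg): Option[Msg] = lookupOrStore(m, waitingLeft, waitingRight)

    // Linear scan of the other side's buffer; a hit removes and emits the entry,
    // a miss stores the message on its own side.
    private def lookupOrStore(m: Msg, other: ListBuffer[Msg], own: ListBuffer[Msg]): Option[Msg] = {
      val idx = other.indexWhere(_.uuid == m.uuid)
      if (idx > -1) Some(other.remove(idx))
      else { own += m; None }
    }
  }
}
```

In this model, an entry is only removed when its counterpart arrives. Messages that show up on one side only stay buffered, so the buffer (and the cost of each linear scan) grows with every such message unless some form of cleanup is added.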

*Visualization*
For a list of 14 rules with a total of 23 conditions (some rules have only one condition), the Flink GUI renders the job graph as an image (not included here).

*Code*
The creation of the network is achieved using this code:

```
val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()

if (rules.isEmpty)
  return

// create partial streams for all conditions (first level)
// cache the sub-stream with the hashcode of its condition as key (for re-use)

for (rule <- rules if rule.checks.nonEmpty ;
     cond <- rule.checks if !streamCache.contains(cond.hashCode()))
  streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)

// create joined streams for combined conditions (sub-levels)

for (rule <- rules if rule.checks.nonEmpty)
{
  val ruleName = rule.ruleID

  // for each rule, starting with the rule with the least conditions ...

  if (rule.checks.size == 1)
  {
    // ... create exit node if single-condition rule
    // each exit node applies the rule-name to the objects set of matched rules.

    outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
  }
  else
  {
    // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)

    var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
    var idString = rule.checks.head.idString

    for (i <- rule.checks.indices)
    {
      if (i == rule.checks.size-1)
      {
        // reached last condition of rule, create exit-node
        // each exit node applies the rule-name to the objects set of matched rules.

        val rn = ruleName
        val objectType = rule.objectType.mkString(":")
        val statement = rule.statement

        outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
      }
      else
      {
        // intermediate condition, create normal intermediate node

        val there = rule.checks(i+1)
        val connectStream = streamCache(there.hashCode)

        idString += (":" + there.idString)

        // try to re-use existing tree-segments; newly joined segments are cached for later rules

        if (!streamCache.contains(idString.hashCode))
          streamCache += idString.hashCode -> sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))

        sourceStream = streamCache(idString.hashCode)
      }
    }
  }
}

// connect each output-node to the sink

for (stream <- outputNodesCache)
{
  stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
}
```

The StatefulCombineFunction used in the previous snippet:

```
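import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import scala.collection.mutable.ListBuffer

class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
{
  // checkpointed operator state (not keyed state), one list per buffer
  @transient private var leftState:ListState[(String, WorkingMemory)] = _
  @transient private var rightState:ListState[(String, WorkingMemory)] = _

  // in-memory buffers of (uuid, message) pairs that still wait for their counterpart.
  // bufferedLeft is searched when a message arrives on input 1, so it holds the
  // messages that arrived on input 2 (and vice versa for bufferedRight).
  private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
  private var bufferedRight = ListBuffer[(String, WorkingMemory)]()

  override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
  override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")

  // lookup: buffer searched for the counterpart of xmlObject
  // store:  buffer that remembers xmlObject if no counterpart has been seen yet
  def combine(lookup: ListBuffer[(String, WorkingMemory)], store: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
  {
    val otherIdx:Int = lookup.indexWhere(_._1 == xmlObject.uuid)

    if (otherIdx > -1)
    {
      // counterpart already seen on the other input: emit and drop the buffered entry
      out.collect(lookup(otherIdx)._2)
      lookup.remove(otherIdx)
    }
    else
    {
      // first arrival: remember the message until it shows up on the other input
      store += ((xmlObject.uuid, xmlObject))
    }
  }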

  override def initializeState(context:FunctionInitializationContext): Unit = ???
  override def snapshotState(context:FunctionSnapshotContext):Unit = ???
}
```

*Background information*
This application is meant to implement the Rete algorithm for rule matching using Flink (https://en.wikipedia.org/wiki/Rete_algorithm).

A different approach would be to simply loop over all rules for every incoming message and attach the result. I already have a working implementation of that approach using Flink, so please don't suggest it as a solution.

*Issues*
The problem is that the application messes up the order of incoming messages on the object-id level. That is, it does not achieve what I required in the intro: for each object-id, the incoming messages must keep their order, but this is not the case.

I don't know at which point in the code the order gets messed up, or how those operations are distributed among threads, so I don't know how to solve this issue.
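One way to narrow down where the order is lost might be to stamp every message with a per-object sequence number at a point where the order is still known to be intact (for instance directly after the source) and to check those numbers at selected points further down the network. The following is a plain Scala sketch of that idea, not Flink code; `Tagged`, `Tagger` and `Probe` are hypothetical names, and how they would be wired into the actual job is left open.

```scala
import scala.collection.mutable

object OrderProbe {
  final case class Tagged(objectId: String, seq: Long, payload: String)

  // Assigns consecutive sequence numbers per objectId, starting at 0.
  final class Tagger {
    private val next = mutable.Map.empty[String, Long]
    def tag(objectId: String, payload: String): Tagged = {
      val seq = next.getOrElse(objectId, 0L)
      next(objectId) = seq + 1
      Tagged(objectId, seq, payload)
    }
  }

  // Reports every observation whose sequence number is lower than the
  // highest one already seen for the same objectId at this stage.
  final class Probe(stage: String) {
    private val highest = mutable.Map.empty[String, Long]
    def observe(t: Tagged): Option[String] = {
      val previous = highest.get(t.objectId)
      highest(t.objectId) = previous.fold(t.seq)(p => math.max(p, t.seq))
      previous.filter(_ > t.seq)
        .map(p => s"$stage: ${t.objectId} saw seq ${t.seq} after seq $p")
    }
  }
}
```

Placing one probe after each filter, each join and in front of the sink would show the first stage at which a sequence number goes backwards for an object. Gaps in the numbers are expected wherever messages are filtered out, so only inversions are reported.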

Best regards
Patrick Fial