Weird behavior with CoFlatMapFunction

andy
Hi guys,

I want to merge two different streams: one is a config stream and the other carries JSON values that I want to check against that config. It seems like CoFlatMapFunction is the right tool for this.
Here is my sample:

    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    // imports for JsValue, ControlEvent, FilteredEvent and PPLogger omitted

    val filterStream: ConnectedStreams[ControlEvent, JsValue] = specificControlStream.connect(eventStream)

    class FilterFunction() extends CoFlatMapFunction[ControlEvent, JsValue, FilteredEvent] {
      // Default config, used until a ControlEvent arrives
      var configs = new ControlEvent(1, "a")
      PPLogger.getActivityLogger.info("# init ")

      // Called for each element of the first (control) input
      override def flatMap1(value: ControlEvent, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f1 value %s ".format(value.jsonPath))
        configs = value
        PPLogger.getActivityLogger.info("# f1 current config %s ".format(configs))
      }

      // Called for each element of the second (event) input
      override def flatMap2(value: JsValue, out: Collector[FilteredEvent]): Unit = {
        PPLogger.getActivityLogger.info("# f2 current config %s ".format(configs))
        PPLogger.getActivityLogger.info("# f2 current value %s ".format(value.toString()))
      }
    }

    val x = new FilterFunction()
    filterStream.flatMap(x)
….


How I sent the messages (Kafka), in this order:
------------------
+ send eventStream msg
+ send configStream msg
+ send eventStream msg
------------------

My log output looks like this:
    2019-06-06 10:15:21 INFO  activity  %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
    2019-06-06 10:15:21 INFO  activity  %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}
    2019-06-06 10:15:24 INFO  activity  %PARSER_ERROR[x] - # f1 value zzzzz_xxxxxx_2
    2019-06-06 10:15:24 INFO  activity  %PARSER_ERROR[x] - # f1 current config ControlEvent(1,zzzzz_xxxxxx_2)
    2019-06-06 10:15:30 INFO  activity  %PARSER_ERROR[x] - # f2 current config ControlEvent(1,a)
    2019-06-06 10:15:30 INFO  activity  %PARSER_ERROR[x] - # f2 current value {"organization_id":1,"randomeText":"text_random35","value":{"to_compare":"zzzzz_xxxxxx_4"}}


My understanding is that when flatMap1 runs, it changes the config, and this change is shared with flatMap2 (configs is updated for both).
But the log shows something quite different: even after the config message, flatMap2 still sees the default ControlEvent(1,a).

-----------
I also tried making configs a mutable.ListBuffer that collects the history, and updating it in both flatMap1 and flatMap2. It looks like flatMap1 and flatMap2 see two different configs variables, even though there is only one field with that name in the class!
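The log is consistent with the function running with parallelism greater than 1: Flink then creates several independent copies of FilterFunction, each with its own configs field, so a config stored by flatMap1 on one copy is invisible to flatMap2 on another. The Flink-free sketch below models this. FilterInstance and PerInstanceStateDemo are hypothetical names, the first ControlEvent field name (orgId) is a guess, and which subtask each record lands on is an assumption chosen to reproduce the log.

```scala
// Hypothetical stand-in for the user's ControlEvent; the first field name
// is a guess, jsonPath is taken from the post.
final case class ControlEvent(orgId: Int, jsonPath: String)

// Models ONE parallel instance (subtask) of FilterFunction.
// Every instance is a separate object, so each has its own `configs` field.
class FilterInstance(val id: Int) {
  var configs: ControlEvent = ControlEvent(1, "a") // default, as in the post

  def flatMap1(value: ControlEvent): Unit = { configs = value }

  def flatMap2(value: String): String = s"instance $id sees $configs for $value"
}

object PerInstanceStateDemo {
  // Parallelism 2: two independent copies of the function.
  val instances: Vector[FilterInstance] = Vector.tabulate(2)(i => new FilterInstance(i))

  // Assumption for illustration: the control record is routed to subtask 0,
  // while the next event record is routed to subtask 1.
  instances(0).flatMap1(ControlEvent(1, "zzzzz_xxxxxx_2"))
  val seen: String = instances(1).flatMap2("event-2")

  def main(args: Array[String]): Unit = {
    println(seen)
    println(s"instance 0 holds ${instances(0).configs}")
  }
}
```

With a single instance the update would be visible; with two, the event handled by instance 1 still sees ControlEvent(1,a), just like the 10:15:30 lines above.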


My env:
- Flink minicluster (sbt run)
- Flink 1.7.2
- Kafka 1.0
- Scala 2.11




Re: Weird behavior with CoFlatMapFunction

Fabian Hueske-2
Hi,

There are a few things to point out about your example:

1. The CoFlatMapFunction is probably executed in parallel. The configuration is only applied to one of the parallel function instances. You probably want to broadcast the configuration changes to all function instances. Have a look at the broadcast state pattern [1] [2].
2. Flink does not guarantee the order in which the flatMap1 and flatMap2 methods are called.
3. You probably want to store the configuration in Flink managed state that can be recovered in case of a failure. Again, have a look at the broadcast state pattern.
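Points 1 and 3 can be illustrated with a minimal broadcast-state sketch against the Flink 1.7 Scala DataStream API. It assumes the event payload is the raw JSON string rather than JsValue (to avoid a JSON-library dependency); ControlEvent, FilteredEvent, FilterWithConfig, BroadcastConfig, the "configs" state name and the "current" key are hypothetical placeholders. Every control record reaches every parallel instance through processBroadcastElement, and the latest config is kept in Flink-managed broadcast state instead of a plain var. Because of point 2, processElement falls back to a default when no config has arrived yet.

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical placeholder types; the event is a raw JSON string here.
final case class ControlEvent(orgId: Int, jsonPath: String)
final case class FilteredEvent(json: String, config: ControlEvent)

object BroadcastConfig {
  // Broadcast state is a map; the latest config is stored under one fixed key.
  val ConfigKey = "current"
  val DefaultConfig = ControlEvent(1, "a")
  val configDescriptor = new MapStateDescriptor[String, ControlEvent](
    "configs", BasicTypeInfo.STRING_TYPE_INFO, createTypeInformation[ControlEvent])

  // broadcast(...) sends every control record to all parallel instances.
  def wire(events: DataStream[String], controls: DataStream[ControlEvent]): DataStream[FilteredEvent] =
    events.connect(controls.broadcast(configDescriptor)).process(new FilterWithConfig)
}

class FilterWithConfig extends BroadcastProcessFunction[String, ControlEvent, FilteredEvent] {
  import BroadcastConfig._

  // Runs on every parallel instance for each control record; writes managed state.
  override def processBroadcastElement(
      value: ControlEvent,
      ctx: BroadcastProcessFunction[String, ControlEvent, FilteredEvent]#Context,
      out: Collector[FilteredEvent]): Unit =
    ctx.getBroadcastState(configDescriptor).put(ConfigKey, value)

  // Runs for each event on the instance it is routed to; state is read-only here.
  override def processElement(
      value: String,
      ctx: BroadcastProcessFunction[String, ControlEvent, FilteredEvent]#ReadOnlyContext,
      out: Collector[FilteredEvent]): Unit = {
    // Arrival order across the two inputs is not guaranteed, so an event may
    // be processed before any config has been seen: fall back to the default.
    val config = Option(ctx.getBroadcastState(configDescriptor).get(ConfigKey))
      .getOrElse(DefaultConfig)
    out.collect(FilteredEvent(value, config))
  }
}
```

Each instance holds its own copy of the broadcast state, so the update in processBroadcastElement should be deterministic to keep those copies identical.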

Best, Fabian


On Thu, 6 June 2019 at 12:29, Andy Hoang <[hidden email]> wrote: