I have a ConnectedStream (A) that depends on another ConnectedStream (B), which in turn depends on the first one (A).

Simplified code:

predictionStream = input
  .connect(statsStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj, output) {
         p = prediction(obj)
         output.collect(p)
     }
     flatMap2(stat, output) {
         updateModel(stat)
     }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
     flatMap1(obj2, output) {
         s = getStats(obj2, p)
         output.collect(s)
     }
     flatMap2(prediction, output) {
         p = prediction
     }
  })

I'm guessing this should be possible to achieve. One way would be to add a sink on statsStream that writes its elements to Kafka, and have predictionStream read from that topic instead of being initialized with a reference to statsStream. But I would rather avoid writing to Kafka unnecessarily.

Is there any other way to achieve this?

Thanks,
Matt
Hello,
Cyclic dataflows can be built using iterations:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html#iterations

Best,
Gábor

2017-01-28 18:39 GMT+01:00 Matt:
I'm aware of IterativeStream, but I don't think it's useful in this case. As the example above shows, my use case is "cyclic" in that the same object goes from input to predictionStream (flatMap1), then to statsStream (flatMap2, where it's updated with an object from input2), and finally back to predictionStream (flatMap2). The same operator is never applied twice to the same object, so I would say this dataflow is cyclic only in the dependencies between the streams (predictionStream depends on statsStream, which in turn depends on predictionStream).

I hope that makes it clear.

Matt

On Sat, Jan 28, 2017 at 3:17 PM, Gábor Gévay wrote:
Check this image for clarification; this is what I'm trying to do: http://i.imgur.com/

The rectangles are the two CoFlatMapFunctions, which share state between process and update (map1 and map2). It's clear from the image that I need input1 and the green box to create the blue box, and input2 and the blue box to create the green one.

---
blue = input1.connect(green).keyBy(...).flatMap(...);
green = input2.connect(blue).keyBy(...).flatMap(...);
---

As you can see, there's no cycle in the flow of data, so I guess this topology is valid. The problem is that there is no way to define such a flow. For instance, with the appropriate setters we would be able to do this:

---
blue = input1.connect();
green = input2.connect();
blue.setConnection(green);
green.setConnection(blue);
blue.keyBy(...).flatMap(...);
green.keyBy(...).flatMap(...);
---

Any idea is welcome.

Matt

On Sat, Jan 28, 2017 at 5:31 PM, Matt wrote:
I still suspect that iterations might work for your use case. Note that in the streaming API, iterations are currently nothing more than a back-edge in the topology, i.e. a low-level tool for creating a cyclic topology, much like your hypothetical setter syntax. (This is quite different from the iterations of the batch API.)

The tricky part for your use case is that you would want a ConnectedStream as your iteration head, which should receive the elements from the back-edge separately from the normal input. You could simulate this by using not ConnectedStream.flatMap but a simple DataStream.flatMap whose input element type is an Either type, with the two alternatives being the normal input and the back-edge input. (You then add maps before the closeWith and on your input1 that wrap the elements into the two alternatives of the Either type.)

Best,
Gábor

2017-01-29 15:39 GMT+01:00 Matt:
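To make the Either idea concrete, here is a minimal plain-Java model of the topology. It is a sketch, not Flink code, and it does not use the Flink API: the small Either class is a hypothetical stand-in for org.apache.flink.types.Either, the Event class and the arithmetic replacing prediction(), updateModel() and getStats() are made up for illustration, and the queue stands in for the back-edge that closeWith would create. It also processes one external event at a time, which is a simplification; in a running job the interleaving of feedback and normal input is not deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class EitherIterationModel {
    // Hypothetical stand-in for an Either type (Flink ships org.apache.flink.types.Either).
    static final class Either<L, R> {
        private final L left;
        private final R right;
        private final boolean isLeft;

        private Either(L left, R right, boolean isLeft) {
            this.left = left;
            this.right = right;
            this.isLeft = isLeft;
        }

        static <L, R> Either<L, R> ofLeft(L value) { return new Either<>(value, null, true); }
        static <L, R> Either<L, R> ofRight(R value) { return new Either<>(null, value, false); }

        boolean isLeft() { return isLeft; }
        L left() { return left; }
        R right() { return right; }
    }

    // Hypothetical external event: source 1 = input1, source 2 = input2.
    static final class Event {
        final int source;
        final double value;

        Event(int source, double value) {
            this.source = source;
            this.value = value;
        }
    }

    // Iteration head ("blue"): ONE input type, dispatching on the Either tag
    // instead of a ConnectedStream with flatMap1/flatMap2.
    static final class Predictor {
        private double weight = 1.0; // model state shared by both branches (hypothetical)

        void flatMap(Either<Double, Double> in, List<Double> out) {
            if (in.isLeft()) {
                out.add(in.left() * weight); // was flatMap1: p = prediction(obj)
            } else {
                weight = in.right();         // was flatMap2: updateModel(stat)
            }
        }
    }

    // "green" is downstream of the head, so it can stay a normal two-input operator.
    static final class Stats {
        private double lastPrediction = 0.0;

        void flatMap1(double obj2, List<Double> out) {
            out.add(obj2 - lastPrediction); // hypothetical getStats(obj2, p)
        }

        void flatMap2(double prediction) {
            lastPrediction = prediction;     // p = prediction
        }
    }

    static List<Double> run(List<Event> events) {
        Predictor blue = new Predictor();
        Stats green = new Stats();
        Deque<Either<Double, Double>> headInput = new ArrayDeque<>(); // normal input + back-edge
        List<Double> allPredictions = new ArrayList<>();

        for (Event e : events) {
            List<Double> stats = new ArrayList<>();
            if (e.source == 1) {
                headInput.add(Either.<Double, Double>ofLeft(e.value)); // map to Left on input1
            } else {
                green.flatMap1(e.value, stats);
            }
            // Back-edge: wrap green's output as Right and feed it to the head (closeWith).
            for (double s : stats) {
                headInput.add(Either.<Double, Double>ofRight(s));
            }
            // Drain the head's input; its predictions flow forward into green.flatMap2,
            // which emits nothing, so no element travels around the loop twice.
            while (!headInput.isEmpty()) {
                List<Double> predictions = new ArrayList<>();
                blue.flatMap(headInput.poll(), predictions);
                for (double p : predictions) {
                    green.flatMap2(p);
                    allPredictions.add(p);
                }
            }
        }
        return allPredictions;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(new Event(1, 2.0), new Event(2, 5.0), new Event(1, 2.0));
        System.out.println(run(events)); // prints [2.0, 6.0]
    }
}
```

In Flink terms, the shape this sketch assumes would roughly be: map input1 to the Left alternative and call iterate() on it, apply keyBy(...).flatMap(...) over the Either type on the iteration head (blue), build green as input2.connect(blue).keyBy(...).flatMap(...) as before, and pass green, mapped to the Right alternative, to closeWith. Only the iteration head needs the Either trick; green is downstream of blue, so it can remain a regular ConnectedStream.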
I really don't know what you mean. I've been reading the documentation and the examples showing iterations, but I believe it just won't work for me. Could you write a quick example? The details don't matter, only the topology.

If anyone else has an idea, it's very welcome!

Matt

On Tue, Jan 31, 2017 at 3:07 PM, Gábor Gévay wrote: