Hi,
I am running the following sample code to understand how iteration and broadcast work in a streaming context.

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
    long i = 5;
    DataStream<Long> mainInput = env.generateSequence(2, 8);
    DataStream<Long> initialIterateInput = env.fromElements(i);

    IterativeStream.ConnectedIterativeStreams<Long, Long> iteration =
            mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO);

    DataStream<Long> iterateHead = iteration
            .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
                long globalVal = 1;

                @Override
                public void flatMap1(Long value, Collector<Long> out) throws Exception {
                    Thread.sleep(3000);
                    System.out.println("SEEING FROM INPUT 1: " + value + ", " + globalVal);
                    //globalVal = globalVal + value;
                    out.collect(globalVal + value);
                }

                @Override
                public void flatMap2(Long value, Collector<Long> out) throws Exception {
                    Thread.sleep(1000);
                    globalVal = value;
                    System.out.println("SEEING FROM INPUT 2: " + value + ", " + globalVal);
                    //out.collect(value);
                }
            });

    iteration.closeWith(iterateHead.broadcast());

    iterateHead.map(new MapFunction<Long, Long>() {
        @Override
        public Long map(Long value) throws Exception {
            System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
            return value;
        }
    });

I was expecting that after out.collect(globalVal+value); is called, the value would be broadcast to every partition, as specified by the closeWith statement. I was also expecting the broadcast value to arrive in the flatMap2 function and then update globalVal in every partition.
Instead, the values are not broadcast and iterated the way I was expecting, and I am getting the following output:

    SEEING FROM INPUT 1: 2, 1
    SEEING OUTPUT FROM ITERATION: 3
    SEEING FROM INPUT 1: 3, 1
    SEEING OUTPUT FROM ITERATION: 4
    SEEING FROM INPUT 1: 4, 1
    SEEING FROM INPUT 1: 5, 1
    SEEING OUTPUT FROM ITERATION: 5
    SEEING OUTPUT FROM ITERATION: 6
    SEEING FROM INPUT 2: 5, 5
    SEEING FROM INPUT 2: 6, 6
    SEEING FROM INPUT 1: 6, 1
    SEEING OUTPUT FROM ITERATION: 7
    SEEING FROM INPUT 1: 7, 1
    SEEING OUTPUT FROM ITERATION: 8
    SEEING FROM INPUT 1: 8, 1
    SEEING OUTPUT FROM ITERATION: 9
    SEEING FROM INPUT 2: 4, 4
    SEEING FROM INPUT 2: 4, 4
    SEEING FROM INPUT 2: 5, 5
    SEEING FROM INPUT 2: 5, 5
    SEEING FROM INPUT 2: 3, 3
    SEEING FROM INPUT 2: 6, 6
    SEEING FROM INPUT 2: 6, 6
    SEEING FROM INPUT 2: 6, 6
    SEEING FROM INPUT 2: 9, 9
    SEEING FROM INPUT 2: 3, 3
    SEEING FROM INPUT 2: 4, 4
    SEEING FROM INPUT 2: 4, 4
    SEEING FROM INPUT 2: 8, 8
    SEEING FROM INPUT 2: 5, 5
    SEEING FROM INPUT 2: 3, 3
    SEEING FROM INPUT 2: 3, 3
    SEEING FROM INPUT 2: 7, 7
    SEEING FROM INPUT 2: 9, 9
    SEEING FROM INPUT 2: 9, 9
    SEEING FROM INPUT 2: 9, 9
    SEEING FROM INPUT 2: 8, 8
    SEEING FROM INPUT 2: 8, 8
    SEEING FROM INPUT 2: 8, 8
    SEEING FROM INPUT 2: 7, 7
    SEEING FROM INPUT 2: 7, 7
    SEEING FROM INPUT 2: 7, 7

Can anyone please explain this behaviour? Why does the iteration happen only after all the elements of the first input stream have been read? What if it is an infinite stream: would the iteration wait for it to finish?

Thanks and Regards
Can anyone help me understand how out.collect() and the corresponding broadcast() work?
Hi,

there is no guarantee on the order in which the elements are processed. So it can happen that most elements from input one get processed before elements from the feedback get processed. In the case of an infinite first input this will not happen, of course.

To understand what's going on, it might also be helpful to print the parallel subtask index of the operator where you are printing output. For that you have to use a RichCoFlatMapFunction. In there, you can use getRuntimeContext().getIndexOfThisSubtask() to find out which parallel instance of the operator you are in.

Cheers,
Aljoscha

On Sat, 14 May 2016 at 19:55 Biplob Biswas <[hidden email]> wrote:
> Can anyone help me understand how the out.collect() and the corresponding
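
To make the two points above concrete, here is a minimal plain-Java sketch that models the job without using Flink at all. Everything in it is an assumption made for illustration: the class and method names (BroadcastFeedbackSketch, Subtask, run) are hypothetical, the first input is handed out round-robin, and the schedule is fixed to "all of input 1 first, then the feedback", which is only one of the orderings the runtime may choose. What it is meant to show is that each parallel instance keeps its own private globalVal, and that a broadcast feedback edge delivers every emitted value to all four instances. That matches the posted output, where each of the 7 emitted values shows up 4 times under "INPUT 2".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BroadcastFeedbackSketch {

    /** Models one parallel instance of the co-flatMap; globalVal is NOT shared between instances. */
    public static final class Subtask {
        public final int index;
        public long globalVal = 1;
        public int feedbackSeen = 0;
        final Deque<Long> feedback = new ArrayDeque<>();

        Subtask(int index) {
            this.index = index;
        }

        // Counterpart of flatMap1: emits globalVal + value, using this instance's globalVal.
        long flatMap1(long value) {
            System.out.println("[subtask " + index + "] INPUT 1: " + value + ", " + globalVal);
            return globalVal + value;
        }

        // Counterpart of flatMap2: overwrites this instance's globalVal with the feedback value.
        void flatMap2(long value) {
            globalVal = value;
            feedbackSeen++;
            System.out.println("[subtask " + index + "] INPUT 2: " + value + ", " + globalVal);
        }
    }

    /** Runs one possible schedule: the whole first input, then the queued feedback. */
    public static List<Subtask> run(int parallelism, long from, long to) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask(i));
        }

        int next = 0;
        for (long v = from; v <= to; v++) {
            // Assumption: round-robin distribution of the first input.
            Subtask s = subtasks.get(next);
            next = (next + 1) % parallelism;
            long out = s.flatMap1(v);
            // Broadcast: every instance gets its own copy of the emitted value.
            for (Subtask t : subtasks) {
                t.feedback.add(out);
            }
        }

        // Feedback is drained only now, so every flatMap1 call above saw globalVal == 1.
        for (Subtask s : subtasks) {
            while (!s.feedback.isEmpty()) {
                s.flatMap2(s.feedback.poll());
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        int total = 0;
        for (Subtask s : run(4, 2, 8)) {
            total += s.feedbackSeen;
        }
        System.out.println("feedback deliveries: " + total); // 7 values x 4 instances = 28
    }
}
```

Under a different schedule, where feedback arrives between elements of the first input, flatMap1 would see a globalVal other than 1; the model fixes the order only so that the result is reproducible.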