Unexpected behaviour in datastream.broadcast()

Biplob Biswas
Hi,

I am running the following sample code to understand how iteration and broadcast work in a streaming context.

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
long i = 5;
DataStream<Long> mainInput = env.generateSequence(2, 8);
DataStream<Long> initialIterateInput = env.fromElements(i);

IterativeStream.ConnectedIterativeStreams<Long, Long> iteration =
        mainInput.iterate().withFeedbackType(BasicTypeInfo.LONG_TYPE_INFO);

DataStream<Long> iterateHead = iteration
        .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
            long globalVal = 1;

            @Override
            public void flatMap1(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(3000);
                System.out.println("SEEING FROM INPUT 1: " + value + ", " + globalVal);
                //globalVal = globalVal + value;
                out.collect(globalVal + value);
            }

            @Override
            public void flatMap2(Long value, Collector<Long> out) throws Exception {
                Thread.sleep(1000);
                globalVal = value;
                System.out.println("SEEING FROM INPUT 2: " + value + ", " + globalVal);

                //out.collect(value);
            }
        });

iteration.closeWith(iterateHead.broadcast());

iterateHead.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        System.out.println("SEEING OUTPUT FROM ITERATION: " + value);
        return value;
    }
});

I expected that after out.collect(globalVal+value) is called, the value would be broadcast to every partition, as specified by the closeWith statement. I also expected flatMap2 to receive the broadcast value and then update globalVal in every partition.
Instead, the values are not broadcast and iterated as I expected, and I get the following output:

SEEING FROM INPUT 1: 2, 1
SEEING OUTPUT FROM ITERATION: 3
SEEING FROM INPUT 1: 3, 1
SEEING OUTPUT FROM ITERATION: 4
SEEING FROM INPUT 1: 4, 1
SEEING FROM INPUT 1: 5, 1
SEEING OUTPUT FROM ITERATION: 5
SEEING OUTPUT FROM ITERATION: 6
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 1: 6, 1
SEEING OUTPUT FROM ITERATION: 7
SEEING FROM INPUT 1: 7, 1
SEEING OUTPUT FROM ITERATION: 8
SEEING FROM INPUT 1: 8, 1
SEEING OUTPUT FROM ITERATION: 9
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 6, 6
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 4, 4
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 5, 5
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 3, 3
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 9, 9
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 8, 8
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7
SEEING FROM INPUT 2: 7, 7


Can anyone please explain this behaviour? Why does the iteration happen only after all the elements of the first input stream have been read? If the input were an infinite stream, would the iteration wait for it to finish?

Thanks and Regards

Re: Unexpected behaviour in datastream.broadcast()

Biplob Biswas
Can anyone help me understand how out.collect() and the corresponding broadcast() work?

Re: Unexpected behaviour in datastream.broadcast()

Aljoscha Krettek
Hi,
there is no guarantee on the order in which the elements are processed, so it can happen that most elements from the first input are processed before any elements from the feedback. With an infinite first input this will not happen, of course.

To understand what's going on, it might also help to print the parallel subtask index of the operator alongside your output. For that you have to use a RichCoFlatMapFunction. In there, getRuntimeContext().getIndexOfThisSubtask() tells you which parallel instance of the operator is printing.
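
To make the effect of tagging each line with a subtask index concrete, here is a rough plain-Java simulation of the topology from the question. It is not Flink code and does not use any Flink API: the class name BroadcastFeedbackSketch, the Subtask helper, the round-robin distribution of the first input, and the "all of input 1 first, then the feedback" ordering are all assumptions made for illustration. That ordering is just one possible interleaving, chosen because it resembles the output posted above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation only; none of these names come from Flink.
public class BroadcastFeedbackSketch {

    /** Hypothetical stand-in for one parallel instance of the co-flatMap operator. */
    static final class Subtask {
        final int index;                 // plays the role of the parallel subtask index
        long globalVal = 1;              // per-instance state, as in the question
        final List<String> log = new ArrayList<>();

        Subtask(int index) {
            this.index = index;
        }

        /** First input: emit globalVal + value, like flatMap1 in the question. */
        long flatMap1(long value) {
            log.add("subtask " + index + " INPUT 1: " + value + ", " + globalVal);
            return globalVal + value;
        }

        /** Feedback input: overwrite the local state, like flatMap2 in the question. */
        void flatMap2(long value) {
            globalVal = value;
            log.add("subtask " + index + " INPUT 2: " + value + ", " + globalVal);
        }
    }

    /** Runs the simulation and returns the subtasks with their logs. */
    static List<Subtask> run(int parallelism, long from, long to) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask(i));
        }

        // Assumption: first-input elements are spread round-robin over the subtasks
        // and are all processed before any feedback element arrives.
        List<Long> feedback = new ArrayList<>();
        int next = 0;
        for (long v = from; v <= to; v++) {
            feedback.add(subtasks.get(next).flatMap1(v));
            next = (next + 1) % parallelism;
        }

        // Broadcast: every emitted value is delivered to every subtask.
        for (long v : feedback) {
            for (Subtask s : subtasks) {
                s.flatMap2(v);
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (Subtask s : run(4, 2, 8)) {
            s.log.forEach(System.out::println);
        }
    }
}
```

In this simulation every subtask logs all seven feedback values, which is consistent with the posted output, where each of the values 3 to 9 shows up four times under "SEEING FROM INPUT 2" at a parallelism of 4. In other words, the broadcast on the feedback edge does appear to reach every parallel instance; the instances just print without saying who they are.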

Cheers,
Aljoscha 

On Sat, 14 May 2016 at 19:55 Biplob Biswas wrote:
> Can anyone help me understand how out.collect() and the corresponding
> broadcast() work?


