Comment in source code of CoGroupedStreams


sudranga
Is this comment in the file
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
accurate?

" * <p>Note: Right now, the groups are being built in memory so you need to ensure that they don't
 * get too big. Otherwise the JVM might crash."

Looking at the source code of CoGroupedStreams, I see that it simply does a
map and a union, and then the data is assigned to the appropriate windows. I
assumed that the elements in the window itself are persisted using my
configured state backend (and that appends do not need to read the entire
list state).

I ask because I tried setting a uid on my coGroup operator as shown below, and
this results in a compilation error (no uid method available?).
 
firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new MyCogroupFunction())
      .uid("myCogroup")


Is the comment referring to having enough memory on the read side? If so,
isn't this true for any window process function?

Thanks
Sudharsan




Re: Comment in source code of CoGroupedStreams

Piotr Nowojski-4
Hi Sudharsan,

Sorry for the somewhat late response. As far as I can tell, this comment refers to this piece of code:

        public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)
                throws Exception {

            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val : values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }

from

org.apache.flink.streaming.api.datastream.CoGroupedStreams.CoGroupWindowFunction#apply

You are right that WindowOperator uses the state backend to store the elements. However, this function at the very least (there might be a reason why it is doing this eagerly) seems to assemble the co-grouped elements into two distinct ArrayLists before handing them over to the `CoGroupFunction`, so both sides of a window's group are held on the heap at that point.
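To make the memory behaviour concrete, here is a minimal sketch without any Flink dependency. `Tagged`, `Groups`, and `split` are hypothetical stand-ins written for this illustration (they are not Flink classes); `split` only mirrors the shape of the loop quoted above. Whatever backs the incoming `Iterable`, every element of the group ends up in one of two heap-resident lists before the user function would see it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CoGroupBufferingSketch {

    /** Hypothetical stand-in for Flink's TaggedUnion: a value from exactly one of two inputs. */
    static final class Tagged<A, B> {
        private final A one;
        private final B two;
        private final boolean isOne;

        private Tagged(A one, B two, boolean isOne) {
            this.one = one;
            this.two = two;
            this.isOne = isOne;
        }

        static <A, B> Tagged<A, B> one(A value) {
            return new Tagged<>(value, null, true);
        }

        static <A, B> Tagged<A, B> two(B value) {
            return new Tagged<>(null, value, false);
        }
    }

    /** Hypothetical holder for the two fully materialized sides of one window group. */
    static final class Groups<A, B> {
        final List<A> ones = new ArrayList<>();
        final List<B> twos = new ArrayList<>();
    }

    /**
     * Walks the group once and copies every element into one of two lists.
     * Heap usage therefore grows linearly with the number of elements in the group,
     * even if the Iterable itself is read lazily from somewhere else.
     */
    static <A, B> Groups<A, B> split(Iterable<Tagged<A, B>> values) {
        Groups<A, B> groups = new Groups<>();
        for (Tagged<A, B> value : values) {
            if (value.isOne) {
                groups.ones.add(value.one);
            } else {
                groups.twos.add(value.two);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        // Three elements of one window group: two from the first input, one from the second.
        List<Tagged<String, Integer>> window = Arrays.asList(
                Tagged.<String, Integer>one("a"),
                Tagged.<String, Integer>two(1),
                Tagged.<String, Integer>one("b"));

        Groups<String, Integer> groups = split(window);
        System.out.println(groups.ones + " " + groups.twos); // prints: [a, b] [1]
    }
}
```

Under these assumptions, the cost that the Javadoc note warns about is the size of a single key's group within a single window, not the total amount of window state.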

Best,
Piotrek

On Thu, 21 Jan 2021 at 20:38, sudranga <[hidden email]> wrote:
> [...]