Comment in source code of CoGroupedStreams

sudranga
Is this comment in the file flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java accurate?

" * <p>Note: Right now, the groups are being built in memory so you need to ensure that they don't
 * get too big. Otherwise the JVM might crash."

Looking at the source code of CoGroupedStreams, I see that it simply does a map, a union, and then assigns the data to the appropriate windows. I assumed that the elements in a window are persisted using my configured state backend (and that appends do not need to read the entire list state).
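To make the map/union/window description above concrete, here is a minimal pure-Java sketch of the tag-and-union step with no Flink dependency. It is an illustration only: Tagged is a hypothetical stand-in for Flink's internal TaggedUnion, unionAndGroup is a hypothetical helper, and windows and state backends are left out, so the per-key buckets here are plain heap lists rather than backend-managed window state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TagAndUnionSketch {

    // Hypothetical stand-in for Flink's internal TaggedUnion: exactly one side is set.
    static final class Tagged<A, B> {
        final A one;
        final B two;

        private Tagged(A one, B two) {
            this.one = one;
            this.two = two;
        }

        static <A, B> Tagged<A, B> left(A value) { return new Tagged<>(value, null); }

        static <A, B> Tagged<A, B> right(B value) { return new Tagged<>(null, value); }

        boolean isOne() { return one != null; }
    }

    // Tag both inputs ("map"), concatenate them ("union"), then bucket by key.
    // In Flink the buckets would be keyed window state in the configured backend
    // (an assumption about the runtime); here they are plain in-memory lists.
    static <A, B, K> Map<K, List<Tagged<A, B>>> unionAndGroup(
            List<A> first, List<B> second,
            Function<A, K> firstKey, Function<B, K> secondKey) {
        Map<K, List<Tagged<A, B>>> byKey = new LinkedHashMap<>();
        for (A a : first) {
            byKey.computeIfAbsent(firstKey.apply(a), k -> new ArrayList<>()).add(Tagged.left(a));
        }
        for (B b : second) {
            byKey.computeIfAbsent(secondKey.apply(b), k -> new ArrayList<>()).add(Tagged.right(b));
        }
        return byKey;
    }

    public static void main(String[] args) {
        // Hypothetical keys: strings keyed by first character, integers by a fixed mapping.
        Map<Character, List<Tagged<String, Integer>>> grouped = unionAndGroup(
                List.of("a1", "b1", "a2"), List.of(10, 20),
                s -> s.charAt(0), i -> i == 10 ? 'a' : 'c');
        grouped.forEach((key, bucket) ->
                System.out.println(key + " -> " + bucket.size() + " element(s)"));
    }
}
```

Each key ends up with one bucket holding elements from both inputs, which is roughly the shape a keyed window sees before it fires.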

I ask because I tried setting a uid on my coGroup operator as shown below, and this results in a compilation error (no uid method available?):

firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new MyCogroupFunction())
      .uid("myCogroup")


Is the comment referring to having enough memory on the read side? If so, isn't this true for any window process function?

Thanks
Sudharsan

Re: Comment in source code of CoGroupedStreams

Guowei Ma
Hi,
I think you could try something like this (note with(...) instead of apply(...)):
firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .with(new MyCogroupFunction())
      .uid("myCoGroup")
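The compile error comes down to declared return types. As far as I know, in the Java API of Flink 1.x, WithWindow.apply(CoGroupFunction) is declared to return DataStream<T>, which has no uid(...) method, while with(CoGroupFunction) returns SingleOutputStreamOperator<T> by casting the result of apply. The sketch below reproduces only that type relationship using hypothetical Stream and Operator classes; they are not Flink classes.

```java
// Hypothetical miniature of the type relationship; these are NOT Flink classes.
public class UidTypeSketch {

    // Plays the role of DataStream: no uid(...) method here.
    static class Stream<T> { }

    // Plays the role of SingleOutputStreamOperator: the subtype that carries uid(...).
    static class Operator<T> extends Stream<T> {
        private String uid;

        Operator<T> uid(String uid) {
            this.uid = uid;
            return this;
        }

        String getUid() { return uid; }
    }

    // Mirrors apply(...): the runtime object is an Operator, but the declared
    // return type is the base class, so callers cannot call uid(...) on it.
    static <T> Stream<T> apply() {
        return new Operator<>();
    }

    // Mirrors with(...): the same object, narrowed to the subtype by a cast.
    static <T> Operator<T> with() {
        return (Operator<T>) UidTypeSketch.<T>apply();
    }

    public static void main(String[] args) {
        // UidTypeSketch.<String>apply().uid("myCoGroup"); // would not compile: Stream has no uid(...)
        Operator<String> op = UidTypeSketch.<String>with().uid("myCoGroup");
        System.out.println(op.getUid());
    }
}
```

In the sketch, with() returns the very same object as apply(); only its static type changes, which is all the uid(...) call needs.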
Best,
Guowei


On Fri, Jan 22, 2021 at 4:33 AM Sudharsan R <[hidden email]> wrote:

Re: Comment in source code of CoGroupedStreams

Dawid Wysakowicz-2
In reply to this post by sudranga

For the uid problem, you can follow Guowei's advice.

As for the comment, I think it means that all elements of a single key (within one window) must fit into memory when they are passed as Iterables to the CoGroupFunction.
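Dawid's point can be illustrated with a simplified, Flink-free sketch of what happens when a co-group window fires for one key. To my understanding, CoGroupedStreams wraps the user function in an internal window function that copies the tagged elements into one ArrayList per input before calling coGroup. The Tagged class, CoGroup interface and fire method below are hypothetical stand-ins for that logic, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class CoGroupFireSketch {

    // Hypothetical tagged element; isLeft marks which input it came from.
    static final class Tagged {
        final boolean isLeft;
        final String value;

        Tagged(boolean isLeft, String value) {
            this.isLeft = isLeft;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a user CoGroupFunction.
    interface CoGroup<O> {
        O coGroup(List<String> first, List<String> second);
    }

    // Sketch of one window firing for one key: the window contents are read once
    // and copied into two heap-allocated lists, one per input, before the user
    // function sees them. Both lists are fully materialized, so their combined
    // size for that key and window must fit in the JVM heap.
    static <O> O fire(Iterable<Tagged> windowContents, CoGroup<O> fn) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        for (Tagged t : windowContents) {
            if (t.isLeft) {
                first.add(t.value);
            } else {
                second.add(t.value);
            }
        }
        return fn.coGroup(first, second);
    }

    public static void main(String[] args) {
        List<Tagged> contents = List.of(
                new Tagged(true, "l1"), new Tagged(false, "r1"), new Tagged(true, "l2"));
        String summary = fire(contents, (l, r) -> l.size() + " left, " + r.size() + " right");
        System.out.println(summary);
    }
}
```

The heap cost here grows with the number of elements for that key and window, so large windows or hot keys are the cases where this matters.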

Best,

Dawid

On 21/01/2021 21:32, Sudharsan R wrote: