Print on screen DataStream content

SimAzz
Hi All,

In my code I have a DataStream that I would like to inspect. I need to see what each transformation produces, so that I can check whether the data I'm working on makes sense. How can I print to the console, or write to a file (csv, txt), the contents of the variables "stream", "enriched" and "result"?

I have tried several approaches, but none of them gave me the data.

Thanks!


        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> {
                            return event.get_Time();
                        });
        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));

        DataStream<Tuple2<Event, Long>> enriched = stream
                .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
                .map(new StatefulSessionCalculator());

        WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));

Re: Print on screen DataStream content

austin.ce
Hey Simone,

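I'd suggest trying out the `DataStream#print()` function to start, but there are also a few other easy-to-integrate sinks for testing that you can check out in the docs [1].

Best,
Austin

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks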



Re: Print on screen DataStream content

Pankaj Chand
Please correct me if I am wrong: `DataStream#print()` prints to the screen only when the job is run from the IDE. It does not print to the screen when the job runs on a cluster (even a local cluster).

Thanks,

Pankaj


Re: Print on screen DataStream content

David Anderson-3
When Flink is running on a cluster, `DataStream#print()` writes to the TaskManagers' standard output, which ends up in the `.out` files in the log directory.

Regards,
David


Re: Print on screen DataStream content

SimAzz
I tried to use `DataStream#print()`, but I don't quite understand how to implement it. Could you please give me an example? I'm using IntelliJ, so all I need is to see the data on my screen.

Thanks



Re: Print on screen DataStream content

Timo Walther
Hi Simone,

If you are just executing DataStream pipelines locally in your IDE while
prototyping, you should be able to use `DataStream#print()`, which simply
prints to standard out [1] (the output might be hidden between the log messages).

For debugging locally, you can also just set breakpoints in your
functions like in `StatefulSessionCalculator` and use the debugging mode
of your IDE.

Regards,
Timo

[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
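
A minimal sketch of what calling `print()` can look like, assuming the Flink 1.11 DataStream API is on the classpath. The class name `PrintExample`, the `fromElements` stand-in source, the `map` step and the sink labels are illustrative assumptions and not part of the job discussed in this thread.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source (assumption); in the thread's job this would be
        // the Kafka-backed `stream`.
        DataStream<String> stream = env.fromElements("a", "b", "c");

        // Stand-in transformation (assumption) playing the role of `enriched`.
        DataStream<String> enriched = stream.map(value -> value.toUpperCase());

        // print() attaches a sink that writes each record's toString() to
        // standard out; the optional label is prepended to every line.
        stream.print("stream");
        enriched.print("enriched");

        // Nothing is printed until the job is actually executed.
        env.execute("print-example");
    }
}
```

Note that `print()` only registers a sink; records appear once `env.execute()` runs. A `WindowedStream` such as `result` has no `print()` method, so it has to be turned back into a `DataStream` with a window function (for example `reduce` or `process`) before it can be printed.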



Re: Print on screen DataStream content

SimAzz
Hi,

Yes, I would like to debug locally in my IDE.

This is what I have tried so far, but with no luck:

a)  String ff = result.toString();
    System.out.print(ff);

b)  printOrTest(stream);

c)  stream.print();

d)  System.out.println(stream.print());


This is the output. To me it looks like an "ID code" that identifies each Event in my application, but I don't know how to access the data inside. My Event is wrapped in a POJO, so each Event is something like this: ['tom', 1.70, 55, 1929219939, M....]

p1.Event@1ca90014
p1.Event@5e550c0c
p1.Event@402baa28
p1.Event@5cf8ae00
p1.Event@805bc2b
p1.Event@7e46f44b
p1.Event@63a17b41

Apologies, but I really don't understand: if I write `DataStream#print()` I just get an error, so I'm missing something here.
The example contains System.out.println() calls.

Thanks
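
Lines such as `p1.Event@1ca90014` are what Java prints for any object whose class does not override `toString()`: the class name, an `@`, and the object's hash code in hexadecimal. The plain-Java sketch below illustrates that default behaviour. `DefaultToStringDemo`, its nested `Event` class and the `describe` helper are hypothetical names used only for illustration.

```java
public class DefaultToStringDemo {

    // Hypothetical stand-in for the thread's Event POJO, deliberately
    // without a toString() override.
    static class Event {
        String name = "tom";
    }

    // Rebuilds by hand what Object.toString() returns by default.
    static String describe(Object o) {
        return o.getClass().getName() + "@" + Integer.toHexString(o.hashCode());
    }

    public static void main(String[] args) {
        Event e = new Event();
        // Prints the class name, '@' and a hex hash code; the hash varies per run.
        System.out.println(e);
        // The hand-built string is identical to the default toString() result.
        System.out.println(describe(e).equals(e.toString()));
    }
}
```

So the data is being printed; it is only the default string representation of `Event` that hides the fields.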


Re: Print on screen DataStream content

David Anderson-3
Simone,

What you want to do is to override the toString() method on Event so that it produces a more helpful String as its result, and then use

stream.print()

in your IDE (where stream is a DataStream<Event>).

By the way, printOrTest(stream) isn't part of Flink -- that's just something used by the training exercises.

Best,
David
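
The `toString()` override suggested above could look roughly like the sketch below. The field names and types are assumptions loosely based on the sample record quoted earlier in the thread (['tom', 1.70, 55, 1929219939, ...]); the real `Event` class is not shown in the thread.

```java
public class Event {
    // Hypothetical fields (assumption); replace with the real Event's fields.
    public String name;
    public double height;
    public int weight;
    public long time;

    // Public no-argument constructor, which Flink expects of POJO types.
    public Event() {}

    public Event(String name, double height, int weight, long time) {
        this.name = name;
        this.height = height;
        this.weight = weight;
        this.time = time;
    }

    // Human-readable representation, used by stream.print() for each record.
    @Override
    public String toString() {
        return "Event{name='" + name + "', height=" + height
                + ", weight=" + weight + ", time=" + time + "}";
    }

    public static void main(String[] args) {
        System.out.println(new Event("tom", 1.70, 55, 1929219939L));
        // prints: Event{name='tom', height=1.7, weight=55, time=1929219939}
    }
}
```

With an override like this in place, `stream.print()` shows the field values instead of `p1.Event@...`.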


Re: Print on screen DataStream content

SimAzz
OK, thank you all for the help!

