Hi Simone,
If you are just executing DataStream pipelines locally in your IDE while
prototyping, you should be able to use `DataStream#print()`, which simply
prints to standard out [1] (the output might be hidden among the log messages).
For local debugging, you can also set breakpoints in your
functions, such as `StatefulSessionCalculator`, and use the debug mode
of your IDE.
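
A minimal sketch of what this could look like, assuming the Flink streaming dependencies are on the classpath of your IDE project. The class name `PrintExample`, the sample elements, and the upper-casing `map` are placeholders made up for illustration; they stand in for your Kafka source and `StatefulSessionCalculator`. The string passed to `print(...)` is an optional identifier that is prepended to each output line, so the two streams can be told apart in the console.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical stand-alone job: prints two stages of a pipeline to standard out.
public class PrintExample {
    public static void main(String[] args) throws Exception {
        // Inside the IDE this creates a local, embedded execution environment.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source (assumption): replaces the Kafka consumer from the question.
        DataStream<String> stream = env.fromElements("a", "b", "c");
        stream.print("stream");      // each record is printed with the prefix "stream"

        // Placeholder transformation (assumption): replaces StatefulSessionCalculator.
        DataStream<String> enriched = stream.map(value -> value.toUpperCase());
        enriched.print("enriched");  // each record is printed with the prefix "enriched"

        // Nothing is printed until the job is actually executed.
        env.execute("print-example");
    }
}
```

Note that `result` in your snippet is a `WindowedStream`, which has no `print()` method. You would first need to apply a window function (for example `reduce` or `process`) and then call `print()` on the `DataStream` it returns.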
Regards,
Timo
[1]
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java
On 24.11.20 11:09, Simone Cavallarin wrote:
> I tried to use `DataStream#print()`, but I don't quite understand how to
> implement it. Could you please give me an example? I'm using IntelliJ, so
> all I need is to see the data on my screen.
>
> Thanks
>
> ------------------------------------------------------------------------
> *From:* David Anderson <[hidden email]>
> *Sent:* 24 November 2020 10:01
> *To:* Pankaj Chand <[hidden email]>
> *Cc:* Austin Cawley-Edwards <[hidden email]>; Simone Cavallarin
> <[hidden email]>; [hidden email] <[hidden email]>
> *Subject:* Re: Print on screen DataStream content
> When Flink is running on a cluster, `DataStream#print()` prints to files
> in the log directory.
>
> Regards,
> David
>
> On Tue, Nov 24, 2020 at 6:03 AM Pankaj Chand <[hidden email]> wrote:
>
> Please correct me if I am wrong. `DataStream#print()` only prints to
> the screen when running from the IDE, but does not work (print to
> the screen) when running on a cluster (even a local cluster).
>
> Thanks,
>
> Pankaj
>
> On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards <[hidden email]> wrote:
>
> Hey Simone,
>
> I'd suggest trying out the `DataStream#print()` function to
> start, but there are a few other easy-to-integrate sinks for
> testing that you can check out in the docs here[1]
>
> Best,
> Austin
>
> [1]: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sinks
>
> On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin <[hidden email]> wrote:
>
> Hi All,
>
> In my code I have a DataStream that I would like to access.
> I need to understand what I'm getting from each
> transformation to check whether the data I'm working on makes
> sense. How can I print to the console, or write to a file (csv,
> txt), the variables "stream", "enriched" and "result"?
>
> I have tried different ways, but I could not get the data.
>
> Thanks!
>
>     FlinkKafkaConsumer<Event> kafkaData =
>         new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>     WatermarkStrategy<Event> wmStrategy =
>         WatermarkStrategy
>             .<Event>forMonotonousTimestamps()
>             .withIdleness(Duration.ofMinutes(1))
>             .withTimestampAssigner((event, timestamp) -> {
>                 return event.get_Time();
>             });
>     DataStream<Event> stream = env.addSource(
>         kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
>     DataStream<Tuple2<Event, Long>> enriched = stream
>         .keyBy((Event KafkaMSG) -> KafkaMSG.CorrID)
>         .map(new StatefulSessionCalculator());
>
>     WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
>         .keyBy(new MyKeySelector())
>         .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));
>