Re: Print on screen DataStream content

Posted by SimAzz on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Print-on-screen-DataStream-content-tp39728p39737.html

I tried to use `DataStream#print()`, but I don't quite understand how to apply it. Could you please give me an example? I'm using IntelliJ, so all I need is to see the data on my screen.

Thanks
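A minimal sketch of the kind of example asked for here. It assumes a Flink 1.11/1.12-era setup with the `flink-streaming-java` and `flink-clients` dependencies on the classpath. The class name, the `shout` helper and the sample words are made up for illustration. Run `main` from IntelliJ, and the printed lines should appear in the Run tool window:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintInIdeExample {

    // Hypothetical transformation, only here so there is a second stream to print.
    static String shout(String word) {
        return word.toUpperCase() + "!";
    }

    public static void main(String[] args) throws Exception {
        // Started from the IDE, this returns a local environment that runs the
        // whole job inside the IDE's JVM, so stdout is the IDE's Run console.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> words = env.fromElements("flink", "kafka", "session");
        DataStream<String> shouted = words.map(word -> shout(word));

        // print() only adds a sink to the job graph; nothing is printed until
        // execute() runs. The identifier is prefixed to every printed line
        // (plus the subtask index when the sink parallelism is above 1).
        words.print("words");
        shouted.print("shouted");

        env.execute("print-in-ide-example");
    }
}
```

If the project was generated from Flink's Maven quickstart, the Flink dependencies are typically in `provided` scope. In that case, the IntelliJ run configuration needs the "Include dependencies with 'Provided' scope" option; otherwise the job fails with missing classes.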


From: David Anderson <[hidden email]>
Sent: 24 November 2020 10:01
To: Pankaj Chand <[hidden email]>
Cc: Austin Cawley-Edwards <[hidden email]>; Simone Cavallarin <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Print on screen DataStream content
 
When Flink is running on a cluster, `DataStream#print()` writes to the TaskManagers' standard output. That output is redirected to the `.out` files in the log directory, not to your screen.

Regards,
David
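Because `print()` output on a cluster ends up in TaskManager log files, it can be simpler to write the stream to a file explicitly when a file is what you want (for example, the CSV/TXT asked for in the original question). The following is a sketch that assumes a Flink 1.11-era API in which `DataStream#writeAsText` is still available; it is deprecated in favour of `StreamingFileSink`. The output path `/tmp/flink-debug-output.csv` and the `toCsvLine` helper are hypothetical:

```java
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WriteToFileExample {

    // Hypothetical formatter: turns one record into a CSV line.
    // Replace it with the real fields of your own type (e.g. Event).
    static String toCsvLine(String id, long timestamp) {
        return id + "," + timestamp;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lines = env
                .fromElements(1000L, 2000L, 3000L)
                .map(ts -> toCsvLine("event", ts));

        // writeAsText is deprecated in favour of StreamingFileSink but is still
        // convenient for debugging. OVERWRITE replaces the output of a previous
        // run. With sink parallelism 1, the path is a single file rather than a
        // directory containing one file per parallel sink instance.
        lines.writeAsText("/tmp/flink-debug-output.csv", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);

        env.execute("write-to-file-example");
    }
}
```

On a cluster, a local path like this one is written on whichever TaskManager runs the sink, not on the machine that submitted the job.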

On Tue, Nov 24, 2020 at 6:03 AM Pankaj Chand <[hidden email]> wrote:
Please correct me if I am wrong. `DataStream#print()` only prints to the screen when running from the IDE, but does not work (print to the screen) when running on a cluster (even a local cluster).

Thanks,

Pankaj

On Mon, Nov 23, 2020 at 5:31 PM Austin Cawley-Edwards <[hidden email]> wrote:
Hey Simone,

I'd suggest trying out the `DataStream#print()` method to start, but there are a few other easy-to-integrate sinks for testing that you can check out in the docs here[1].

Best,
Austin
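One of the testing-friendly options is to collect results into a list instead of printing them, so that you can assert on them. The following is a sketch along the lines of the `CollectSink` pattern in Flink's testing documentation. It assumes the job runs in the same JVM as the caller (IDE or local mini cluster); the class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CollectSinkExample {

    // Test-only sink: all parallel instances append to one static, thread-safe list.
    public static class CollectSink implements SinkFunction<Long> {
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, Context context) {
            values.add(value);
        }
    }

    // Runs a tiny job locally and returns what reached the sink, sorted,
    // because two parallel sink instances may add elements in any order.
    static List<Long> runAndCollect() throws Exception {
        CollectSink.values.clear();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.fromElements(1L, 21L, 22L)
                .map(x -> x * 2)
                .addSink(new CollectSink());

        env.execute("collect-sink-example");

        List<Long> collected = new ArrayList<>(CollectSink.values);
        Collections.sort(collected);
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCollect());
    }
}
```

The static list is what makes this work: Flink serializes the sink and calls `invoke` on deserialized copies, so an instance field would not be visible to the caller. For the same reason, this pattern does not work on a real cluster, where the sink runs in a different JVM.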


On Mon, Nov 23, 2020 at 3:56 PM Simone Cavallarin <[hidden email]> wrote:
Hi All,

In my code I have a DataStream that I would like to inspect. I need to understand what I'm getting from each transformation, to check whether the data I'm working on makes sense. How can I print to the console, or write to a file (CSV, TXT), the variables "stream", "enriched" and "result"?

I have tried different approaches, but none of them gave me the data.

Thanks!


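        // Kafka source for topic "CorID_1"; `p` holds the consumer Properties.
        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);

        // Event-time timestamps come from Event#get_Time(). A source partition with
        // no events for one minute is marked idle so it does not hold back the watermark.
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> event.get_Time());

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));

        // Keyed by CorrID; StatefulSessionCalculator (not shown) emits a Tuple2<Event, Long>.
        DataStream<Tuple2<Event, Long>> enriched = stream
                .keyBy((Event event) -> event.CorrID)
                .map(new StatefulSessionCalculator());

        // Event-time session windows whose gap is computed per element.
        // Note: `result` is a WindowedStream, not a DataStream, so it has no print().
        // A window function (reduce, aggregate, process, ...) must be applied first.
        WindowedStream<Tuple2<Event, Long>, String, TimeWindow> result = enriched
                .keyBy(new MyKeySelector())
                .window(EventTimeSessionWindows.withDynamicGap(new DynamicSessionWindows()));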
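For the pipeline above, `stream` and `enriched` can be printed directly, but `result` first needs a window function. The following is a sketch that assumes the three variables from the code above and a call to `env.execute(...)` afterwards. `DebugPrinting`, `WindowSummary` and `describe` are hypothetical names, and summarising each window as key, bounds and element count is just one possible choice:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class DebugPrinting {

    // Formats one window summary; kept separate so it is easy to unit test.
    static String describe(String key, long start, long end, long count) {
        return "key=" + key + " window=[" + start + ", " + end + ") count=" + count;
    }

    // Emits one line per fired session window: its key, bounds and element count.
    public static class WindowSummary<T>
            extends ProcessWindowFunction<T, String, String, TimeWindow> {
        @Override
        public void process(String key, Context context, Iterable<T> elements,
                            Collector<String> out) {
            long count = 0;
            for (T ignored : elements) {
                count++;
            }
            TimeWindow window = context.window();
            out.collect(describe(key, window.getStart(), window.getEnd(), count));
        }
    }

    // Attaches a labelled print sink to each stage. Nothing is printed until
    // env.execute() runs the job.
    public static <A, B, T> void printStages(DataStream<A> stream,
                                             DataStream<B> enriched,
                                             WindowedStream<T, String, TimeWindow> result) {
        stream.print("stream");
        enriched.print("enriched");
        // process() turns the WindowedStream back into a DataStream<String>.
        result.process(new WindowSummary<T>()).print("result");
    }
}
```

Call `DebugPrinting.printStages(stream, enriched, result);` before `env.execute(...)`. Because these are event-time session windows, a `result` line only appears once the watermark passes the end of a session (the last event time plus the gap). If the Kafka topic stops receiving data, the last sessions may therefore never be printed.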