Streaming Table API - strange behavior

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Streaming Table API - strange behavior

Plamen Paskov

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all of the aggregated data. Any ideas how to make it emit output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


/**
 * Sums word frequencies per 10-second sliding window (sliding every 5 seconds) using the Table API.
 *
 * <p>Input lines are read from a socket on {@code localhost:9000}. Each line has the form
 * {@code word,frequency,timestamp}, e.g. {@code hello,1,2017-12-14 13:10:01}.
 *
 * <p>Event time is taken from the timestamp column. The watermark trails the largest timestamp
 * seen so far by 10 seconds, and it only advances when new records arrive. A window's result is
 * therefore printed only after a record arrives whose timestamp is roughly 10 seconds past that
 * window's end. Until then, or until the job is stopped, nothing is printed.
 */
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time also enables periodic watermark emission (200 ms by default).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // Watermark = max timestamp seen so far - 10 s; tolerates records up to 10 s late.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });


        // "dt.rowtime" exposes dt as the event-time attribute used by the window below.
        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second windows starting every 5 seconds, so each record lands in two windows.
        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        // Each element is (flag, row): true marks an added row, false a retracted one.
        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    /**
     * Parses one input line of the form {@code word,frequency,timestamp}.
     *
     * <p>Surrounding whitespace on each field is ignored, so {@code "hello, 1, 2017-12-14 13:10:01\r"}
     * is accepted. Any fields after the third are ignored.
     *
     * @param line raw line read from the socket
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three comma-separated fields,
     *     the frequency is not a valid {@code long}, or the timestamp is not in
     *     {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} format
     */
    private static WC parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 'word,frequency,timestamp' but got: '" + line + "'");
        }
        String word = fields[0].trim();
        long frequency = Long.parseLong(fields[1].trim());
        Timestamp timestamp = Timestamp.valueOf(fields[2].trim());
        return new WC(word, frequency, timestamp);
    }

    /**
     * A word-count record, used both as the input row and as the windowed output row.
     *
     * <p>The public fields and public no-arg constructor are presumably there so that Flink can
     * treat this class as a POJO type (an assumption; confirm against the Flink type docs).
     */
    public static class WC {
        public String word;
        public long frequency;
        /** Event time on input; window start on output. Null for no-arg-constructed instances. */
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            // dt is null when built via the no-arg constructor; don't throw from toString().
            String millis = dt == null ? "null" : String.valueOf(dt.getTime());
            return "WC " + word + " " + frequency + " " + millis;
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks

Reply | Threaded
Open this post in threaded view
|

Re: Streaming Table API - strange behavior

Fabian Hueske-2
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig().setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all of the aggregated data. Any ideas how to make it emit output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


/**
 * Sums word frequencies per 10-second sliding window (sliding every 5 seconds) using the Table API.
 *
 * <p>Input lines are read from a socket on {@code localhost:9000}. Each line has the form
 * {@code word,frequency,timestamp}, e.g. {@code hello,1,2017-12-14 13:10:01}.
 *
 * <p>Event time is taken from the timestamp column. The watermark trails the largest timestamp
 * seen so far by 10 seconds, and it only advances when new records arrive. A window's result is
 * therefore printed only after a record arrives whose timestamp is roughly 10 seconds past that
 * window's end. Until then, or until the job is stopped, nothing is printed.
 */
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time also enables periodic watermark emission (200 ms by default).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // Watermark = max timestamp seen so far - 10 s; tolerates records up to 10 s late.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });


        // "dt.rowtime" exposes dt as the event-time attribute used by the window below.
        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second windows starting every 5 seconds, so each record lands in two windows.
        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        // Each element is (flag, row): true marks an added row, false a retracted one.
        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    /**
     * Parses one input line of the form {@code word,frequency,timestamp}.
     *
     * <p>Surrounding whitespace on each field is ignored, so {@code "hello, 1, 2017-12-14 13:10:01\r"}
     * is accepted. Any fields after the third are ignored.
     *
     * @param line raw line read from the socket
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three comma-separated fields,
     *     the frequency is not a valid {@code long}, or the timestamp is not in
     *     {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} format
     */
    private static WC parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 'word,frequency,timestamp' but got: '" + line + "'");
        }
        String word = fields[0].trim();
        long frequency = Long.parseLong(fields[1].trim());
        Timestamp timestamp = Timestamp.valueOf(fields[2].trim());
        return new WC(word, frequency, timestamp);
    }

    /**
     * A word-count record, used both as the input row and as the windowed output row.
     *
     * <p>The public fields and public no-arg constructor are presumably there so that Flink can
     * treat this class as a POJO type (an assumption; confirm against the Flink type docs).
     */
    public static class WC {
        public String word;
        public long frequency;
        /** Event time on input; window start on output. Null for no-arg-constructed instances. */
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            // dt is null when built via the no-arg constructor; don't throw from toString().
            String millis = dt == null ? "null" : String.valueOf(dt.getTime());
            return "WC " + word + " " + frequency + " " + millis;
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks


Reply | Threaded
Open this post in threaded view
|

Re: Streaming Table API - strange behavior

Plamen Paskov

Hi Fabian,

Thank you for your response! I don't think that's necessary, because I already have this call:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which does exactly what you describe: it sets the watermark interval to 200 ms.
I think I found the problem: it is the default event-time trigger attached to the window assigner.
According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "all the event-time window assigners have an EventTimeTrigger as default trigger.
This trigger simply fires once the watermark passes the end of a window." All I have to do in order to trigger the computation is to send an event that falls into the "next" window.
So the question now is: how can I set a trigger that fires at regular intervals (e.g. every 5 seconds) using the Table API?

On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig().setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all of the aggregated data. Any ideas how to make it emit output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


/**
 * Sums word frequencies per 10-second sliding window (sliding every 5 seconds) using the Table API.
 *
 * <p>Input lines are read from a socket on {@code localhost:9000}. Each line has the form
 * {@code word,frequency,timestamp}, e.g. {@code hello,1,2017-12-14 13:10:01}.
 *
 * <p>Event time is taken from the timestamp column. The watermark trails the largest timestamp
 * seen so far by 10 seconds, and it only advances when new records arrive. A window's result is
 * therefore printed only after a record arrives whose timestamp is roughly 10 seconds past that
 * window's end. Until then, or until the job is stopped, nothing is printed.
 */
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time also enables periodic watermark emission (200 ms by default).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // Watermark = max timestamp seen so far - 10 s; tolerates records up to 10 s late.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });


        // "dt.rowtime" exposes dt as the event-time attribute used by the window below.
        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second windows starting every 5 seconds, so each record lands in two windows.
        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        // Each element is (flag, row): true marks an added row, false a retracted one.
        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    /**
     * Parses one input line of the form {@code word,frequency,timestamp}.
     *
     * <p>Surrounding whitespace on each field is ignored, so {@code "hello, 1, 2017-12-14 13:10:01\r"}
     * is accepted. Any fields after the third are ignored.
     *
     * @param line raw line read from the socket
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three comma-separated fields,
     *     the frequency is not a valid {@code long}, or the timestamp is not in
     *     {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} format
     */
    private static WC parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 'word,frequency,timestamp' but got: '" + line + "'");
        }
        String word = fields[0].trim();
        long frequency = Long.parseLong(fields[1].trim());
        Timestamp timestamp = Timestamp.valueOf(fields[2].trim());
        return new WC(word, frequency, timestamp);
    }

    /**
     * A word-count record, used both as the input row and as the windowed output row.
     *
     * <p>The public fields and public no-arg constructor are presumably there so that Flink can
     * treat this class as a POJO type (an assumption; confirm against the Flink type docs).
     */
    public static class WC {
        public String word;
        public long frequency;
        /** Event time on input; window start on output. Null for no-arg-constructed instances. */
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            // dt is null when built via the no-arg constructor; don't throw from toString().
            String millis = dt == null ? "null" : String.valueOf(dt.getTime());
            return "WC " + word + " " + frequency + " " + millis;
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks



Reply | Threaded
Open this post in threaded view
|

Re: Streaming Table API - strange behavior

Fabian Hueske-2
Hi,

yes you are right. I forgot that the interval is set by default when enabling event time.

Also your comment about triggering the window is correct. Technically, you don't need a record that falls into the next window, but just a watermark that is past the window boundary.
In your case, watermarks only advance if the assigner sees more records and you'd need a record with a timestamp of at least 2017-12-14 13:10:15 (or 16), because the watermark assigner subtracts 10 seconds.
Given the current watermark assigner, there is no other way than sending more records to trigger a window computation. You can implement a custom assigner to also emit watermarks without data, but that would somewhat bind the event-time watermarks to the clock of the generating machine such that watermarks wouldn't be only data-driven.

Best, Fabian

2017-12-14 17:25 GMT+01:00 Plamen Paskov <[hidden email]>:

Hi Fabian,

Thank you for your response! I don't think that's necessary, because I already have this call:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

which does exactly what you describe: it sets the watermark interval to 200 ms.
I think I found the problem: it is the default event-time trigger attached to the window assigner.
According to the docs here https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html : "all the event-time window assigners have an EventTimeTrigger as default trigger.
This trigger simply fires once the watermark passes the end of a window." All I have to do in order to trigger the computation is to send an event that falls into the "next" window.
So the question now is: how can I set a trigger that fires at regular intervals (e.g. every 5 seconds) using the Table API?

On 14.12.2017 17:57, Fabian Hueske wrote:
Hi,

you are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner and only generates watermarks if a watermark interval is configured.
Without watermarks, the query cannot "make progress" and only computes its result when the program is closed (sources emit a MAX_LONG watermark when being canceled).

Long story short: you need to configure the watermark interval: env.getConfig().setAutoWatermarkInterval(100L);

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all of the aggregated data. Any ideas how to make it emit output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


/**
 * Sums word frequencies per 10-second sliding window (sliding every 5 seconds) using the Table API.
 *
 * <p>Input lines are read from a socket on {@code localhost:9000}. Each line has the form
 * {@code word,frequency,timestamp}, e.g. {@code hello,1,2017-12-14 13:10:01}.
 *
 * <p>Event time is taken from the timestamp column. The watermark trails the largest timestamp
 * seen so far by 10 seconds, and it only advances when new records arrive. A window's result is
 * therefore printed only after a record arrives whose timestamp is roughly 10 seconds past that
 * window's end. Until then, or until the job is stopped, nothing is printed.
 */
public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switching to event time also enables periodic watermark emission (200 ms by default).
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // Watermark = max timestamp seen so far - 10 s; tolerates records up to 10 s late.
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });


        // "dt.rowtime" exposes dt as the event-time attribute used by the window below.
        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        // 10-second windows starting every 5 seconds, so each record lands in two windows.
        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        // Each element is (flag, row): true marks an added row, false a retracted one.
        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    /**
     * Parses one input line of the form {@code word,frequency,timestamp}.
     *
     * <p>Surrounding whitespace on each field is ignored, so {@code "hello, 1, 2017-12-14 13:10:01\r"}
     * is accepted. Any fields after the third are ignored.
     *
     * @param line raw line read from the socket
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three comma-separated fields,
     *     the frequency is not a valid {@code long}, or the timestamp is not in
     *     {@code yyyy-[m]m-[d]d hh:mm:ss[.f...]} format
     */
    private static WC parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 'word,frequency,timestamp' but got: '" + line + "'");
        }
        String word = fields[0].trim();
        long frequency = Long.parseLong(fields[1].trim());
        Timestamp timestamp = Timestamp.valueOf(fields[2].trim());
        return new WC(word, frequency, timestamp);
    }

    /**
     * A word-count record, used both as the input row and as the windowed output row.
     *
     * <p>The public fields and public no-arg constructor are presumably there so that Flink can
     * treat this class as a POJO type (an assumption; confirm against the Flink type docs).
     */
    public static class WC {
        public String word;
        public long frequency;
        /** Event time on input; window start on output. Null for no-arg-constructed instances. */
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            // dt is null when built via the no-arg constructor; don't throw from toString().
            String millis = dt == null ? "null" : String.valueOf(dt.getTime());
            return "WC " + word + " " + frequency + " " + millis;
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks