Re: streaming Table API - strange behavior

Posted by Fabian Hueske-2 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/streamin-Table-API-strange-behavior-tp17270p17272.html

Hi,

You are using a BoundedOutOfOrdernessTimestampExtractor to generate watermarks.
The BoundedOutOfOrdernessTimestampExtractor is a periodic watermark assigner, so it only generates watermarks if a watermark interval is configured.
Without watermarks, event time does not advance and the query cannot "make progress": the windows never fire, and the result is only computed when the program is stopped (sources emit a final Long.MAX_VALUE watermark when they are canceled).

Long story short: you need to configure the watermark interval (in milliseconds) in your Java program: env.getConfig().setAutoWatermarkInterval(100L);
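
To make the mechanics concrete, here is a small plain-Java model of the behavior described above. It is only a sketch and not Flink code: the class WatermarkSketch and its methods are hypothetical names, a "tick" is simulated after every event instead of on a timer, and timestamps are given as millisecond offsets from 13:10:00 to keep the numbers readable. It relies on two rules: the assigner reports the largest timestamp seen so far minus the allowed out-of-orderness, and a window ending at time E fires once the watermark reaches E - 1.

```java
public class WatermarkSketch {

    // Simplified model (an assumption, not Flink's implementation):
    // returns the last watermark that reaches the window operator.
    static long lastEmittedWatermark(long[] eventTimesMs,
                                     long maxOutOfOrdernessMs,
                                     boolean intervalConfigured,
                                     boolean sourceClosed) {
        long maxSeen = Long.MIN_VALUE;
        long emitted = Long.MIN_VALUE; // nothing emitted yet
        for (long ts : eventTimesMs) {
            maxSeen = Math.max(maxSeen, ts);
            // Periodic emission only happens when an interval is configured.
            if (intervalConfigured) {
                emitted = Math.max(emitted, maxSeen - maxOutOfOrdernessMs);
            }
        }
        // A source that is shut down emits a final watermark of Long.MAX_VALUE.
        if (sourceClosed) {
            emitted = Long.MAX_VALUE;
        }
        return emitted;
    }

    // An event-time window [start, end) can fire once the watermark reaches end - 1.
    static boolean canFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        long bound = 10_000L;    // Time.seconds(10) from the program below
        long windowEnd = 5_000L; // sliding window 13:09:55 to 13:10:05
        long[] sample = {1_000L, 2_000L, 3_000L, 4_000L};
        long[] sampleWithLaterEvent = {1_000L, 2_000L, 3_000L, 4_000L, 15_000L};

        System.out.println("no interval, running:  "
                + canFire(windowEnd, lastEmittedWatermark(sample, bound, false, false)));
        System.out.println("no interval, canceled: "
                + canFire(windowEnd, lastEmittedWatermark(sample, bound, false, true)));
        System.out.println("interval, sample only: "
                + canFire(windowEnd, lastEmittedWatermark(sample, bound, true, false)));
        System.out.println("interval, later event: "
                + canFire(windowEnd, lastEmittedWatermark(sampleWithLaterEvent, bound, true, false)));
    }
}
```

In this model the window only fires while the job is running if watermarks are emitted and an event arrives that is at least 10 seconds (the configured out-of-orderness) later than the window end. With just the four sample records from your mail, the watermark stays behind the end of the first window, so you need to keep sending newer records to see output.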

Best, Fabian

2017-12-14 16:30 GMT+01:00 Plamen Paskov <[hidden email]>:

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all the aggregated data. Any ideas how to make it produce output regularly, or whenever new data arrives or existing data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);


        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        String[] row = value.split(",");
                        Timestamp timestamp = Timestamp.valueOf(row[2]);
                        return new WC(row[0], Long.valueOf(row[1]), timestamp);
                    }
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });


        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        public WC() {
        }

        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency + " " + dt.getTime();
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks