Streaming Table API - strange behavior

Posted by Plamen Paskov on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/streamin-Table-API-strange-behavior-tp17270.html

Hi,

I'm trying to run the following streaming program in my local Flink 1.3.2 environment. The program compiles and runs without any errors, but the print() call doesn't display anything. Once I stop the program, I receive all the aggregated data. Any ideas how to make it produce output regularly, or whenever new data arrives or old data is updated?

package flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;

import java.sql.Timestamp;


/**
 * Event-time word-count job built on the Flink streaming Table API.
 *
 * <p>Reads comma-separated lines of the form {@code word,frequency,yyyy-MM-dd HH:mm:ss}
 * from a socket on {@code localhost:9000}, registers them as the table {@code WordCount},
 * sums {@code frequency} per word over a sliding window (10 seconds long, sliding every
 * 5 seconds) and prints the resulting retract stream.
 *
 * <p>NOTE(review): the windows are evaluated in event time, and the watermark assigner
 * below is configured with a bound of 10 seconds. Per the Flink documentation a window
 * is emitted only once the watermark passes its end, and the watermark trails the highest
 * timestamp seen by the configured bound. With input whose timestamps span only a few
 * seconds, no window is expected to fire until later events advance the watermark or the
 * source terminates — which would explain output appearing only when the job is stopped.
 * Confirm against the Flink event-time documentation before changing the bound.
 */
public class StreamingJob {

    /** Number of comma-separated fields every input line must provide: word, frequency, timestamp. */
    private static final int REQUIRED_FIELDS = 3;

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args ignored
     * @throws Exception if the job cannot be built or fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.getTableEnvironment(env);

        SingleOutputStreamOperator<WC> input = env
                .socketTextStream("localhost", 9000, "\n")
                .map(new MapFunction<String, WC>() {
                    @Override
                    public WC map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // The record's own timestamp drives event time; the argument is the
                // out-of-orderness bound handed to the extractor (10 seconds).
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<WC>(Time.seconds(10)) {
                    @Override
                    public long extractTimestamp(WC element) {
                        return element.dt.getTime();
                    }
                });

        // "dt.rowtime" marks dt as the event-time attribute used by the window below.
        tEnv.registerDataStream("WordCount", input, "word, frequency, dt.rowtime");

        Table table = tEnv.scan("WordCount")
                .window(Slide.over("10.seconds").every("5.seconds").on("dt").as("w"))
                .groupBy("w, word")
                .select("word, frequency.sum as frequency, w.start as dt");

        DataStream<Tuple2<Boolean, WC>> result = tEnv.toRetractStream(table, WC.class);
        result.print();

        env.execute();
    }

    /**
     * Parses one input line of the form {@code word,frequency,timestamp}.
     *
     * <p>Fields beyond the third are ignored. The frequency field is trimmed before
     * parsing so that input such as {@code "hello, 1, 2017-12-14 13:10:01"} is accepted;
     * the word is kept exactly as received.
     *
     * @param line raw text line read from the socket
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three fields, or the
     *         frequency or timestamp field cannot be parsed ({@link NumberFormatException}
     *         is a subclass of {@code IllegalArgumentException})
     */
    private static WC parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < REQUIRED_FIELDS) {
            // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Expected 'word,frequency,timestamp' but got: '" + line + "'");
        }
        // parseLong avoids creating a boxed Long that would be unboxed immediately.
        long frequency = Long.parseLong(fields[1].trim());
        Timestamp timestamp = Timestamp.valueOf(fields[2]);
        return new WC(fields[0], frequency, timestamp);
    }

    /**
     * Word-count record: a word, its frequency and the associated timestamp.
     *
     * <p>Fields are public and a public no-argument constructor is provided; the class
     * is passed to {@code toRetractStream} as the result type.
     */
    public static class WC {
        public String word;
        public long frequency;
        public Timestamp dt;

        /** Creates an empty record; all fields keep their default values. */
        public WC() {
        }

        /**
         * Creates a fully initialised record.
         *
         * @param word      the word
         * @param frequency the count associated with the word
         * @param dt        the timestamp of the record
         */
        public WC(String word, long frequency, Timestamp dt) {
            this.word = word;
            this.frequency = frequency;
            this.dt = dt;
        }

        /**
         * Renders the record as {@code "WC <word> <frequency> <epoch-millis>"}.
         * A record without a timestamp prints {@code null} in the last position
         * instead of throwing a {@link NullPointerException}.
         */
        @Override
        public String toString() {
            String epochMillis = (dt == null) ? "null" : Long.toString(dt.getTime());
            return "WC " + word + " " + frequency + " " + epochMillis;
        }
    }
}


Sample input:

hello,1,2017-12-14 13:10:01
ciao,1,2017-12-14 13:10:02
hello,1,2017-12-14 13:10:03
hello,1,2017-12-14 13:10:04


Thanks