Default value - Time window expires with no data from source

G.S.Vijay Raajaa
Hi,

I am trying to implement a Flink job that uses Twitter as its source and collects tweets for a list of hashtags. The job aggregates the volume of tweets per hashtag over a given time frame. That part works, but when no tweets arrive for any of the hashtags in a window, I need to emit a default value of 0 for every hashtag. I am not sure how to implement this.

Code Snippet :

env.addSource(source)
    .flatMap(new ExtractHashTagsSymbols(tickers))
    // Count tweets per hashtag within each window
    .keyBy(0)
    .timeWindow(Time.seconds(Integer.parseInt(window_time)))
    .sum(1)
    // Gather the per-hashtag counts of a window into a single JSON object
    .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
    .apply(new GetVolume(tickerVolumeMap))
    .addSink(new SinkFunction<JSONObject>() {
        @Override
        public void invoke(JSONObject value) throws Exception {
            System.out.println("Twitter Volume:" + value.toString());
            //JsonParser jsonParser = new JsonParser();
            //JsonObject gsonObject = (JsonObject)jsonParser.parse(value.toString());
            pushToSocket(value, socket_url);
        }
    });

The code above waits for each window of window_time seconds to close, computes the tweet volume per hashtag, and sends the result out as JSON.
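
For the simpler case where a window does fire but some hashtags have no tweets, the zero default can be filled in before the JSON is built. The snippet does not show what GetVolume does, so the following is only a minimal plain-Java sketch of that step. The class name ZeroFillSketch and the method withDefaults are hypothetical and not part of the job above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original job.
class ZeroFillSketch {

    /**
     * Returns one entry per tracked hashtag: the observed count if the
     * window contained tweets for it, otherwise the default of 0.
     */
    static Map<String, Long> withDefaults(List<String> trackedTags,
                                          Map<String, Long> observed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tag : trackedTags) {
            result.put(tag, observed.getOrDefault(tag, 0L));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tracked = Arrays.asList("#flink", "#kafka", "#spark");
        Map<String, Long> observed = new LinkedHashMap<>();
        observed.put("#flink", 12L); // only one hashtag saw tweets in this window
        System.out.println(withDefaults(tracked, observed));
    }
}
```

This does not help when no tweets arrive at all, because then the window never fires and there is nothing to fill in.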

Regards,

Vijay Raajaa GS 

Re: Default value - Time window expires with no data from source

Aljoscha Krettek
You mean you want to output some data when you know that you don’t have any counts for a given time window?

This is not (easily) possible in Flink right now because this would require an operation with parallelism one that determines that there is no data across all keys.
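
Some background on why: Flink creates a window only when an element arrives for it, so a window with no elements is never created and never fires. To make the "parallelism one" idea concrete, here is a rough plain-Java simulation of the bookkeeping such a step would need. It is not Flink API code. The class EmptyWindowFiller and its methods are hypothetical, and in a real job this logic would have to live in one non-parallel operator driven by timers rather than by direct method calls. It also assumes the timer for a window is checked only after a real result for that window would already have arrived.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-instance bookkeeping; a simulation, not a Flink operator.
class EmptyWindowFiller {
    private final long windowSizeMs;
    private final List<String> trackedTags;
    private long nextWindowEnd; // end of the earliest window not yet reported

    EmptyWindowFiller(long windowSizeMs, List<String> trackedTags, long firstWindowEnd) {
        this.windowSizeMs = windowSizeMs;
        this.trackedTags = trackedTags;
        this.nextWindowEnd = firstWindowEnd;
    }

    /** A window fired with data: report skipped windows as zeros, then the real counts. */
    List<Map<String, Long>> onWindowResult(long windowEnd, Map<String, Long> counts) {
        List<Map<String, Long>> out = fillUpTo(windowEnd - 1);
        out.add(withDefaults(counts));
        nextWindowEnd = Math.max(nextWindowEnd, windowEnd + windowSizeMs);
        return out;
    }

    /** Time advanced to 'now' without results: report every window that ended by then as zeros. */
    List<Map<String, Long>> onTimer(long now) {
        return fillUpTo(now);
    }

    private List<Map<String, Long>> fillUpTo(long time) {
        List<Map<String, Long>> out = new ArrayList<>();
        while (nextWindowEnd <= time) {
            out.add(withDefaults(Collections.<String, Long>emptyMap()));
            nextWindowEnd += windowSizeMs;
        }
        return out;
    }

    private Map<String, Long> withDefaults(Map<String, Long> counts) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tag : trackedTags) {
            result.put(tag, counts.getOrDefault(tag, 0L));
        }
        return result;
    }

    public static void main(String[] args) {
        EmptyWindowFiller filler =
                new EmptyWindowFiller(10_000L, Arrays.asList("#flink", "#kafka"), 10_000L);
        // No tweets for three consecutive 10-second windows.
        for (Map<String, Long> zeros : filler.onTimer(30_000L)) {
            System.out.println(zeros);
        }
    }
}
```

The state here (nextWindowEnd) is global across all keys, which is the reason the step cannot simply run in parallel per hashtag.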

Best,
Aljoscha

On 24. Jun 2017, at 18:22, G.S.Vijay Raajaa <[hidden email]> wrote:
