Hi,
I am trying to implement a Flink job that uses Twitter as the source and collects tweets matching a list of hashtags. The job aggregates the volume of tweets per hashtag over a given time window. That part works, but if there are no tweets for any of the hashtags in a window, I need to emit a default value of 0 for every hashtag. I am not sure how to implement this.

Code snippet:

    env.addSource(source)
        .flatMap(new ExtractHashTagsSymbols(tickers))
        .keyBy(0)
        .timeWindow(Time.seconds(Integer.parseInt(window_time)))
        .sum(1)
        .timeWindowAll(Time.seconds(Integer.parseInt(window_time)))
        .apply(new GetVolume(tickerVolumeMap))
        .addSink(new SinkFunction<JSONObject>() {
            public void invoke(JSONObject value) throws Exception {
                System.out.println("Twitter Volume:" + value.toString());
                // JsonParser jsonParser = new JsonParser();
                // JsonObject gsonObject = (JsonObject) jsonParser.parse(value.toString());
                pushToSocket(value, socket_url);
            }
        });

The above code waits for the window_time interval, computes the tweet volume, and sends out a JSON object.

Regards,
Vijay Raajaa GS
You mean you want to output some data when you know that you don’t have any counts for a given time window?
This is not (easily) possible in Flink right now because this would require an operation with parallelism one that determines that there is no data across all keys.

Best,
Aljoscha
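
For readers who land on this thread: one part of the problem can be sketched independently of Flink, namely turning the counts that did arrive in a window into a complete result with a 0 for every tracked hashtag. The plain-Java sketch below assumes the list of hashtags is known up front (as `tickers` is in the question); the class name `ZeroFill` and the method `withDefaults` are hypothetical and not part of the original job. It does not solve the limitation described above: a window that receives no elements at all produces no output, so this logic would only run if something (for example a periodic stream of zero-count records per hashtag merged into the input, which is an assumption and not something proposed in this thread) keeps every window non-empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ZeroFill {

    // Returns one entry per tracked hashtag, using 0 where the window had no count.
    // Hypothetical helper for illustration; not taken from the original job.
    public static Map<String, Long> withDefaults(List<String> hashtags,
                                                 Map<String, Long> windowCounts) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tag : hashtags) {
            result.put(tag, windowCounts.getOrDefault(tag, 0L));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> hashtags = Arrays.asList("#flink", "#kafka");

        // A window in which only one hashtag was seen.
        System.out.println(withDefaults(hashtags,
                Collections.singletonMap("#flink", 3L)));   // {#flink=3, #kafka=0}

        // A window with no counts for any hashtag.
        System.out.println(withDefaults(hashtags,
                Collections.<String, Long>emptyMap()));     // {#flink=0, #kafka=0}
    }
}
```

In the job from the question, logic of this kind would presumably live inside the all-window function (`GetVolume`), since that is the one place that sees the counts for all hashtags together.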