Hi,
I am trying to pass the SingleOutputStreamOperator to a custom sink. I am getting an error while implementing the same.

Code snippet:

    SingleOutputStreamOperator<JSONObject> stream = env.addSource(source)
        .flatMap(new ExtractHashTagsSymbols(tickers))
        .keyBy(0)
        .timeWindow(Time.seconds(10))
        .sum(1)
        .timeWindowAll(Time.seconds(10))
        .apply(new GetVolume(tickerVolumeMap));

    stream.addSink(new SinkFunction<JsonObject>() {
        public void invoke(JsonObject value) throws Exception {
            pushToSocket(value, socket_url);
        }
    });

I am getting the following error:

    The method addSink(SinkFunction<JSONObject>) in the type DataStream<JSONObject> is not applicable for the arguments (new SinkFunction<JsonObject>(){})

Looking forward to your view.

Regards,
Vijay Raajaa GS
> new SinkFunction<JsonObject>(){

Note the case in JsonObject. It should be JSONObject, FYI.

On Thu, Jun 8, 2017 at 1:27 PM, G.S.Vijay Raajaa <[hidden email]> wrote:
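The mismatch can be reproduced without Flink. The following is only a minimal sketch: `JSONObject`, `JsonObject`, `SinkFunction`, and `DataStream` here are simplified, hypothetical stand-ins written for illustration, not the real library classes. It shows that Java generics are invariant, so `addSink` on a `DataStream<JSONObject>` accepts only a `SinkFunction<JSONObject>`, and a class that differs in letter case is an unrelated type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkTypeDemo {
    // Hypothetical stand-in for the JSONObject class the stream emits.
    static class JSONObject {
        final String payload;
        JSONObject(String payload) { this.payload = payload; }
    }

    // Hypothetical stand-in for a differently cased class from another library.
    static class JsonObject { }

    // Simplified stand-in for a sink interface parameterized by element type.
    interface SinkFunction<T> {
        void invoke(T value) throws Exception;
    }

    // Simplified stand-in for a typed stream: addSink takes only SinkFunction<T>.
    static class DataStream<T> {
        private final List<T> elements;
        DataStream(List<T> elements) { this.elements = elements; }

        void addSink(SinkFunction<T> sink) {
            for (T element : elements) {
                try {
                    sink.invoke(element);
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    // Attaches a correctly typed sink and returns the payloads it received.
    static List<String> run() {
        final List<String> received = new ArrayList<>();
        DataStream<JSONObject> stream = new DataStream<>(
                Arrays.asList(new JSONObject("a"), new JSONObject("b")));

        // Compiles: the sink's type argument matches the stream's element type.
        stream.addSink(new SinkFunction<JSONObject>() {
            @Override
            public void invoke(JSONObject value) {
                received.add(value.payload);
            }
        });

        // Does not compile if uncommented, because SinkFunction<JsonObject>
        // is not a SinkFunction<JSONObject>:
        // stream.addSink(new SinkFunction<JsonObject>() {
        //     @Override
        //     public void invoke(JsonObject value) { }
        // });

        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b]
    }
}
```

In the original snippet the same fix applies: use the same class (and the same import) in both the stream's type parameter and the sink's type parameter.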
That's right. Jackson and Gson have similar naming conventions. Thanks for the quick catch.

Regards,
Vijay Raajaa GS

On Fri, Jun 9, 2017 at 1:59 AM, Ted Yu <[hidden email]> wrote: