SingleOutputStreamOperator addsink Error

G.S.Vijay Raajaa
Hi,

I am trying to attach a custom sink to a SingleOutputStreamOperator, but I get a compile error when I implement it.

Code snippet:

SingleOutputStreamOperator<JSONObject> stream = env.addSource(source)
            .flatMap(new ExtractHashTagsSymbols(tickers))
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)
            .timeWindowAll(Time.seconds(10))
            .apply(new GetVolume(tickerVolumeMap));

stream.addSink(new SinkFunction<JsonObject>() {
    public void invoke(JsonObject value) throws Exception {
        pushToSocket(value, socket_url);
    }
});


I am getting the following error: The method addSink(SinkFunction<JSONObject>) in the type DataStream<JSONObject> is not applicable for the arguments (new SinkFunction<JsonObject>(){})

Looking forward to your view.

Regards,

Vijay Raajaa GS 

Re: SingleOutputStreamOperator addsink Error

Ted Yu
> new SinkFunction<JsonObject>(){

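Note the capitalization of JsonObject. The stream's element type is JSONObject, so the sink should be declared as SinkFunction<JSONObject>, with invoke(JSONObject value) to match.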

FYI
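The mismatch can be reproduced without Flink. The following is a minimal sketch, not Flink's actual API: SinkFunction and DataStream here are simplified stand-ins, and the two JSON classes are empty placeholders for the two differently cased types, all assumed for illustration. It shows that a method taking SinkFunction<T> accepts only a sink with exactly the same type parameter.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkTypeDemo {

    // Hypothetical placeholder for the stream's element type, JSONObject.
    static class JSONObject {
        final String payload;
        JSONObject(String payload) { this.payload = payload; }
    }

    // Hypothetical placeholder for the differently cased, unrelated JsonObject class.
    static class JsonObject { }

    // Simplified stand-in for a sink interface (assumption, not Flink's definition).
    interface SinkFunction<T> {
        void invoke(T value) throws Exception;
    }

    // Simplified stand-in for a stream: addSink accepts only a SinkFunction<T>.
    static class DataStream<T> {
        private final List<T> elements;
        DataStream(List<T> elements) { this.elements = elements; }

        void addSink(SinkFunction<T> sink) throws Exception {
            for (T element : elements) {
                sink.invoke(element);
            }
        }
    }

    // Attaches a correctly typed sink and returns the payloads it received.
    static List<String> run() {
        DataStream<JSONObject> stream = new DataStream<>(
                List.of(new JSONObject("a"), new JSONObject("b")));
        List<String> received = new ArrayList<>();

        // Would not compile: SinkFunction<JsonObject> is not a SinkFunction<JSONObject>.
        // stream.addSink(new SinkFunction<JsonObject>() {
        //     public void invoke(JsonObject value) { }
        // });

        try {
            // Compiles: the sink's type parameter matches the stream's element type.
            stream.addSink(new SinkFunction<JSONObject>() {
                @Override
                public void invoke(JSONObject value) {
                    received.add(value.payload);
                }
            });
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the original snippet the equivalent change is to replace JsonObject with JSONObject in both the SinkFunction type argument and the invoke parameter, and to check that the import refers to the same class the stream is declared with.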

(In reply to G.S.Vijay Raajaa's message of Thu, Jun 8, 2017 at 1:27 PM.)


Re: SingleOutputStreamOperator addsink Error

G.S.Vijay Raajaa
That's right. Jackson and Gson use similar naming conventions.

Thanks for the quick catch.

Regards,
Vijay Raajaa GS 

(In reply to Ted Yu's message of Fri, Jun 9, 2017 at 1:59 AM.)