Can not get OutPutTag datastream from Windowing function


Soheil Pourbafrani
Hi, following the documentation, I tried to capture late data using a side output:

final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag =
        new OutputTag<Tuple3<String, Long, JSONObject>>("late-data") {};
DataStream<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160)) /* .countWindow(3) */
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(/* do some processing */);

When I try to store the late data in a DataStream (as shown in the documentation):
DataStream<Tuple3<String, Long, JSONObject>> lateData = res.
there is no getSideOutput method available on the DataStream res!
But if I call getSideOutput directly after the reduce function, it is recognized. However, I don't want to store the late data in the res variable; I want to keep it in a separate variable.
DataStream<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160)) /* .countWindow(3) */
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(/* do some processing */)
        .getSideOutput(lateOutputTag);
What is the problem here?


Re: Can not get OutPutTag datastream from Windowing function

Dawid Wysakowicz-2

Hi Soheil,

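The getSideOutput method is defined on SingleOutputStreamOperator, which extends DataStream. Try declaring your res variable as SingleOutputStreamOperator instead of DataStream; you can then call res.getSideOutput(lateOutputTag) and assign the result to a separate variable.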
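To illustrate why the declared type matters, here is a minimal, self-contained sketch. The classes OutputTag, DataStream and SingleOutputStreamOperator below are simplified stand-ins written for this example, not Flink's real classes, and windowedReduce is a hypothetical placeholder for the windowed reduce in the question. The sketch only shows the Java typing rule at play: a method declared on a subclass is not visible through a variable declared with the parent type.

```java
import java.util.ArrayList;
import java.util.List;

class SideOutputTypeSketch {
    // Hypothetical stand-in for the windowed reduce in the question:
    // on-time elements land in the main result, late ones in the side output.
    static SingleOutputStreamOperator<String> windowedReduce() {
        SingleOutputStreamOperator<String> op = new SingleOutputStreamOperator<>();
        op.elements.add("on-time");
        op.addLate("late");
        return op;
    }

    public static void main(String[] args) {
        OutputTag<String> lateOutputTag = new OutputTag<>("late-data");

        // Had res been declared as DataStream<String>, the call to
        // res.getSideOutput(...) below would not compile.
        SingleOutputStreamOperator<String> res = windowedReduce();
        DataStream<String> lateData = res.getSideOutput(lateOutputTag);

        System.out.println(res.elements);      // main result
        System.out.println(lateData.elements); // late data, kept in its own variable
    }
}

// Simplified stand-ins for the Flink types named in this thread.
// They are NOT Flink's classes; they only mirror the inheritance relationship.
class OutputTag<T> {
    final String id;
    OutputTag(String id) { this.id = id; }
}

class DataStream<T> {
    final List<T> elements = new ArrayList<>();
}

class SingleOutputStreamOperator<T> extends DataStream<T> {
    private final List<T> lateElements = new ArrayList<>();

    void addLate(T element) { lateElements.add(element); }

    // Declared only on the subclass, so a DataStream-typed variable cannot see it.
    DataStream<T> getSideOutput(OutputTag<T> tag) {
        DataStream<T> side = new DataStream<>();
        side.elements.addAll(lateElements);
        return side;
    }
}
```

In the actual job, the same pattern would presumably mean declaring res as SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>> (from org.apache.flink.streaming.api.datastream), so that res keeps the main window result while lateData holds the late records.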

Best,

Dawid


(In reply to Soheil Pourbafrani's message of 17/07/18 09:36.)

Re: Can not get OutPutTag datastream from Windowing function

Xingcan Cui
In reply to this post by Soheil Pourbafrani
Hi Soheil,

The `getSideOutput()` method is defined on the operator (SingleOutputStreamOperator) rather than on DataStream.
You can invoke it on the result of any transformation that returns an operator (e.g., a map, or a function such as reduce applied to a window).

Best,
Xingcan
