Getting splitStreams to work with DataStream<List<Events>>

smandrell
Basically, we are not splitting the streams correctly: when we select the stream we want from our SplitStream (using the select() operation), it never returns a DataStream with just ERROR_EVENTs or a DataStream with just SUCCESS_EVENTs. Instead, it returns a DataStream with both ERROR_EVENTs and SUCCESS_EVENTs.


I am receiving data by doing the following:

return env.fromElements(SUCCESS_EVENT_JSON, SUCCESS_AND_ERROR_EVENT_JSON);

SUCCESS_EVENT_JSON will generate one success event once it is sent through our parser. This is not the concern.

The concern is the SUCCESS_AND_ERROR_EVENT_JSON. SUCCESS_AND_ERROR_EVENT_JSON will generate 3 events once it is sent through our parser: 1 success event and 2 error events.


To discern between success events and error events in a given stream, we use the following splitting logic:

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14418/parser.png>

This splitting logic works fine when dealing with the stream generated from our parser on the SUCCESS_EVENT_JSON because there is only one event at play here: the success event.

However, the splitting logic does not correctly split the stream generated from sending SUCCESS_AND_ERROR_EVENT_JSON through our parser.

For some context: when sending SUCCESS_AND_ERROR_EVENT_JSON through our parser, the parser returns a DataStream<List<SuperClassEvent>> in the following form [ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT].

As you can see from the above code, we try to separate the ERROR_EVENTs from the SUCCESS_EVENT by calling output.add("success") or output.add("error"). But when we select from our SplitStream[ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT] with splitStream.select("success") and splitStream.select("error"), the events are not separated: both select() operations return a DataStream[ERROR_EVENT, ERROR_EVENT, SUCCESS_EVENT], rather than one DataStream[ERROR_EVENT, ERROR_EVENT] and one DataStream[SUCCESS_EVENT].

WHAT WE THINK IS WRONG:
My suspicion is that we are attempting to split a DataStream<List<Events>> instead of a DataStream<TimeseriesEvt>, but I cannot find a workaround for DataStream<List<Events>>. Is there any way to split a DataStream<List<Events>>?

Thanks!!


Re: Split Streams not working

Kien Truong
Hi,

I think you're hitting this bug

https://issues.apache.org/jira/browse/FLINK-5031

Try the workaround mentioned in the bug report: add a map function between map
and select

Regards,
Kien

On 7/25/2017 3:14 AM, smandrell wrote:

> [...]

Re: Split Streams not working

Kien Truong
Hi,

I meant adding a select function between the two consecutive select calls.

Or if you use Flink 1.3, you can use the new side output functionality.

Regards,

Kien


On 7/25/2017 7:54 AM, Kien Truong wrote:

> [...]

Re: Split Streams not working

Aljoscha Krettek
Hi,

In your original program, the problem is that there are both ERROR and SUCCESS events in your List, so you add both "success" and "error" to the list of split outputs for that one element. To discern between the different types, you first have to flatten the DataStream<List<T>> into a DataStream<T> using a flatMap().
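
To make the routing concrete, here is a minimal plain-Java sketch of the behaviour described above. It does not use any Flink classes: SplitModel, namesOf, selectLists, flatten and selectEvents are hypothetical names, and the selector logic is assumed from the description in the question (add "success" or "error" per event), since the original code was only posted as an image.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of split()/select() routing; no Flink classes are used.
class SplitModel {
    enum Event { SUCCESS_EVENT, ERROR_EVENT }

    // Assumed naming rule for a single event (stand-in for the selector in the question).
    static String nameOf(Event e) {
        return e == Event.SUCCESS_EVENT ? "success" : "error";
    }

    // Selector applied to a whole List record: collects a name for every event inside it.
    static Set<String> namesOf(List<Event> record) {
        Set<String> names = new LinkedHashSet<>();
        for (Event e : record) {
            names.add(nameOf(e));
        }
        return names;
    }

    // select(name) on a stream of lists: a record is forwarded whole if it carries the name.
    static List<List<Event>> selectLists(List<List<Event>> stream, String name) {
        List<List<Event>> out = new ArrayList<>();
        for (List<Event> record : stream) {
            if (namesOf(record).contains(name)) {
                out.add(record);
            }
        }
        return out;
    }

    // flatMap step: emit every event of every list as its own record.
    static List<Event> flatten(List<List<Event>> stream) {
        List<Event> out = new ArrayList<>();
        for (List<Event> record : stream) {
            out.addAll(record);
        }
        return out;
    }

    // select(name) on the flattened stream: each event carries exactly one name.
    static List<Event> selectEvents(List<Event> stream, String name) {
        List<Event> out = new ArrayList<>();
        for (Event e : stream) {
            if (nameOf(e).equals(name)) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Event>> parsed = Arrays.asList(
                Arrays.asList(Event.SUCCESS_EVENT),
                Arrays.asList(Event.ERROR_EVENT, Event.ERROR_EVENT, Event.SUCCESS_EVENT));

        // The mixed list is tagged with both names, so it is selected whole under "error".
        System.out.println(selectLists(parsed, "error"));
        // After flattening, only the individual error events are selected.
        System.out.println(selectEvents(flatten(parsed), "error"));
    }
}
```

In this model the first line printed is the whole mixed list, while the second contains only the two error events, which is the separation the question was after.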

Or, as Kien suggested, you can use side outputs, which are the better alternative.
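
With side outputs, a single function can walk each list and emit every event to the output it belongs to, so no separate flatten step is needed. In Flink this is typically written with a ProcessFunction and an OutputTag, and the side stream is read back with getSideOutput(). The sketch below is only a plain-Java model of that routing under those assumptions; SideOutputModel, Outputs and process are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of side-output routing; no Flink classes are used.
class SideOutputModel {
    enum Event { SUCCESS_EVENT, ERROR_EVENT }

    // Holds the two result streams: the main output and one side output for errors.
    static class Outputs {
        final List<Event> main = new ArrayList<>();
        final List<Event> errors = new ArrayList<>();
    }

    // Models one process function over List records: each event is emitted
    // individually, errors to the side output and everything else to the main output.
    static Outputs process(List<List<Event>> stream) {
        Outputs outputs = new Outputs();
        for (List<Event> record : stream) {
            for (Event e : record) {
                if (e == Event.ERROR_EVENT) {
                    outputs.errors.add(e);
                } else {
                    outputs.main.add(e);
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<List<Event>> parsed = Arrays.asList(
                Arrays.asList(Event.SUCCESS_EVENT),
                Arrays.asList(Event.ERROR_EVENT, Event.ERROR_EVENT, Event.SUCCESS_EVENT));

        Outputs outputs = process(parsed);
        System.out.println("main:   " + outputs.main);
        System.out.println("errors: " + outputs.errors);
    }
}
```

The design point is that routing happens per event inside the function, so a list containing both kinds of event no longer ends up in both result streams.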

Best,
Aljoscha
 

> On 25. Jul 2017, at 03:02, Kien Truong <[hidden email]> wrote:
> [...]