Not able to implement a use case


Not able to implement a use case

jaswin.shah@outlook.com
Hi,
I want to implement the following use case in my application:
I am doing an interval join between two data streams and then, in a process function, catching the discrepant results of the join. The join is keyed on orderId. Now I want to identify all the messages in both data streams that were not joined: for a message in the left stream, if no matching message arrives in the right stream within the defined interval, that message should be caught, and likewise for messages in the right stream that have no counterpart in the left stream. I need help achieving this. I know this could be done with an outer join, but as far as I know, interval joins and tumbling event-time window joins only support inner joins. I do not want to use the Table/SQL API here; I want to stay with the DataStream API.

Currently I am using the code below, which works for about 90% of cases; the remaining 10%, where a large delay can occur and messages are missing from the left or right stream, are not handled by this implementation:
/**
 * Joins the cart and pg streams on mid and orderId, over the configured interval.
 *
 * @param leftStream  the cart message stream
 * @param rightStream the pg message stream
 * @param parameter   holds the lower/upper interval bounds
 * @return the joined stream; discrepant results are sent to Kafka by CartPGProcessFunction
 */
public SingleOutputStreamOperator<ResultMessage> intervalJoinCartAndPGStreams(
        DataStream<CartMessage> leftStream,
        DataStream<PGMessage> rightStream,
        ParameterTool parameter) {
    // Discrepant results are sent to Kafka from CartPGProcessFunction.
    return leftStream
        .keyBy(new CartJoinColumnsSelector())
        .intervalJoin(rightStream.keyBy(new PGJoinColumnsSelector()))
        .between(
            Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_LOWERBOUND))),
            Time.milliseconds(Long.parseLong(parameter.get(Constants.INTERVAL_JOIN_UPPERBOUND))))
        .process(new CartPGProcessFunction());
}


Secondly, I am unable to find streaming support for writing the data streams I read from Kafka out to Hive, which I then want to batch process with Flink.

Please help me resolve these use cases.

Thanks,
Jaswin


Re: Not able to implement a use case

r_khachatryan
Hi Jaswin,

Currently, the DataStream API doesn't support outer joins.
As a workaround, you can use the coGroup function [1].
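To make that concrete, here is a rough, untested sketch. The Flink wiring in the comment reuses the key selectors from your snippet; the plain-Java function below substitutes String payloads for CartMessage/PGMessage to show just the matching logic. Unlike intervalJoin, coGroup hands you all elements of both sides that share a key in a window, including the case where one side is empty, which is exactly an unmatched record:

```java
// The Flink wiring would look roughly like this (DataStream API):
//
//   leftStream.coGroup(rightStream)
//       .where(new CartJoinColumnsSelector())
//       .equalTo(new PGJoinColumnsSelector())
//       .window(TumblingEventTimeWindows.of(Time.minutes(5)))
//       .apply(new MyCoGroupFunction());
//
// The per-key, per-window logic of that CoGroupFunction, in plain Java:
import java.util.ArrayList;
import java.util.List;

class OuterJoinSketch {

    /**
     * Receives ALL left and ALL right elements for one key in one window.
     * An empty side means the other side's records found no match there.
     */
    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        if (right.isEmpty()) {
            for (String l : left) out.add("LEFT_ONLY:" + l);    // cart with no pg
        } else if (left.isEmpty()) {
            for (String r : right) out.add("RIGHT_ONLY:" + r);  // pg with no cart
        } else {
            for (String l : left)
                for (String r : right) out.add("JOINED:" + l + "|" + r);
        }
        return out;
    }
}
```

The trade-off versus intervalJoin is that coGroup uses fixed windows, so a pair whose two elements land in different windows will surface as two unmatched records instead of a join.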

Hive is also not supported by the DataStream API, though it is supported by the Table API [2].


Regards,
Roman


On Mon, May 11, 2020 at 6:03 PM Jaswin Shah <[hidden email]> wrote:


Re: Not able to implement a use case

jaswin.shah@outlook.com
If I go with the Table API, can I write the streams to Hive, or is it only for batch processing as of now?


From: Khachatryan Roman <[hidden email]>
Sent: Tuesday, May 12, 2020 1:49:10 AM
To: Jaswin Shah <[hidden email]>
Cc: [hidden email] <[hidden email]>
Subject: Re: Not able to implement a use case
 


Re: Not able to implement a use case

r_khachatryan
AFAIK, yes, you can write streams.

I'm pulling in Jingsong Li and Rui Li as they might know better. 

Regards,
Roman


On Mon, May 11, 2020 at 10:21 PM Jaswin Shah <[hidden email]> wrote:


Re: Not able to implement a use case

Rui Li-2
The Hive table sink is only for batch processing in Flink 1.10. There are some ongoing efforts to support writing streams to Hive, and we intend to make that available in 1.11. Stay tuned :)

On Tue, May 12, 2020 at 3:52 PM Khachatryan Roman <[hidden email]> wrote:



--
Cheers,
Rui Li

Re: Not able to implement a use case

Jingsong Li
In reply to this post by r_khachatryan
Thanks Roman for involving me.

Hi Jaswin,

FLIP-115 [1] will cover Kafka -> Hive/filesystem, and it will be released in 1.11.

We will provide two connectors in the Table API:
- a file system connector, which manages partitions and files by file system paths. You can define a file system table with a Parquet/ORC format; this should be consistent with Hive, excluding Hive metastore support.
- a Hive connector, which manages partitions and files through the Hive metastore and supports automatically adding partitions to the metastore.
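As a sketch of what such a table definition could look like (hypothetical DDL; the exact property names may differ in the released 1.11, and the table, columns, and path here are made up for illustration):

```sql
-- Hypothetical sketch of a FLIP-115-style file system sink; not a final API.
CREATE TABLE orders_fs_sink (
  order_id STRING,
  amount DOUBLE,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://namenode/warehouse/orders',  -- made-up path
  'format' = 'parquet'
);
```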


Best,
Jingsong Lee
