Need help with event-time aggregations in Flink-1.5 sql-client queries

Need help with event-time aggregations in Flink-1.5 sql-client queries

Ashwin Sinha
Hi,

We are trying to aggregate data using the Flink 1.5 SQL Client and are facing issues with event-time-based aggregations.

JSON data in Kafka:
{"name": {"lastname": "096LMO", "firstname": "5NPLSR"}, "event_time": "1530854716000", "number": 851}
{"name": {"lastname": "EDLVBG", "firstname": "YRMR1L"}, "event_time": "1530854720000", "number": 853}
{"name": {"lastname": "1Y3JWW", "firstname": "DQ2OY3"}, "event_time": "1530854722000", "number": 854}
{"name": {"lastname": "LD2LA5", "firstname": "24FYOZ"}, "event_time": "1530854724000", "number": 855}
{"name": {"lastname": "UQBFIA", "firstname": "BPOZUW"}, "event_time": "1530854726000", "number": 856}
{"name": {"lastname": "X79NHY", "firstname": "HWLDDH"}, "event_time": "1530854728000", "number": 857}
{"name": {"lastname": "PPSTQ7", "firstname": "1DSQZ0"}, "event_time": "1530854730000", "number": 858}
{"name": {"lastname": "8EGA1G", "firstname": "X5UB73"}, "event_time": "1530854732000", "number": 859}
{"name": {"lastname": "4QMUG0", "firstname": "ZJKQLH"}, "event_time": "1530854736000", "number": 861}
{"name": {"lastname": "H4W4DW", "firstname": "ZLDWI9"}, "event_time": "1530854742000", "number": 864}
{"name": {"lastname": "1U9OQN", "firstname": "QYA32C"}, "event_time": "1530854746000", "number": 866}
{"name": {"lastname": "0WRROS", "firstname": "TXDLXK"}, "event_time": "1530854748000", "number": 867}
{"name": {"lastname": "4FWB78", "firstname": "02GXX5"}, "event_time": "1530854750000", "number": 868}
{"name": {"lastname": "R2YUDY", "firstname": "6ME9YZ"}, "event_time": "1530854752000", "number": 869}
{"name": {"lastname": "G7UD1M", "firstname": "62WJEE"}, "event_time": "1530854754000", "number": 870}
{"name": {"lastname": "GHH1BI", "firstname": "93CAZE"}, "event_time": "1530854758000", "number": 872}
{"name": {"lastname": "IR1WOO", "firstname": "YYF7F8"}, "event_time": "1530854760000", "number": 873}
{"name": {"lastname": "ODES6S", "firstname": "ELH4VH"}, "event_time": "1530854764000", "number": 875}
{"name": {"lastname": "658D5P", "firstname": "F1HRY5"}, "event_time": "1530854770000", "number": 878}
{"name": {"lastname": "1ZZ340", "firstname": "GTP0PW"}, "event_time": "1530854772000", "number": 879}
{"name": {"lastname": "6J2B39", "firstname": "DDZTC5"}, "event_time": "1530854780000", "number": 883}

SQL Client config (YAML): https://pastebin.com/NW9zkStk


SQL Query 1: select * from TimestampTable
Result 1: 
Screen Shot 2018-07-06 at 4.44.17 PM.png


SQL Query 2: 
SELECT
  HOP_START(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
  HOP_END(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE),
  COUNT(*)
FROM TimestampTable
WHERE number > 10
GROUP BY HOP(rowTime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)
Result 2: The shell remains empty even after a long time:
Screen Shot 2018-07-06 at 4.44.33 PM.png


Problem: We are not able to aggregate on the basis of event time, but when we run the same aggregation on processing time, we get results. Can someone help here?

We would also like to know whether there are any plans to support sinks, such as Kafka, for these query results.

--
Ashwin Sinha | Data Engineer
[hidden email] | 9452075361



::DISCLAIMER::

----------------------------------------------------------------------------------------------------------------------------------------------------


This message is intended only for the use of the addressee and may contain information that is privileged, confidential and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, or the employee or agent responsible for delivering the message to the intended recipient, you are hereby notified that any dissemination, distribution or copying of this communication is strictly prohibited. If you have received this e-mail in error, please notify us immediately by return e-mail and delete this e-mail and all attachments from your system.

Re: Need help with event-time aggregations in Flink-1.5 sql-client queries

Timo Walther
Hi Ashwin,

The Kafka connector does not support emitting watermarks, so you have to use a watermark strategy other than `from-source`.

Regards,
Timo
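
For readers following the thread, here is a minimal sketch of why the event-time query prints nothing when no watermarks arrive. It is a simplified Python model, not Flink code: the names `run` and `window_start` and the `delay_ms` parameter are hypothetical, and the firing rule is reduced to "emit a window once the watermark reaches its end". The timestamps are the `event_time` values from the sample records in the question.

```python
from collections import Counter

WINDOW_MS = 60_000  # one-minute window, matching the query in the question

# event_time values from the sample records, in epoch seconds
EVENT_SECONDS = [
    1530854716, 1530854720, 1530854722, 1530854724, 1530854726, 1530854728,
    1530854730, 1530854732, 1530854736, 1530854742, 1530854746, 1530854748,
    1530854750, 1530854752, 1530854754, 1530854758, 1530854760, 1530854764,
    1530854770, 1530854772, 1530854780,
]
EVENT_TIMES = [s * 1000 for s in EVENT_SECONDS]  # epoch milliseconds


def window_start(ts_ms):
    """Start of the one-minute window that contains ts_ms."""
    return ts_ms - ts_ms % WINDOW_MS


def run(event_times, delay_ms=None):
    """Return the (start, end, count) rows emitted for the given events.

    delay_ms=None models a source that never emits watermarks (assumption:
    this is what happens with `from-source` on the Kafka connector here).
    Otherwise the watermark trails the largest timestamp seen by delay_ms.
    """
    open_windows = Counter()
    emitted = []
    watermark = float("-inf")
    for ts in event_times:
        open_windows[window_start(ts)] += 1
        if delay_ms is None:
            continue  # watermark never advances, so no window can close
        watermark = max(watermark, ts - delay_ms)
        for start in sorted(open_windows):
            if start + WINDOW_MS <= watermark:  # simplified firing rule
                emitted.append((start, start + WINDOW_MS, open_windows.pop(start)))
    return emitted


print(run(EVENT_TIMES))              # [] : no watermarks, no results
print(run(EVENT_TIMES, delay_ms=0))  # [(1530854700000, 1530854760000, 16)]
```

In this model the second window (the five records from 1530854760 onward) stays open in both runs, because no later record pushes the watermark past its end. A processing-time window needs no watermark at all, which is consistent with the processing-time query returning results.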


On 06.07.18 at 13:45, Ashwin Sinha wrote:
Re: Need help with event-time aggregations in Flink-1.5 sql-client queries

Ashwin Sinha
Ok, thanks Timo!

On Fri 6 Jul, 2018, 17:34 Timo Walther, <[hidden email]> wrote:
Hi Ashwin,

The Kafka connector does not support emitting watermarks, so you have to use a watermark strategy other than `from-source`.

Regards,
Timo

