Hi i want to do join reference between kafka with mysql table reference. how can i do this thing with flink stream. does coGroup function can handle this ? or anyone have java sample code with this case? i’ve read some article that said if cogroup function can do left outer join. but i’m still confuse to implement it because i just learned flink stream. need advice pls.
|
Hi, Youzha In general `CoGroup` is for the window based operation. How it could satisfy your requirements depends on your specific scenario. But if you want to look at the mysql table as a dimension table. There might be other two ways: 1. Using Table/Sql SDK. You could find a sql example(temporal join the JDBC table as a dimension table) in the table jdbc connector [1] and more join information in the [2] 2. Using DataStream SDK. Maybe you could see whether the `AsycIO` function could satisfy your requirements. You could find the example in [3]. On Mon, Nov 16, 2020 at 11:20 PM Youzha <[hidden email]> wrote:
|
Hi Guowei Ma,
Thanks for your reply, In my case. I've some data on my kafka topic. and i want to get the detail of the data from my reference mysql table. for example : in my kafka topic i've this fields : id, name, position, experience in my reference mysql table i've this fields: id, name, age, sex So , i want to do left join to get the detail data from my reference table. How can i do this with flink? Pls advice On 17/11/20 07:46, Guowei Ma wrote:
|
Hi One way would look like as following 1. create the probe table from Kafka as following. You could find more detailed information from doc[1]
2. create the build table from mysql as following. You could find more detailed information from doc[2]
3. join the tables as following. You could find more detailed information from doc[3]
On Wed, Nov 18, 2020 at 3:05 PM tkg_cangkul <[hidden email]> wrote:
|
Hi,
i'm using java for do this thing. and i've success to register the tables. i've success to select each table. Table result1 = tEnv.sqlQuery("select status_code from table_kafka"); Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff"); but when i try join query i've some error msg like this : Caused by: org.apache.flink.table.api.TableException: Generic RAW types must have a common type information. at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381) is there any somethine that i missed here? On 23/11/20 08:43, Guowei Ma wrote:
|
Could you share your code? Best, Guowei On Mon, Nov 23, 2020 at 12:05 PM tkg_cangkul <[hidden email]> wrote:
|
Hi, Youzha Sorry for the late reply. It seems that the type is mis-type-match. Could you 1. tableA.printSchema to print the schema? 2. KafkaSource.getType() to print the typeinformation? Best, Guowei On Mon, Nov 23, 2020 at 5:28 PM Youzha <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |