left join flink stream

Youzha
Hi, I want to join data from a Kafka topic with a reference table in MySQL. How can I do this with Flink streaming? Can the coGroup function handle this, or does anyone have Java sample code for this case? I've read an article saying that coGroup can do a left outer join, but I'm still confused about how to implement it because I've only just started learning Flink streaming.

Need advice, please.

Re: left join flink stream

Guowei Ma
Hi, Youzha

In general, `CoGroup` is a window-based operation, so whether it fits your requirements depends on your specific scenario. But if you want to treat the MySQL table as a dimension table, there are two other ways:
1. Use the Table/SQL API. You can find a SQL example (a temporal join with the JDBC table as a dimension table) in the JDBC table connector documentation [1], and more information about joins in [2].
2. Use the DataStream API. You could check whether the Async I/O function satisfies your requirements. You can find an example in [3].
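Guowei's point that `CoGroup` is window-based can be illustrated without Flink. For each key and window, a `CoGroupFunction` receives all elements of both inputs through `coGroup(Iterable<IN1>, Iterable<IN2>, Collector<O>)`, and a left outer join emits every left element, paired with null when the right group is empty. The plain-Java sketch below simulates the elements buffered in one window; `WindowLeftOuterJoinSketch` and `leftOuterJoin` are hypothetical names, and this is not Flink API code.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java simulation of what a CoGroupFunction sees for ONE window:
// every left and right element that shares a key. Not Flink API code.
final class WindowLeftOuterJoinSketch {

    private WindowLeftOuterJoinSketch() {}

    // Left outer join of the elements buffered in one window.
    // Left elements without a match are emitted paired with null.
    static <L, R, K> List<Map.Entry<L, R>> leftOuterJoin(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey) {
        // Group the right side by key, as coGroup does for each input.
        Map<K, List<R>> rightByKey = new LinkedHashMap<>();
        for (R r : right) {
            rightByKey.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) {
            List<R> matches = rightByKey.getOrDefault(leftKey.apply(l), List.of());
            if (matches.isEmpty()) {
                // No right-side element in this window: keep the left row anyway.
                out.add(new AbstractMap.SimpleEntry<L, R>(l, null));
            } else {
                for (R r : matches) {
                    out.add(new AbstractMap.SimpleEntry<L, R>(l, r));
                }
            }
        }
        return out;
    }
}
```

Because matching only happens inside one window, both sides (including the MySQL data) would have to arrive as streams, and a Kafka record whose reference row lands in a different window would be emitted with null. That is one reason the dimension-table approaches above tend to fit a MySQL lookup better.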

Re: left join flink stream

Youzha
Hi Guowei Ma,

Thanks for your reply.
In my case, I have some data in my Kafka topic, and I want to get the details for that data from my reference MySQL table.
For example:

My Kafka topic has these fields:

id, name, position, experience

My reference MySQL table has these fields:

id, name, age, sex

So I want to do a left join to get the detail data from my reference table.

How can I do this with Flink?
Please advise.
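As a worked example of the join described above: every Kafka record is kept, and `age`/`sex` are filled in only when the MySQL table has a row with the same `id`. This plain-Java sketch uses a `Map` as a stand-in for the MySQL table; the class names and the field types (`long id`, `int experience`, `int age`) are assumptions, and it is not Flink API code.

```java
import java.util.Map;

// Plain-Java sketch of the requested left join; not Flink API code.
// Field names come from the thread; the field types are assumptions.
final class KafkaEvent {
    final long id;
    final String name;
    final String position;
    final int experience;

    KafkaEvent(long id, String name, String position, int experience) {
        this.id = id;
        this.name = name;
        this.position = position;
        this.experience = experience;
    }
}

// One row of the (hypothetical) MySQL reference table.
final class UserRef {
    final long id;
    final String name;
    final int age;
    final String sex;

    UserRef(long id, String name, int age, String sex) {
        this.id = id;
        this.name = name;
        this.age = age;
        this.sex = sex;
    }
}

final class EnrichedEvent {
    final KafkaEvent event;
    final Integer age; // null when no reference row matched
    final String sex;  // null when no reference row matched

    EnrichedEvent(KafkaEvent event, Integer age, String sex) {
        this.event = event;
        this.age = age;
        this.sex = sex;
    }
}

final class LookupLeftJoinSketch {
    private LookupLeftJoinSketch() {}

    // LEFT JOIN semantics: every Kafka event is emitted exactly once,
    // enriched only when the reference table has a row with the same id.
    static EnrichedEvent enrich(KafkaEvent e, Map<Long, UserRef> referenceById) {
        UserRef ref = referenceById.get(e.id);
        if (ref == null) {
            return new EnrichedEvent(e, null, null);
        }
        return new EnrichedEvent(e, ref.age, ref.sex);
    }
}
```

Since both sides have a `name` column, the sketch keeps the Kafka one; in SQL you would select or alias one of them explicitly.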

Re: left join flink stream

Guowei Ma
Hi
One way would look like the following:
1. Create the probe table from Kafka as follows (see doc [1] for more details):
CREATE TABLE myTopic (
 id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3),
 proctime AS PROCTIME()
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'csv',
 'scan.startup.mode' = 'earliest-offset'
);
2. Create the build (dimension) table from MySQL as follows (see doc [2] for more details):
CREATE TABLE MyUserTable (
  id BIGINT,
  sex STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);
3. Join the tables as follows (see doc [3] for more details):
-- temporal join the JDBC table as a dimension table
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.id = MyUserTable.id;
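`FOR SYSTEM_TIME AS OF myTopic.proctime` makes this a processing-time lookup join: each Kafka row is joined with the MySQL row as it exists when the Kafka row is processed, and results that were already emitted are not updated if the MySQL row changes later. The plain-Java simulation below shows that behaviour with a `Map` standing in for the JDBC table; `ProcTimeLookupSketch` and its methods are hypothetical, and this is not Flink API code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a processing-time lookup join (not Flink API code).
// Each stream row reads the reference table as it is when the row is processed;
// rows emitted earlier are not revised when the reference table changes.
final class ProcTimeLookupSketch {
    private final Map<Long, String> referenceSexById = new HashMap<>(); // stand-in for MySQL
    private final List<String> emitted = new ArrayList<>();

    // Models an insert or update in the MySQL table.
    void upsertReference(long id, String sex) {
        referenceSexById.put(id, sex);
    }

    // Models one Kafka row arriving; LEFT JOIN keeps it even without a match.
    void onStreamRow(long id) {
        String sex = referenceSexById.get(id); // null when no row exists yet
        emitted.add(id + "," + sex);
    }

    List<String> emitted() {
        return emitted;
    }
}
```

Calling `onStreamRow(7L)`, then `upsertReference(7L, "M")`, then `onStreamRow(7L)` again emits `7,null` followed by `7,M`: the first result is not revised.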

Re: left join flink stream

Youzha
Hi,

I'm using Java to do this,
and I've successfully registered the tables.

I can also select from each table successfully:

Table result1 = tEnv.sqlQuery("select status_code from table_kafka");
Table result2 = tEnv.sqlQuery("select status_code from table_mysql_reff");

But when I try the join query, I get this error message:

Caused by: org.apache.flink.table.api.TableException: Generic RAW types must have a common type information.
    at org.apache.flink.table.planner.calcite.FlinkTypeFactory.resolveAllIdenticalTypes(FlinkTypeFactory.scala:381)

Is there something I missed here?

Re: left join flink stream

Guowei Ma
Could you share your code?
Best,
Guowei

Re: left join flink stream

Guowei Ma
Hi, Youzha
Sorry for the late reply. It looks like a type mismatch.
Could you
1. call `tableA.printSchema()` to print the table schema, and
2. call `KafkaSource.getType()` to print the type information?

Best,
Guowei


On Mon, Nov 23, 2020 at 5:28 PM Youzha <[hidden email]> wrote:

Hi, this is the sample code:


Table tableA = tEnv.fromDataStream(KafkaSource,"timestamp, id, status");

tEnv.registerTable("tbl_kafka", tableA);

Table result = tEnv.sqlQuery("select * from tbl_kafka where id = 'E02'");


FYI, I'm using the Avro format on my KafkaSource.


Best Regards,
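Guowei's reply suggests a type mismatch. One possible cause, which the thread does not confirm, is that the Avro string fields reach the Table API as `CharSequence` (often `org.apache.avro.util.Utf8`) rather than `String`; Flink may not map those to `STRING` and fall back to a RAW (generic) type, which would then fail to unify with the MySQL `STRING` column in the join. A common workaround is to map each Avro record to a POJO with plain `String` fields before calling `fromDataStream`. The sketch below shows only that conversion step in plain Java; `AvroToPojoSketch`, `KafkaStatusEvent`, `fromAvroFields` and the `long` timestamp are hypothetical, and the `CharSequence` parameters stand in for values read from the Avro record.

```java
// Holder for a conversion step; not Flink API code. The nested class follows
// Flink's POJO rules: public, static, public no-arg constructor, public fields.
final class AvroToPojoSketch {
    private AvroToPojoSketch() {}

    // Hypothetical target type; the field types are assumptions.
    public static class KafkaStatusEvent {
        public long timestamp;
        public String id;
        public String status;

        public KafkaStatusEvent() {}
    }

    // Avro string values usually arrive as CharSequence (e.g. Utf8);
    // copying them into java.lang.String lets Flink derive a STRING column.
    static KafkaStatusEvent fromAvroFields(long timestamp, CharSequence id, CharSequence status) {
        KafkaStatusEvent e = new KafkaStatusEvent();
        e.timestamp = timestamp;
        e.id = toStringOrNull(id);
        e.status = toStringOrNull(status);
        return e;
    }

    private static String toStringOrNull(CharSequence value) {
        return value == null ? null : value.toString();
    }
}
```

In a Flink job this conversion would typically run inside a `map` on the Kafka stream, so that `fromDataStream` sees `KafkaStatusEvent` instead of the Avro type.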