Debezium Flink EMR

Debezium Flink EMR

Rex Fenley
Hi,

I'm trying to set up Flink with the Debezium CDC connector on AWS EMR. However, EMR only supports Flink 1.10.0, whereas the Debezium connector arrived in Flink 1.11.0, from what I can see in the documentation.


I'm wondering what alternative solutions are available for connecting Debezium to Flink. Is there an open-source Debezium connector that works with Flink 1.10.0? Could I potentially pull out the code for the 1.11.0 Debezium connector and compile it into my project against the Flink 1.10.0 API?

For context, I plan on doing some fairly complicated, long-lived stateful joins / materialization using the Table API over data ingested from Postgres and possibly MySQL.

Appreciate any help, thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Re: Debezium Flink EMR

Chesnay Schepler
@Jark Would it be possible to use the 1.11 Debezium support in 1.10?

Re: Debezium Flink EMR

Marta Paes Moreira
Hi, Rex.

Part of what enabled CDC support in Flink 1.11 was the refactoring of the table source interfaces (FLIP-95 [1]) and the new ScanTableSource [2], which allows emitting bounded/unbounded streams with insert, update, and delete rows.

In theory, you could consume data generated with Debezium as regular JSON-encoded events before Flink 1.11; there just wasn't a convenient way to treat it as a changelog. As a workaround, what you can do in Flink 1.10 is process these messages as plain JSON, extract the "after" field from the payload, and then apply de-duplication [3] to keep only the last row.

The DDL for your source table would look something like:

CREATE TABLE tablename (
  ...
  `after` ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);
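
The de-duplication query on top of that [3] could then look something like this (just a sketch: it assumes you also declare a processing-time attribute on the source table, e.g. a computed column `proctime AS PROCTIME()`, and that `field1` is the primary key of the original table):

SELECT field1, field2
FROM (
  SELECT
    field1,
    field2,
    ROW_NUMBER() OVER (PARTITION BY field1 ORDER BY proctime DESC) AS row_num
  FROM (
    SELECT `after`.field1 AS field1, `after`.field2 AS field2, proctime
    FROM tablename
  )
)
WHERE row_num = 1;

Ordering by the processing-time attribute in descending order keeps only the latest row per key.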

Hope this helps!

Marta

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication 

Re: Debezium Flink EMR

Rex Fenley
Yup! This definitely helps and makes sense.

The 'after' payload comes with all the data from the row, right? So essentially, for inserts and updates I can insert/replace rows by primary key, for nulls I can delete by primary key, and then I can build out the rest of my joins as normal.

Are there any performance implications of doing it this way compared to the out-of-the-box 1.11 solution?

Re: Debezium Flink EMR

Marta Paes Moreira
Yes, you'll get the full row in the payload, and you can also access the change operation, which might be useful in your case.
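
In the 1.10 workaround, that just means also mapping the relevant envelope fields in your source DDL, along these lines (a sketch based on the generic example from before; `op` is Debezium's change-operation field, with values like 'c', 'u', 'd' and 'r'):

CREATE TABLE tablename (
  op STRING,
  `after` ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);

Keep in mind that for deletes Debezium sets "after" to null and carries the old row in "before", so you may also want to map "before" if you need the key of deleted rows.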

About performance, I'm summoning Kurt and [hidden email] to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.

Marta

Re: Debezium Flink EMR

Rex Fenley
Thank you so much for the help!

Re: Debezium Flink EMR

Jark Wu-3
Hi,

Regarding the performance difference, the proposed way will have one more stateful operator (deduplication) than the native 1.11 CDC support.
The overhead of the deduplication operator is similar to that of a simple group-by aggregate (a MAX on each non-key column).

Best,
Jark

Re: Debezium Flink EMR

Rex Fenley
Thanks!

Re: Debezium Flink EMR

Rex Fenley
Hi again!

I'm testing this out locally in Docker on Flink 1.11 first, to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector. However, I'm getting the following error:
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for how I can fix this?

code:

// Imports assumed for this snippet (Flink 1.11, Scala API):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
  EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from the Debezium MySQL example docker:
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | Field       | Type                                | Null | Key | Default | Extra          |
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
// | customer_id | int(11)                             | NO   | MUL | NULL    |                |
// | street      | varchar(255)                        | NO   |     | NULL    |                |
// | city        | varchar(255)                        | NO   |     | NULL    |                |
// | state       | varchar(255)                        | NO   |     | NULL    |                |
// | zip         | varchar(255)                        | NO   |     | NULL    |                |
// | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
// +-------------+-------------------------------------+------+-----+---------+----------------+

tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    -- schema is the same as the MySQL "addresses" table
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json' -- using debezium-json as the format
  )
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
  CREATE TABLE ESAddresses (
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'index' = 'flinkaddresses',
    'format' = 'json'
  )
""")

table.executeInsert("ESAddresses").print()

Thanks!

Re: Debezium Flink EMR

Jark Wu-3
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1. 


Best,
Jark

Re: Debezium Flink EMR

Rex Fenley
Awesome, that took me a step further. When running, however, I'm receiving an error. FYI, my docker-compose file is based on the Debezium MySQL tutorial, which can be found here: https://debezium.io/documentation/reference/1.2/tutorial.html

Part of the stack trace:

flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | Caused by: java.lang.NullPointerException
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

Re: Debezium Flink EMR

Arvid Heise-3
Hi Rex,

the connector expects a value without a schema, but the message contains one. You can tell Flink that the schema is included, as described in the documentation [1]:

CREATE TABLE topic_products (
  -- schema is the same as the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  'debezium-json.schema-include' = 'true' -- note: WITH option values are string literals
)

[hidden email], it would probably be good to make the connector more robust and catch these types of misconfigurations.


On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <[hidden email]> wrote:
Awesome, so that took me a step further. When running i'm receiving an error however. FYI, my docker-compose file is based on the Debezium mysql tutorial which can be found here https://debezium.io/documentation/reference/1.2/tutorial.html

Part of the stack trace:

flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | Caused by: java.lang.NullPointerException
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <[hidden email]> wrote:
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1. 


Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <[hidden email]> wrote:
Hi again!

I'm tested out locally in docker on Flink 1.11 first to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector. However, I'm getting the following error
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

code:

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from Debezium mysql example docker:
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | Field | Type | Null | Key | Default | Extra |
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | id | int(11) | NO | PRI | NULL | auto_increment |
// | customer_id | int(11) | NO | MUL | NULL | |
// | street | varchar(255) | NO | | NULL | |
// | city | varchar(255) | NO | | NULL | |
// | state | varchar(255) | NO | | NULL | |
// | zip | varchar(255) | NO | | NULL | |
// | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
// +-------------+-------------------------------------+------+-----+---------+----------------+

tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
CREATE TABLE ESAddresses (
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'index' = 'flinkaddresses',
'format' = 'json'
)
""")

table.executeInsert("ESAddresses").print()

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <[hidden email]> wrote:
Thanks!

On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <[hidden email]> wrote:
Hi,

Regarding the performance difference, the proposed way will have one more stateful operator (deduplication) than the native 1.11 cdc support. 
The overhead of the deduplication operator is just similar to a simple group by aggregate (max on each non-key column). 

Best,
Jark
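
For anyone who does end up on the 1.10 workaround path, here is a minimal sketch of the deduplication step Jark refers to, written against the same tableEnv as the snippets above; the table name addresses_raw, its extracted "after" columns, and the processing-time attribute proctime are illustrative assumptions, not code from the thread:

```
// Hypothetical Flink 1.10 workaround: keep only the latest row per primary key.
// Assumes `addresses_raw` already exposes the extracted Debezium "after" columns
// plus a processing-time attribute, e.g. declared as `proctime AS PROCTIME()`.
val deduped = tableEnv.sqlQuery("""
  SELECT id, customer_id, street, city, state, zip, type
  FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY proctime DESC) AS row_num
    FROM addresses_raw
  )
  WHERE row_num = 1
""")
// This only covers inserts/updates; deletes (Debezium op = 'd', null "after")
// would still need separate handling, e.g. by filtering on an extracted op column.
tableEnv.createTemporaryView("addresses_latest", deduped)
```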

On Tue, 25 Aug 2020 at 02:21, Rex Fenley <[hidden email]> wrote:
Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <[hidden email]> wrote:
Yes — you'll get the full row in the payload; and you can also access the change operation, which might be useful in your case.

About performance, I'm summoning Kurt and [hidden email] to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <[hidden email]> wrote:
Yup! This definitely helps and makes sense.

The 'after' payload comes with all data from the row, right? So essentially, for inserts and updates I can insert/replace data by pk, for null values I just delete by pk, and then I can build out the rest of my joins like normal.

Are there any performance implications of doing it this way that is different from the out-of-the-box 1.11 solution?


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Debezium Flink EMR

Rex Fenley
Thanks for the input, though I've certainly included a schema, as is reflected earlier in this thread. Including it here again:
...
tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    -- schema is totally the same to the MySQL "addresses" table
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json' -- using debezium-json as the format
  )
""")

val table = tableEnv.from("topic_addresses").select($"*")
...

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <[hidden email]> wrote:
Hi Rex,

the connector expects a value without a schema, but the message contains a schema. You can tell Flink that the schema is included as written in the documentation [1].

CREATE TABLE topic_products (
  -- schema is totally the same to the MySQL "products" table
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'debezium-json',
 'debezium-json.schema-include' = 'true'
)

[hidden email], it would probably be good to make the connector more robust and catch these types of misconfigurations.


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: Debezium Flink EMR

Arvid Heise-3
Hi Rex,

you still forgot
'debezium-json.schema-include' = 'true'

Please reread my mail.
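
Applied to the topic_addresses DDL from earlier in the thread, the fix would presumably look like the following (a sketch, not Rex's actual code); note that option values in Flink SQL DDL are string literals, so the flag is written as 'true':

```
tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true' -- tell the format that the Debezium envelope carries its schema
  )
""")
```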

--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Debezium Flink EMR

Rex Fenley
Ah, my bad, thanks for pointing that out Arvid!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


Re: Debezium Flink EMR

Rex Fenley
Hi, getting so close but ran into another issue:

Flink successfully reads changes from Debezium/Kafka and writes them to Elasticsearch, but there's a problem with deletions. When I DELETE a row from MySQL, the deletion makes it all the way to Elasticsearch, which is great, but then the taskmanager suddenly dies with a NullPointerException. Inserts and updates do not have the same problem. This seems very odd. Any help would be much appreciated. Thanks!

flink-taskmanager_1    | 2020-08-31 23:30:33,684 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1    | java.lang.NullPointerException: null
flink-taskmanager_1    | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-taskmanager_1    | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1    | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1    | 2020-08-31 23:30:33,720 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1    | 2020-08-31 23:30:33,728 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) 2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1     | 2020-08-31 23:30:33,770 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1     | java.lang.NullPointerException: null
flink-jobmanager_1     | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <[hidden email]> wrote:
Awesome, so that took me a step further. When running i'm receiving an error however. FYI, my docker-compose file is based on the Debezium mysql tutorial which can be found here https://debezium.io/documentation/reference/1.2/tutorial.html

Part of the stack trace:

flink-jobmanager_1     | Caused by: java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | Caused by: java.lang.NullPointerException
flink-jobmanager_1     | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1     | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]

On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <[hidden email]> wrote:
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1. 


Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <[hidden email]> wrote:
Hi again!

I'm tested out locally in docker on Flink 1.11 first to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector. However, I'm getting the following error
```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

code:

val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings =
EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from Debezium mysql example docker:
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | Field | Type | Null | Key | Default | Extra |
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | id | int(11) | NO | PRI | NULL | auto_increment |
// | customer_id | int(11) | NO | MUL | NULL | |
// | street | varchar(255) | NO | | NULL | |
// | city | varchar(255) | NO | | NULL | |
// | state | varchar(255) | NO | | NULL | |
// | zip | varchar(255) | NO | | NULL | |
// | type | enum('SHIPPING','BILLING','LIVING') | NO | | NULL | |
// +-------------+-------------------------------------+------+-----+---------+----------------+

tableEnv.executeSql("""
CREATE TABLE topic_addresses (
-- schema is totally the same to the MySQL "addresses" table
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.inventory.addresses',
'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
'properties.group.id' = 'testGroup',
'format' = 'debezium-json' -- using debezium-json as the format
)
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: type should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
CREATE TABLE ESAddresses (
id INT,
customer_id INT,
street STRING,
city STRING,
state STRING,
zip STRING,
type STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'index' = 'flinkaddresses',
'format' = 'json'
)
""")

table.executeInsert("ESAddresses").print()

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <[hidden email]> wrote:
Thanks!

On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <[hidden email]> wrote:
Hi,

Regarding the performance difference, the proposed way will have one more stateful operator (deduplication) than the native 1.11 cdc support. 
The overhead of the deduplication operator is just similar to a simple group by aggregate (max on each non-key column). 

Best,
Jark

On Tue, 25 Aug 2020 at 02:21, Rex Fenley <[hidden email]> wrote:
Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <[hidden email]> wrote:
Yes — you'll get the full row in the payload; and you can also access the change operation, which might be useful in your case.

About performance, I'm summoning Kurt and [hidden email] to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <[hidden email]> wrote:
Yup! This definitely helps and makes sense.

The 'after' payload comes with all data from the row right? So essentially inserts and updates I can insert/replace data by pk and null values I just delete by pk, and then I can build out the rest of my joins like normal.

Are there any performance implications of doing it this way that is different from the out-of-the-box 1.11 solution?

On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <[hidden email]> wrote:
Hi, Rex.

Part of what enabled CDC support in Flink 1.11 was the refactoring of the table source interfaces (FLIP-95 [1]), and the new ScanTableSource [2], which allows to emit bounded/unbounded streams with insert, update and delete rows.

In theory, you could consume data generated with Debezium as regular JSON-encoded events before Flink 1.11 — there just wasn't a convenient way to really treat it as "changelog". As a workaround, what you can do in Flink 1.10 is process these messages as JSON and extract the "after" field from the payload, and then apply de-duplication [3] to keep only the last row.

The DDL for your source table would look something like:

CREATE TABLE tablename ( ... after ROW(`field1` DATATYPE, `field2` DATATYPE, ...) ) WITH ( 'connector' = 'kafka', 'format' = 'json', ... );

Hope this helps!

Marta

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication 

On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <[hidden email]> wrote:
@Jark Would it be possible to use the 1.11 debezium support in 1.10?

On 20/08/2020 19:59, Rex Fenley wrote:
Hi,

I'm trying to set up Flink with Debezium CDC Connector on AWS EMR, however, EMR only supports Flink 1.10.0, whereas Debezium Connector arrived in Flink 1.11.0, from looking at the documentation.


I'm wondering what alternative solutions are available for connecting Debezium to Flink? Is there an open source Debezium connector that works with Flink 1.10.0? Could I potentially pull the code out for the 1.11.0 Debezium connector and compile it in my project using Flink 1.10.0 api?

For context, I plan on doing some fairly complicated long lived stateful joins / materialization using the Table API over data ingested from Postgres and possibly MySQL.

Appreciate any help, thanks!

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US




--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   


--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US



--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Debezium Flink EMR

Marta Paes Moreira
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for DELETE operations (i.e. a record with the same key as the deleted row and a value of null). These are markers for Kafka to indicate that log compaction can remove all records for the given key, and the initial implementation of the debezium-json format can't handle them. This issue is already documented (and fixed) in [1].
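
To make that concrete: with the default connector settings, a single DELETE on the addresses table ends up as two records on the Kafka topic, roughly like this (schema envelopes omitted, payloads shortened, and the id taken from your earlier log just for illustration):

record 1, the Debezium delete event, which the format handles fine:
  key:   {"id": 18}
  value: {"payload": {"before": {"id": 18, "customer_id": 1004, ...}, "after": null, "op": "d", ...}}

record 2, the tombstone, which triggers the NullPointerException above:
  key:   {"id": 18}
  value: null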

In the meantime, can you try adding "tombstones.on.delete": "false" to the configuration of your Debezium MySQL connector? There is a sketch of what that could look like at the end of this message.

Marta

[1] https://issues.apache.org/jira/browse/FLINK-18705
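
In case it helps, here is a rough sketch of the register payload with that property added. The database.* values below are just the defaults from the Debezium MySQL tutorial your docker-compose is based on, so keep whatever you originally registered with; the only new line is tombstones.on.delete:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "tombstones.on.delete": "false"
  }
}

If the connector is already running, you can either delete and re-register it, or update it in place through Kafka Connect's REST API (PUT /connectors/inventory-connector/config with just the "config" object).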

Reply | Threaded
Open this post in threaded view
|

Re: Debezium Flink EMR

Rex Fenley
This worked, thanks! Looking forward to future releases :)

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US