Hi, I'm trying to set up Flink with the Debezium CDC connector on AWS EMR. However, EMR only supports Flink 1.10.0, whereas the Debezium connector arrived in Flink 1.11.0, from what I can tell from the documentation. I'm wondering what alternative solutions are available for connecting Debezium to Flink. Is there an open source Debezium connector that works with Flink 1.10.0? Could I potentially pull out the code for the 1.11.0 Debezium connector and compile it in my project against the Flink 1.10.0 API?

For context, I plan on doing some fairly complicated, long-lived stateful joins / materialization using the Table API over data ingested from Postgres and possibly MySQL.

Appreciate any help, thanks!
@Jark Would it be possible to use the 1.11 Debezium support in 1.10?
On 20/08/2020 19:59, Rex Fenley wrote:
Hi, Rex.

Part of what enabled CDC support in Flink 1.11 was the refactoring of the table source interfaces (FLIP-95 [1]) and the new ScanTableSource [2], which allows emitting bounded/unbounded streams with insert, update and delete rows. In theory, you could consume data generated with Debezium as regular JSON-encoded events before Flink 1.11; there just wasn't a convenient way to really treat it as a "changelog".

As a workaround, what you can do in Flink 1.10 is process these messages as JSON, extract the "after" field from the payload, and then apply de-duplication [3] to keep only the last row. The DDL for your source table would look something like:

```
CREATE TABLE tablename (
  ...
  after ROW(`field1` DATATYPE, `field2` DATATYPE, ...)
) WITH (
  'connector' = 'kafka',
  'format' = 'json',
  ...
);
```

(A sketch of the de-duplication query follows below.)

Hope this helps!

Marta

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/api/java/org/apache/flink/table/connector/source/ScanTableSource.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#deduplication

On Fri, Aug 21, 2020 at 10:28 AM Chesnay Schepler <[hidden email]> wrote:
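To make the de-duplication step above concrete, here is a minimal sketch of the query. It assumes two things that are not part of the DDL sketch: that the source table also declares a processing-time column (e.g. `proctime AS PROCTIME()`) and that `after` carries the primary key in a field called `id`.

```
-- Keep only the most recent record per key, following the deduplication pattern in [3].
SELECT id, field1, field2
FROM (
  SELECT
    after.id     AS id,
    after.field1 AS field1,
    after.field2 AS field2,
    ROW_NUMBER() OVER (PARTITION BY after.id ORDER BY proctime DESC) AS row_num
  FROM tablename
)
WHERE row_num = 1;
```

The result is the latest row per key, which downstream joins can then consume.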
Yup! This definitely helps and makes sense. The 'after' payload comes with all data from the row, right? So essentially, for inserts and updates I can insert/replace data by PK, and for null values I just delete by PK, and then I can build out the rest of my joins like normal. Are there any performance implications of doing it this way that differ from the out-of-the-box 1.11 solution?

On Fri, Aug 21, 2020 at 2:28 AM Marta Paes Moreira <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Yes, you'll get the full row in the payload, and you can also access the change operation, which might be useful in your case (see the sketch below).

About performance, I'm summoning Kurt and [hidden email] to the thread, who will be able to give you a more complete answer and likely also some optimization tips for your specific use case.

Marta

On Fri, Aug 21, 2020 at 8:55 PM Rex Fenley <[hidden email]> wrote:
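For instance, the earlier DDL sketch could be extended with the Debezium `op` field. The column names and topic below are placeholders, not taken from the thread:

```
-- Expose the change operation ('c' = create, 'u' = update, 'd' = delete, 'r' = snapshot read)
-- next to the new row image, so deletes (where "after" is NULL) can be told apart.
CREATE TABLE tablename (
  op STRING,
  after ROW(`field1` INT, `field2` STRING)
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'format' = 'json'
);

SELECT op, after.field1, after.field2 FROM tablename;
```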
Thank you so much for the help!

On Mon, Aug 24, 2020 at 4:08 AM Marta Paes Moreira <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi,

Regarding the performance difference: the proposed way will have one more stateful operator (deduplication) than the native 1.11 CDC support. The overhead of the deduplication operator is similar to that of a simple group-by aggregate (a max on each non-key column).

Best,
Jark

On Tue, 25 Aug 2020 at 02:21, Rex Fenley <[hidden email]> wrote:
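Purely as an illustration of that comparison (hypothetical table and column names; this is not a query to run instead of the deduplication), the state kept by the extra operator is roughly comparable to that of an aggregation like:

```
SELECT id,
       MAX(field1) AS field1,
       MAX(field2) AS field2
FROM some_table
GROUP BY id;
```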
Thanks!

On Thu, Aug 27, 2020 at 5:33 AM Jark Wu <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi again! I tested things out locally in Docker on Flink 1.11 first, to get my bearings before downgrading to 1.10 and figuring out how to replace the Debezium connector. However, I'm getting the following error:

```
Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
```

Any suggestions for me to fix this?

Code:

```
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val blinkStreamSettings = EnvironmentSettings
  .newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(bsEnv, blinkStreamSettings)

// Table from the Debezium MySQL example docker:
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | Field       | Type                                | Null | Key | Default | Extra          |
// +-------------+-------------------------------------+------+-----+---------+----------------+
// | id          | int(11)                             | NO   | PRI | NULL    | auto_increment |
// | customer_id | int(11)                             | NO   | MUL | NULL    |                |
// | street      | varchar(255)                        | NO   |     | NULL    |                |
// | city        | varchar(255)                        | NO   |     | NULL    |                |
// | state       | varchar(255)                        | NO   |     | NULL    |                |
// | zip         | varchar(255)                        | NO   |     | NULL    |                |
// | type        | enum('SHIPPING','BILLING','LIVING') | NO   |     | NULL    |                |
// +-------------+-------------------------------------+------+-----+---------+----------------+
tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    -- schema is totally the same as the MySQL "addresses" table
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json' -- using debezium-json as the format
  )
""")

val table = tableEnv.from("topic_addresses").select($"*")

// Defining a PK automatically puts it in Upsert mode, which we want.
// TODO: "type" should be a keyword, is that acceptable by the DDL?
tableEnv.executeSql("""
  CREATE TABLE ESAddresses (
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'http://flink-jdbc-test_graph-elasticsearch_1:9200',
    'index' = 'flinkaddresses',
    'format' = 'json'
  )
""")

table.executeInsert("ESAddresses").print()
```

Thanks!

On Thu, Aug 27, 2020 at 11:53 AM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi,

This is a known issue in 1.11.0, and has been fixed in 1.11.1.

Best,
Jark

On Fri, 28 Aug 2020 at 06:52, Rex Fenley <[hidden email]> wrote:
Awesome, so that took me a step further. When running, I'm receiving an error, however. FYI, my docker-compose file is based on the Debezium MySQL tutorial, which can be found here: https://debezium.io/documentation/reference/1.2/tutorial.html

Part of the stack trace:

```
flink-jobmanager_1 | Caused by: java.io.IOException: Corrupt Debezium JSON message '{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":false,"field":"customer_id"},{"type":"string","optional":false,"field":"street"},{"type":"string","optional":false,"field":"city"},{"type":"string","optional":false,"field":"state"},{"type":"string","optional":false,"field":"zip"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"SHIPPING,BILLING,LIVING"},"field":"type"}],"optional":true,"name":"dbserver1.inventory.addresses.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.addresses.Envelope"},"payload":{"before":null,"after":{"id":18,"customer_id":1004,"street":"111 cool street","city":"Big City","state":"California","zip":"90000","type":"BILLING"},"source":{"version":"1.2.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1598651432000,"snapshot":"false","db":"inventory","table":"addresses","server_id":223344,"gtid":null,"file":"mysql-bin.000010","pos":369,"row":0,"thread":5,"query":null},"op":"c","ts_ms":1598651432407,"transaction":null}}'.
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | Caused by: java.lang.NullPointerException
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:115) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
```

On Thu, Aug 27, 2020 at 8:12 PM Jark Wu <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi Rex, the connector expects a value without a schema, but the message contains a schema. You can tell Flink that the schema is included as written in the documentation [1].
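For reference, the relevant format option should be `debezium-json.schema-include` (worth double-checking against the docs for your Flink version); applied to the DDL from earlier in the thread, it would look roughly like this:

```
CREATE TABLE topic_addresses (
  id INT,
  customer_id INT,
  street STRING,
  city STRING,
  state STRING,
  zip STRING,
  type STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.addresses',
  'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json',
  -- tell the format that the Debezium envelope embeds the schema
  'debezium-json.schema-include' = 'true'
)
```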
[hidden email], it would probably be good to make the connector more robust and catch these types of misconfigurations.

On Fri, Aug 28, 2020 at 11:56 PM Rex Fenley <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer, Ververica GmbH
Thanks for the input, though I've certainly included a schema, as is reflected earlier in this thread. Including it here again:

```
...
tableEnv.executeSql("""
  CREATE TABLE topic_addresses (
    -- schema is totally the same as the MySQL "addresses" table
    id INT,
    customer_id INT,
    street STRING,
    city STRING,
    state STRING,
    zip STRING,
    type STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'kafka',
    'topic' = 'dbserver1.inventory.addresses',
    'properties.bootstrap.servers' = 'flink-jdbc-test_kafka_1:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'debezium-json' -- using debezium-json as the format
  )
""")

val table = tableEnv.from("topic_addresses").select($"*")
...
```

On Mon, Aug 31, 2020 at 2:39 AM Arvid Heise <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi Rex, you still forgot to tell the format that the schema is included (the option described in the documentation).
On Mon, Aug 31, 2020 at 7:55 PM Rex Fenley <[hidden email]> wrote:
-- Arvid Heise | Senior Java Developer, Ververica GmbH
Ah, my bad, thanks for pointing that out, Arvid!

On Mon, Aug 31, 2020 at 12:00 PM Arvid Heise <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hi, getting so close, but I ran into another issue: Flink successfully reads changes from Debezium/Kafka and writes them to Elasticsearch, but there's a problem with deletions. When I DELETE a row from MySQL, the deletion successfully makes it all the way to Elasticsearch, which is great, but then the taskmanager suddenly dies with a null pointer exception. Inserts and updates do not have the same problem. This seems very odd. Any help would be much appreciated. Thanks!

```
flink-taskmanager_1 | 2020-08-31 23:30:33,684 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED.
flink-taskmanager_1 | java.lang.NullPointerException: null
flink-taskmanager_1 | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-taskmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[flink-jdbc-test-assembly-0.1-SNAPSHOT.jar:0.1-SNAPSHOT]
flink-taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-taskmanager_1 | 2020-08-31 23:30:33,720 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8).
flink-taskmanager_1 | 2020-08-31 23:30:33,728 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) 2b79917cb528f37fad7f636740d2fdd8.
flink-jobmanager_1 | 2020-08-31 23:30:33,770 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: TableSourceScan(table=[[default_catalog, default_database, topic_addresses]], fields=[id, customer_id, street, city, state, zip, type]) -> Sink: Sink(table=[default_catalog.default_database.ESAddresses], fields=[id, customer_id, street, city, state, zip, type]) (1/2) (2b79917cb528f37fad7f636740d2fdd8) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@2e246b35.
flink-jobmanager_1 | java.lang.NullPointerException: null
flink-jobmanager_1 | at java.lang.String.<init>(String.java:566) ~[?:1.8.0_265]
flink-jobmanager_1 | at org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136) ~[flink-json-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[?:?]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
flink-jobmanager_1 | at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
```

On Mon, Aug 31, 2020 at 12:27 PM Rex Fenley <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com
Hey, Rex!

This is likely due to the tombstone records that Debezium produces for DELETE operations (i.e. a record with the same key as the deleted row and a value of null). These are markers for Kafka to indicate that log compaction can remove all records for the given key, and the initial implementation of the debezium-json format can't handle them. This issue is already documented (and solved) in [1].

In the meantime, can you try adding "tombstones.on.delete": "false" to the configuration of your Debezium MySQL connector?

Marta

[1] https://issues.apache.org/jira/browse/FLINK-18705

On Tue, Sep 1, 2020 at 1:36 AM Rex Fenley <[hidden email]> wrote:
This worked, thanks! Looking forward to the future releases :)

On Mon, Aug 31, 2020 at 5:06 PM Marta Paes Moreira <[hidden email]> wrote:
-- Rex Fenley | Software Engineer - Mobile and Backend | Remind.com