Filter Null in Array in SQL Connector


Filter Null in Array in SQL Connector

Rex Fenley
Hi,

I recently discovered that some of our data has NULL values arriving in an ARRAY<STRING> column. This column is consumed by Flink via the Kafka connector with the Debezium format. We seem to be receiving NullPointerExceptions when these NULL values arrive in the arrays, which restarts the source operator in a loop.

Is there any way to avoid the exception, or to filter out NULLs in an array of strings, in Flink?

We're somewhat stuck on how to solve this problem; we'd like to be defensive about it on Flink's side.

Thanks!

(P.S. The exception was not that informative; there may be room for improvement in the form of a richer error message when this happens.)

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US
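
A note on SQL-level workarounds: once a record does parse, null elements can be dropped by unnesting the array, filtering, and re-aggregating. The sketch below assumes the table and column names from the DDL shown later in this thread (source_kafka_data, id, roles); it cannot help when deserialization itself throws, as described above, and COLLECT returns a MULTISET rather than an ARRAY. Rows whose array is empty or all-NULL would also drop out of the result.

-- Sketch: drop NULL elements from roles after parsing succeeds.
-- Assumes the source table defined later in this thread.
SELECT
    d.id,
    COLLECT(r.role) AS roles  -- MULTISET<STRING>, not ARRAY<STRING>
FROM source_kafka_data AS d
CROSS JOIN UNNEST(d.roles) AS r (role)
WHERE r.role IS NOT NULL
GROUP BY d.id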


Re: Filter Null in Array in SQL Connector

Danny Chan
Hi Fenley ~

You are right, parsing nulls in an ARRAY field is not supported right now. I have logged an issue [1] and will fix it soon ~




Re: Filter Null in Array in SQL Connector

Danny Chan
In reply to this post by Rex Fenley
Can you also share your problematic JSON string here, so that we can determine the specific cause of the error?



Re: Filter Null in Array in SQL Connector

Dylan Forciea
In reply to this post by Danny Chan

Do you mean that the array contains values that are null, or that the entire array itself is null? If it’s the latter, I have an issue written, along with a PR to fix it that has been pending review [1].

 

Regards,

Dylan Forciea

 

[1] https://issues.apache.org/jira/browse/FLINK-19771
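
For readers following along, the two cases look different in Postgres; a quick illustration:

-- Entire array is NULL (the FLINK-19771 case):
SELECT NULL::text[] AS roles;         -- roles IS NULL
-- Array present but containing a NULL element (the case in this thread):
SELECT ARRAY[NULL]::text[] AS roles;  -- psql displays this as {NULL}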

 



Re: Filter Null in Array in SQL Connector

Jark Wu
Hi Dylan, 

I think Rex encountered another issue, because he is using Kafka with Debezium format. 

Hi Rex,

If you can share the json data and the exception stack, that would be helpful!

Besides, you can try enabling the 'debezium-json.ignore-parse-errors' option [1] to skip the dirty data.

Best,
Jark
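
For reference, the option goes into the table's WITH clause. A minimal sketch, reusing the connector properties from the DDL shown later in this thread; note it skips whole records that fail to deserialize, not just the null elements:

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING>
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'format' = 'debezium-json',
    'debezium-json.schema-include' = 'true',
    -- skip records that fail to deserialize instead of failing the job
    'debezium-json.ignore-parse-errors' = 'true'
)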




Re: Filter Null in Array in SQL Connector

Dylan Forciea

Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771 was definitely solely in the Postgres-specific code.

 

Dylan

 



Re: Filter Null in Array in SQL Connector

Rex Fenley
Below is a highly redacted set of data that should represent the problem. As you can see, the "roles" field has "[null]" in it, a null value within the array. We also see in our DB corresponding rows like the following.
    id     | roles  
-----------+------------
  16867433 | {NULL}
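
(As an aside, such rows can be located in Postgres directly, since array_position matches NULL using IS NOT DISTINCT FROM semantics; a sketch against the table and column names from the example:)

-- Find rows whose roles array contains a NULL element.
SELECT id, roles
FROM data
WHERE array_position(roles, NULL) IS NOT NULL;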

We have confirmed that when we do not select "roles", all data passes through a single operator without failure, but selecting "roles" eventually always fails with java.lang.NullPointerException, repeatedly. What is odd is that there is no additional stack trace, just the exception, in our logs and in the Flink UI. We only have INFO logging on; however, other exceptions we've encountered during development have always come with a stack trace.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          },
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          },
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null],
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}

cc Brad



Re: Filter Null in Array in SQL Connector

Jark Wu
Thanks Rex! This is very helpful. Will check it out later. 




Re: Filter Null in Array in SQL Connector

Danny Chan
In reply to this post by Rex Fenley
I checked with the following JSON:

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"array",
                  "items":{
                     "type":"string",
                     "optional":true
                  },
                  "optional":false,
                  "field":"roles"
               }
            ],
            "optional":true,
            "name":"db.public.data.Value",
            "field":"before"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"int32",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"array",
                  "items":{
                     "type":"string",
                     "optional":true
                  },
                  "optional":false,
                  "field":"roles"
               }
            ],
            "optional":true,
            "name":"db.public.data.Value",
            "field":"after"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"version"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"connector"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"name"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"ts_ms"
               },
               {
                  "type":"string",
                  "optional":true,
                  "name":"io.debezium.data.Enum",
                  "version":1,
                  "parameters":{
                     "allowed":"true,last,false"
                  },
                  "default":"false",
                  "field":"snapshot"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"db"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"schema"
               },
               {
                  "type":"string",
                  "optional":false,
                  "field":"table"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"txId"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"lsn"
               },
               {
                  "type":"int64",
                  "optional":true,
                  "field":"xmin"
               }
            ],
            "optional":false,
            "name":"io.debezium.connector.postgresql.Source",
            "field":"source"
         },
         {
            "type":"string",
            "optional":false,
            "field":"op"
         },
         {
            "type":"int64",
            "optional":true,
            "field":"ts_ms"
         },
         {
            "type":"struct",
            "fields":[
               {
                  "type":"string",
                  "optional":false,
                  "field":"id"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"total_order"
               },
               {
                  "type":"int64",
                  "optional":false,
                  "field":"data_collection_order"
               }
            ],
            "optional":true,
            "field":"transaction"
         }
      ],
      "optional":false,
      "name":"db.public.data.Envelope"
   },
   "payload":{
      "before":null,
      "after":{
         "id":76704,
         "roles":[
            null
         ]
      },
      "source":{
         "version":"1.3.0.Final",
         "connector":"postgresql",
         "name":"db",
         "ts_ms":1605739197360,
         "snapshot":"true",
         "db":"db",
         "schema":"public",
         "table":"data",
         "txId":1784,
         "lsn":1305806608,
         "xmin":null
      },
      "op":"r",
      "ts_ms":1605739197373,
      "transaction":null
   }
}

It works correctly. I reformatted it because the original was not valid JSON.



Re: Filter Null in Array in SQL Connector

Rex Fenley
In reply to this post by Jark Wu
Thanks!

Update: We've now confirmed with a test copy of our data that if we remove all the null values from the arrays, everything works smoothly and as expected. So this definitely appears to be the culprit.
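
For anyone wanting the same cleanup upstream: Postgres's array_remove accepts NULL as the value to remove, so the offending rows can be scrubbed at the source. A sketch, using the table and column names from earlier in the thread:

-- Strip NULL elements from roles in the source database.
UPDATE data
SET roles = array_remove(roles, NULL)
WHERE array_position(roles, NULL) IS NOT NULL;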



Re: Filter Null in Array in SQL Connector

Rex Fenley
Btw, this is what our source and sink essentially look like, with some columns redacted.

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
)


CREATE TABLE sink_es_data (
    id BIGINT NOT NULL,
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'eshost',
    'index' = 'data',
    'format' = 'json',
    'sink.bulk-flush.max-actions' = '8192',
    'sink.bulk-flush.max-size' = '16mb',
    'sink.bulk-flush.interval' = '5000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'sink.bulk-flush.backoff.max-retries' = '4',
    'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)
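
One detail worth noting in the source DDL above: roles is declared ARRAY<STRING NOT NULL>, i.e. the element type is declared non-nullable, while the upstream rows shown earlier contain {NULL}. Whether relaxing that declaration alone avoids the NPE is an untested assumption, but a nullable element type at least matches the data:

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING>,  -- nullable elements, matching upstream rows like {NULL}
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
)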





Re: Filter Null in Array in SQL Connector

Rex Fenley
Hello,

Any updates on this bug?

Thanks!



Re: Filter Null in Array in SQL Connector

Danny Chan
My local test indicates that debezium-json works correctly with your given schema and example record. Can you give a more detailed exception stack trace and a record that reproduces this problem?

Rex Fenley <[hidden email]> 于2020年12月1日周二 上午7:15写道:
Hello,

Any updates on this bug?

Thanks!

On Fri, Nov 20, 2020 at 11:51 AM Rex Fenley <[hidden email]> wrote:
Btw, this is what our source and sink essentially look like, with some columns redacted.

CREATE TABLE source_kafka_data (
    id BIGINT,
    roles ARRAY<STRING NOT NULL>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
)


CREATE TABLE sink_es_data (
    id BIGINT NOT NULL,
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts' = 'eshost',
    'index' = 'data',
    'format' = 'json',
    'sink.bulk-flush.max-actions' = '8192',
    'sink.bulk-flush.max-size' = '16mb',
    'sink.bulk-flush.interval' = '5000',
    'sink.bulk-flush.backoff.delay' = '1000',
    'sink.bulk-flush.backoff.max-retries' = '4',
    'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)
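
One mismatch worth noting between these definitions and the Debezium schema in the JSON record below (an observation, not a fix confirmed in this thread): the source declares the element type as STRING NOT NULL, while Debezium marks the array items as "optional": true, i.e. elements may be null. A sketch of the source table with nullable elements, matching the sink's ARRAY<STRING>:

CREATE TABLE source_kafka_data (
    id BIGINT,
    -- nullable elements, matching Debezium's "optional": true items
    roles ARRAY<STRING>,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic',
    'properties.bootstrap.servers' = 'kafka',
    'properties.group.id' = 'group_id',
    'properties.auto.offset.reset' = 'earliest',
    'debezium-json.schema-include' = 'true',
    'format' = 'debezium-json'
)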



On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <[hidden email]> wrote:
Thanks!

Update: we've confirmed with a test copy of our data that if we remove all the null values from the arrays, everything works smoothly and as expected. So this definitely appears to be the culprit.
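
Since removing the nulls makes everything pass, a defensive workaround sketch in plain Flink SQL against the source table above (untested; it assumes the source can first deserialize the record at all, e.g. with nullable elements as in the DDL sketch above; CROSS JOIN UNNEST drops ids whose arrays are empty or all-NULL, and COLLECT yields a MULTISET rather than an ARRAY, so treat this as a starting point rather than a drop-in fix):

-- explode the array, drop NULL elements, and gather the rest back per id
SELECT s.id, COLLECT(t.role) AS roles
FROM source_kafka_data AS s
CROSS JOIN UNNEST(s.roles) AS t (role)
WHERE t.role IS NOT NULL
GROUP BY s.id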

On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <[hidden email]> wrote:
Thanks Rex! This is very helpful. Will check it out later. 


On Fri, 20 Nov 2020 at 03:02, Rex Fenley <[hidden email]> wrote:
Below is a highly redacted set of data that should represent the problem. As you can see, the "roles" field has "[null]" in it, a null value within the array. We also see in our DB corresponding rows like the following.
    id     | roles  
-----------+------------
  16867433 | {NULL}

We have confirmed that when we do not select "roles", all data passes through a single operator without failure, but selecting "roles" eventually and repeatedly fails with java.lang.NullPointerException. What is odd is that there is no additional stack trace at all, just the exception, in our logs and in the Flink UI. We only have INFO logging on; however, other exceptions we've encountered during development have always come with a stack trace.

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null],
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US

Reply | Threaded
Open this post in threaded view
|

Re: Filter Null in Array in SQL Connector

Rex Fenley
There's no stack trace; there's literally just the exception logged, and it's non-obvious. The first time we ran into the problem, it looked like Flink was just stuck and not processing any data until we dug deeper. After I get through this next phase of work (1 to 2 weeks), I'll be sure to slice off time to get this bug set up in a project I can share publicly.

Thanks

On Tue, Dec 1, 2020 at 4:52 AM Danny Chan <[hidden email]> wrote:
My local test indicates that the debezium-json format works correctly with your given schema and example record. Can you give a more detailed exception stack trace and a record that reproduces this problem?

--

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com |  BLOG  |  FOLLOW US  |  LIKE US