Re: Filter Null in Array in SQL Connector

Posted by Jark Wu-3
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Filter-Null-in-Array-in-SQL-Connector-tp39607p39649.html

Thanks Rex! This is very helpful. Will check it out later. 


On Fri, 20 Nov 2020 at 03:02, Rex Fenley <[hidden email]> wrote:
Below is a highly redacted set of data that should represent the problem. As you can see, the "roles" field contains "[null]", i.e. a null value within the array. We also see corresponding rows like the following in our DB:
    id     | roles  
-----------+------------
  16867433 | {NULL}
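
A query along these lines surfaces such rows directly in Postgres (a sketch, assuming the table really is named "data" as in the schema below):

-- array_position matches NULL elements, returning the index of the first match
SELECT id, roles
FROM data
WHERE array_position(roles, NULL) IS NOT NULL;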

We have confirmed that when we do not select "roles", all data passes through a single operator without failure, but selecting "roles" eventually and consistently fails with a java.lang.NullPointerException. What is odd is that there is no additional stack trace, just the exception, both in our logs and in the Flink UI. We only have INFO logging enabled; however, other exceptions we've encountered during development have always included a stack trace.
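
Concretely, with a source table registered over this topic (the name data_source here is hypothetical):

-- Passes: "roles" is never deserialized
SELECT id FROM data_source;

-- Eventually fails with java.lang.NullPointerException once a {NULL} row arrives
SELECT id, roles FROM data_source;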

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null],
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}

cc Brad

On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <[hidden email]> wrote:

Ah yes, I missed the Kafka part and just saw the array part. FLINK-19771 was definitely limited to the Postgres-specific code.

 

Dylan

 

From: Jark Wu <[hidden email]>
Date: Thursday, November 19, 2020 at 9:12 AM
To: Dylan Forciea <[hidden email]>
Cc: Danny Chan <[hidden email]>, Rex Fenley <[hidden email]>, Flink ML <[hidden email]>
Subject: Re: Filter Null in Array in SQL Connector

 

Hi Dylan, 

 

I think Rex encountered a different issue, because he is using Kafka with the Debezium format. 

 

Hi Rex,

 

If you can share the JSON data and the exception stack trace, that would be helpful!

 

Besides, you can try enabling the 'debezium-json.ignore-parse-errors' option [1] to skip the dirty data.
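
For example, something like this (a sketch; the table, topic, and server names are only placeholders):

CREATE TABLE data_source (
  id INT,
  roles ARRAY<STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'db.public.data',
  'properties.bootstrap.servers' = '...',
  'format' = 'debezium-json',
  -- skip records that fail to deserialize instead of failing the job
  'debezium-json.ignore-parse-errors' = 'true'
);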

 

Best,

Jark

 

 

On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <[hidden email]> wrote:

Do you mean that the array contains values that are null, or that the entire array itself is null? If it’s the latter, I have an issue filed, along with a PR to fix it that has been pending review [1].

 

Regards,

Dylan Forciea

 

[1] https://issues.apache.org/jira/browse/FLINK-19771

 

From: Danny Chan <[hidden email]>
Date: Thursday, November 19, 2020 at 2:24 AM
To: Rex Fenley <[hidden email]>
Cc: Flink ML <[hidden email]>
Subject: Re: Filter Null in Array in SQL Connector

 

Hi, Fenley ~

 

You are right, parsing nulls in an ARRAY field is not supported right now. I have logged an issue [1] and will fix it soon ~

 

 

On Thu, Nov 19, 2020 at 2:51 PM, Rex Fenley <[hidden email]> wrote:

Hi,

 

I recently discovered that some of our data has NULL values arriving in an ARRAY<STRING> column. This column is consumed by Flink via the Kafka connector with the Debezium format. We seem to receive a NullPointerException whenever these NULL values arrive in the arrays, which restarts the source operator in a loop.

 

Is there any way to avoid throwing, or to filter out the NULLs in an ARRAY<STRING>, in Flink?

 

We're somewhat stuck on how to solve this problem; we'd like to be defensive about it on Flink's side.
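
The only stopgap we can think of is scrubbing the arrays upstream in Postgres, along these lines (a sketch, with "data" standing in for the real table), though we'd rather not rely on that:

-- array_remove compares using IS NOT DISTINCT FROM semantics, so it can strip NULL elements
UPDATE data
SET roles = array_remove(roles, NULL)
WHERE array_position(roles, NULL) IS NOT NULL;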

 

Thanks!

 

(P.S. The exception was not very informative; there may be room for a richer error message when this happens.)


--

Rex Fenley  |  Software Engineer - Mobile and Backend

 

Remind.com |  BLOG  |  FOLLOW US  |  LIKE US


