JDBCInputFormat does not support json type

Fanbin Bu
Hi there,

Flink Version: 1.8.1
JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver

Here is the code snippet:

val rowTypeInfo = new RowTypeInfo(
  Array[TypeInformation[_]](
    new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Array[String]("a", "b")
    )
  ),
  Array[String]("fieldName")
)
val inputFormat = buildInputFormat(query, rowTypeInfo)
env.createInput(inputFormat)

My Snowflake table data looks like this (fieldName has type VARIANT):

fieldName
--------------
{
"a": "1",
"b": "2"
}

I got this error message:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.types.Row


It looks like the record I get back from Snowflake is a string. The error prevents me from doing something like
sqlQuery("select fieldName.a from table")

Any help is appreciated!

Thanks,
Fanbin
Re: JDBCInputFormat does not support json type

Fanbin Bu
Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR:

case VARIANT:
  colType = Types.VARCHAR;
  extColTypeName = "VARIANT";
  break;

and SnowflakeResultSet just returns the string value of the field:

switch (type)
{
  case Types.VARCHAR:
  case Types.CHAR:
    return getString(columnIndex);

What would be the best way to handle this on the Flink side?
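
For reference, a minimal workaround sketch (assuming the driver really always hands the VARIANT back as a JSON string, and reusing the buildInputFormat helper from the snippet above) would be to declare the column as a plain STRING in the RowTypeInfo and defer the JSON parsing to a later step, so that the value returned by the driver matches the declared type:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo

// Declare the VARIANT column as a plain STRING; the driver hands back a JSON
// string, so no cast to org.apache.flink.types.Row is attempted.
val rowTypeInfo = new RowTypeInfo(
  Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO),
  Array[String]("fieldName")
)

val inputFormat = buildInputFormat(query, rowTypeInfo)
env.createInput(inputFormat)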



Re: JDBCInputFormat does not support json type

Fabian Hueske
Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and implement a Scalar UDF to convert it into a nested tuple.
The UDF could use the code of the flink-json module.

AFAIK, there is some work under way to add built-in JSON functions.
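
A rough sketch of that UDF approach (hypothetical names; parsing with Jackson directly rather than going through flink-json) could look like this:

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import com.fasterxml.jackson.databind.ObjectMapper

// Parses the VARIANT column (read as a JSON string) into a nested Row(a, b);
// assumes both keys are present in every record.
class ParseVariant extends ScalarFunction {
  @transient private lazy val mapper = new ObjectMapper()

  def eval(json: String): Row = {
    val node = mapper.readTree(json)
    Row.of(node.get("a").asText(), node.get("b").asText())
  }

  // Tell the planner that the result is a row with two STRING fields "a" and "b".
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO),
      Array[String]("a", "b")
    )
}

After registering it (e.g. tableEnv.registerFunction("parseVariant", new ParseVariant)), the nested fields can be accessed through the function result instead of the raw VARCHAR column.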

Best, Fabian
