Publishing a table to Kafka

Publishing a table to Kafka

Abhishek Rai
Hello,

I'm using Flink 1.11.2, where I have a SQL-backed `Table` that I'm
trying to write to Kafka.  I'm using `KafkaTableSourceSinkFactory`,
which ends up instantiating a table sink of type `KafkaTableSink`.
Since this sink is an `AppendStreamTableSink`, I cannot write to it
from a generic table which might produce update/delete records.

What are my options to still write to Kafka?  I don't mind introducing
another boolean/etc field in the Kafka output records containing the
row kind or similar info.

The first workaround that I tried is to convert the table to a
DataStream, but that ran into a second issue as indicated below.  I'm
happy to consider other alternatives, ideally ones that can be
achieved at the Table API level.

When converting to the DataStream API, the `table.getSchema().toRowType()`
call below (`TableSchema.toRowType()`) may fail when the underlying
`DataType` is not convertible to a `TypeInformation`; e.g., I get the
following error:

```
TableException: Unsupported conversion from data type 'INTERVAL
SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
information. Only data types that originated from type information
fully support a reverse conversion.
```

Table-to-DataStream conversion and write to Kafka:
```
// Derive the row type from the table schema and prepend a boolean
// "__row_kind" field that will carry the insert/retract flag.
var rowType = table.getSchema().toRowType();
var kafkaRecordType = insertFieldAtIndex(
    (RowTypeInfo) rowType, 0, "__row_kind", Types.BOOLEAN);
// Convert to a retract stream and fold the retract flag into each row.
var outputStream =
    tableEnvironment.toRetractStream(table, rowType).map(
            new PrependChangeTypeToRowMapper(), kafkaRecordType);
// Serialize the widened rows as JSON and publish them to Kafka.
var serializationSchema = JsonRowSerializationSchema.builder()
                              .withTypeInfo(kafkaRecordType)
                              .build();
var sink = new FlinkKafkaProducer<>(
    kafkaTopic, serializationSchema, kafkaProperties);
outputStream.addSink(sink).name(sinkName);
```
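
For reference, here is a rough sketch of what a mapper like
`PrependChangeTypeToRowMapper` could look like; this sketch is an
assumption about the implementation, not the actual class:
```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

// Hypothetical sketch: take the (flag, row) tuples produced by
// toRetractStream and prepend the boolean flag as the first field.
public class PrependChangeTypeToRowMapper
    implements MapFunction<Tuple2<Boolean, Row>, Row> {

  @Override
  public Row map(Tuple2<Boolean, Row> change) {
    Row in = change.f1;
    Row out = new Row(in.getArity() + 1);
    out.setField(0, change.f0);  // true = insert/accumulate, false = retract
    for (int i = 0; i < in.getArity(); i++) {
      out.setField(i + 1, in.getField(i));
    }
    return out;
  }
}
```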

Thanks,
Abhishek

Re: Publishing a table to Kafka

Leonard Xu
Hi, Rai

> What are my options to still write to Kafka?  I don't mind introducing
> another boolean/etc field in the Kafka output records containing the
> row kind or similar info.

The recommended way is to use the `upsert-kafka`[1] connector, with which you can write insert/update/retract
messages to a compacted Kafka topic and read them back from that topic as well. It's a new feature since 1.12;
before 1.12 there is no option to control writing a boolean/etc. field, because the boolean flag (RowKind) is not exposed to users.
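
For illustration, a minimal sketch of how that could look on 1.12+; the topic,
broker address, and column names here are placeholders, not taken from your job:
```
// Sketch only: declare an upsert-kafka sink table; upsert-kafka requires a
// primary key, which becomes the Kafka record key.
tableEnvironment.executeSql(
    "CREATE TABLE kafka_sink (" +
    "  id BIGINT," +
    "  payload STRING," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'upsert-kafka'," +
    "  'topic' = 'my-compacted-topic'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'key.format' = 'json'," +
    "  'value.format' = 'json'" +
    ")");
// Inserts/updates/deletes produced by the query are then written as
// upserts (and tombstones for deletes) keyed by the primary key.
table.executeInsert("kafka_sink");
```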
 
> The first workaround that I tried is to convert the table to a
> DataStream, but that ran into a second issue as indicated below.
> ```
> TableException: Unsupported conversion from data type 'INTERVAL
> SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
> information. Only data types that originated from type information
> fully support a reverse conversion.
> ```

Your first workaround should have worked, but it looks like an exception was thrown in the type conversion phase. Could you share your table schema and the query that reproduces the issue?

Best,
Leonard

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html

Re: Publishing a table to Kafka

Abhishek Rai
Thanks Leonard, we are working towards the 1.12 upgrade and should be
able to try upsert-kafka after that.

> Your first workaround should have worked, but it looks like an exception was thrown in the type conversion phase. Could you share your table schema and the query that reproduces the issue?

I was able to get past this but ran into another issue which is
detailed further down.  My table schema is:
Table "T0"
- column "C0" (int64) [rowtime attribute]
- column "C1" (string)

Query:
select INTERVAL '1' HOUR as E0, * from T0
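
For reference, a minimal self-contained sketch that should reproduce the first
failure, using an inline table instead of our custom source; the table contents
are made up, and the rowtime attribute is left out since the INTERVAL column
alone seems to trigger it:
```
// Sketch only: the INTERVAL column added by the query is what makes
// TableSchema.toRowType() fail.  Needs org.apache.flink.table.api.DataTypes
// and a static import of org.apache.flink.table.api.Expressions.row.
var t0 = tableEnvironment.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("C0", DataTypes.BIGINT()),
        DataTypes.FIELD("C1", DataTypes.STRING())),
    row(1L, "a"));
tableEnvironment.createTemporaryView("T0", t0);

var withInterval =
    tableEnvironment.sqlQuery("select INTERVAL '1' HOUR as E0, * from T0");
// Fails here with the TableException quoted above.
var rowType = withInterval.getSchema().toRowType();
```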

In the original code that I posted, the failure happens at:
```
var rowType = table.getSchema().toRowType();
```

I got past this by bridging duration types to long/int before
converting to TypeInformation using
`TypeConversions.fromDataTypeToLegacyInfo`:

```
      var dataType = schema.getFieldDataTypes()[i];
      var typeRoot = dataType.getLogicalType().getTypeRoot();
      if (typeRoot.equals(LogicalTypeRoot.INTERVAL_DAY_TIME)) {
        dataType = dataType.bridgedTo(Long.class);
      }
      if (typeRoot.equals(LogicalTypeRoot.INTERVAL_YEAR_MONTH)) {
        dataType = dataType.bridgedTo(Integer.class);
      }
```
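
For context, a rough sketch of how the bridged field types might then be
assembled into a `RowTypeInfo` to replace the failing `toRowType()` call; the
loop structure and variable names here are assumptions, not our exact code:
```
// Sketch only: per-field legacy conversion after bridging, then a named row type.
// Needs org.apache.flink.table.types.utils.TypeConversions,
// org.apache.flink.api.java.typeutils.RowTypeInfo and
// org.apache.flink.api.common.typeinfo.TypeInformation.
var schema = table.getSchema();
var fieldNames = schema.getFieldNames();
var fieldTypes = new TypeInformation<?>[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
  var dataType = schema.getFieldDataTypes()[i];
  // ... interval bridging as shown above ...
  fieldTypes[i] = TypeConversions.fromDataTypeToLegacyInfo(dataType);
}
var rowType = new RowTypeInfo(fieldTypes, fieldNames);
```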

After getting past this, the next problem I've run into is as
follows.  As I noted above, I'm converting from the Table API to the
DataStream API.  We are now seeing the following error at runtime:

```
java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
class java.lang.Long (java.sql.Timestamp is in module java.sql of
loader 'platform'; java.lang.Long is in module java.base of loader
'bootstrap')
```

This error comes from the rowtime attribute column, which our custom
table source generates as `java.sql.Timestamp` objects.  That worked
fine when we were operating entirely at the Table API level.

Our guess is that after conversion to the DataStream API, the rowtime
attribute column uses `TimeIndicatorTypeInfo`, which is serialized as a
long, hence the error casting from `Timestamp` to `Long`.

In our case, we would like to keep using the unmodified table source
(which generates `Timestamp` values for the rowtime attribute column),
so this approach also seems to have hit a dead end.  We are now
planning to try the upsert-kafka sink after the 1.12 upgrade.

Thanks,
Abhishek
