Is there any way to directly convert org.apache.flink.table.api.Table into my POJO?


Is there any way to directly convert org.apache.flink.table.api.Table into my POJO?

Luo Jason
Hello, I'm new to Flink. Thank you for your help.

My application scenario is to process logs with a Flink program and finally store them in HBase.


My Flink application receives log messages from other systems through Kafka. These messages cannot be written to HBase immediately. I first store them in a Flink table; when new logs arrive, the associated logs are selected from the Flink table for calculation. Depending on the result, they are either written to HBase or put back into the Flink table.

My problem is that when I query the Flink table with an SQL statement, the result is an org.apache.flink.table.api.Table. The only method I found in the Flink documentation is to obtain an org.apache.flink.util.CloseableIterator<Row> and loop through each Row, reading the fields by position. But this is too cumbersome. Is there any way to directly convert a Table into my business POJO?
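For concreteness, this is roughly the pattern I am using today (a
minimal sketch; the "logs" table, its schema, and the LogEvent class
are placeholders for my real business objects):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectByPosition {

    // Placeholder for my business POJO.
    public static class LogEvent {
        public String host;
        public Long cnt;
    }

    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());
        // Assumes a table "logs" with columns (host STRING, cnt BIGINT)
        // has been registered before.
        Table result = tEnv.sqlQuery("SELECT host, cnt FROM logs");

        // Loop with CloseableIterator<Row> and read each field by position.
        try (CloseableIterator<Row> it = result.execute().collect()) {
            while (it.hasNext()) {
                Row row = it.next();
                LogEvent event = new LogEvent();
                event.host = (String) row.getField(0); // position 0: host
                event.cnt = (Long) row.getField(1);    // position 1: cnt
                // ... pass the POJO on to the business logic
            }
        }
    }
}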

In addition, is there a way to insert a POJO into the Flink table? I don't seem to see a suitable method.

thanks.

Json
12.13

Re: Is there any way to directly convert org.apache.flink.table.api.Table into my POJO?

Timo Walther
Hi,

first, we should clarify "continue to be put into the Flink table": A
Flink Table object does not physically store the data. It is basically a
view that contains a transformation pipeline.

When you call `collect()`, the pipeline is executed and all results
from the cluster are streamed to one local machine (this might become
a bottleneck when processing large data). Relying on it might reveal a
design issue in your pipeline, because ideally all logic should be
expressed in Flink SQL or DataStream API transformations.
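
For example, instead of collecting rows to the client you can convert
the Table into a typed DataStream and keep all processing on the
cluster. A minimal sketch, assuming the legacy bridge API that ships
with Flink 1.11/1.12 (StreamTableEnvironment#toAppendStream); the
"logs" table and the LogEvent POJO are hypothetical, and the POJO
field names must match the column names:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableToPojoStream {

    // Hypothetical POJO; field names must match the query's column names.
    public static class LogEvent {
        public String host;
        public Long cnt;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes a table "logs" with columns (host STRING, cnt BIGINT)
        // has been registered before.
        Table result = tEnv.sqlQuery("SELECT host, cnt FROM logs");

        // Convert the Table into a typed DataStream;
        // the job still runs distributed on the cluster.
        DataStream<LogEvent> events = tEnv.toAppendStream(result, LogEvent.class);
        events.print();

        env.execute("table-to-pojo");
    }
}

The reverse direction works similarly: a DataStream of POJOs can be
registered as a table via StreamTableEnvironment#createTemporaryView,
which should also cover your second question about inserting POJOs.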

In general, Flink SQL comes with basic structured type support. A
structured type is basically a business POJO. Starting from Flink 1.11,
a structured type can be created and passed through UDFs. However,
connectors and collect() cannot return them yet. If you really don't
want to implement the conversion logic yourself, you can also take a
look at the internal converters in
org.apache.flink.table.data.conversion.DataStructureConverters.
In theory, you can convert from Row -> RowData -> POJO.
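
A rough sketch of that route (again, this is internal API without
stability guarantees, so the signatures may differ between versions;
the LogEvent structured type and its field list are assumptions):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class ConverterSketch {

    // Hypothetical POJO used as a structured type.
    public static class LogEvent {
        public String host;
        public Long cnt;
    }

    // Row -> RowData -> POJO; in real code create and open the
    // converters once and reuse them for every row.
    public static LogEvent toPojo(Row row) {
        // Data type of the rows coming out of collect();
        // must match the query schema.
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("host", DataTypes.STRING()),
                DataTypes.FIELD("cnt", DataTypes.BIGINT()));

        // Structured type backed by the POJO class.
        DataType pojoType = DataTypes.STRUCTURED(
                LogEvent.class,
                DataTypes.FIELD("host", DataTypes.STRING()),
                DataTypes.FIELD("cnt", DataTypes.BIGINT()));

        ClassLoader cl = Thread.currentThread().getContextClassLoader();

        // external Row -> internal RowData
        DataStructureConverter<Object, Object> rowConverter =
                DataStructureConverters.getConverter(rowType);
        rowConverter.open(cl);
        RowData internal = (RowData) rowConverter.toInternal(row);

        // internal RowData -> external POJO
        DataStructureConverter<Object, Object> pojoConverter =
                DataStructureConverters.getConverter(pojoType);
        pojoConverter.open(cl);
        return (LogEvent) pojoConverter.toExternal(internal);
    }
}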

I hope this helps.

Regards,
Timo

