Hello, I'm new to Flink. Thank you for your help.
My application scenario is to process the log through the Flink program, and finally store the log in HBase.
Through Kafka, my Flink application receives log information from other systems. This information cannot be sent to HBase immediately. I first store these logs in a Flink table. After new logs are received, the associated logs are selected from
the Flink table for calculation. Depending on the calculation results, they are either stored in HBase or put back into the Flink table.
My problem is that when I query the Flink table with a SQL statement, the result is an org.apache.flink.table.api.Table. The method I learned from the Flink documentation is to use an org.apache.flink.util.CloseableIterator<Row>
to loop through each row and obtain the corresponding fields by position. But this is too troublesome. Is there any way to directly convert a Table into my business POJO?
In addition, is there a way to insert a POJO into the Flink table? I do not seem to see a suitable method.
thanks.
Json
12.13
Hi,
first, we should clarify "continue to be put into the Flink table": A Flink Table object does not physically store the data. It is basically a view that contains a transformation pipeline. When you call `collect()`, the pipeline is executed and all results from the cluster are streamed to one local machine (this might be a bottleneck when processing large data). It might reveal a design issue in your pipeline, because ideally all logic should be expressed in Flink SQL or DataStream API transformations.

In general, Flink SQL comes with basic structured type support. A structured type is basically a business POJO. Starting from Flink 1.11, a structured type can be created and passed through UDFs. However, connectors and `collect()` cannot return them yet.

If you really don't want to implement conversion logic yourself, you can also take a look at the internal converters:

org.apache.flink.table.data.conversion.DataStructureConverters

In theory, you can convert from Row -> RowData -> POJO.

I hope this helps.

Regards,
Timo

On 13.12.20 06:57, Luo Jason wrote:
> Hello, I'm new to Flink. Thank you for your help.
>
> My application scenario is to process the log through the Flink program,
> and finally store the log in HBase.
>
> Through Kafka, my Flink application receives log information from other
> systems. This information can not be immediately sent to HBASE. I first
> store these logs into the flink table. After new logs are received, the
> associated logs will be selected from the link table for calculation.
> According to the calculation results, they will be stored in HBase or
> continue to be put into the Flink table.
>
> My problem is that when I use SQL statements to query the data structure
> from the flink table as org.apache.flink.table.api.Table From Flink In
> the document, the method I learned is to use the
> org.apache.flink.util.CloseableIterator The iterator of < row > loops
> through each row to obtain the corresponding field by position. But this
> is too troublesome. Is there any way to directly convert a Table into my
> business POJO.
>
> In addition, whether there is a way to insert POJO into the link table,
> I do not seem to see a suitable method.
>
> thanks.
>
> Json
> 12.13
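
For the manual route discussed in this thread (reading each field of a collected row by position), the conversion logic can be kept in one small helper instead of being repeated per query. The following is only a sketch under stated assumptions, not Flink's own API: `Object[]` stands in for `org.apache.flink.types.Row` so that the example runs without Flink on the classpath, and `RowToPojoSketch`, `LogEvent`, `toPojo`, and `toValues` are hypothetical names invented for illustration. With Flink, the values would presumably be read with `row.getField(i)` while iterating over `table.execute().collect()`.

```java
import java.lang.reflect.Field;

// Sketch only: Object[] stands in for org.apache.flink.types.Row so the
// example runs without Flink on the classpath. All names are hypothetical.
class RowToPojoSketch {

    // Hypothetical business POJO; the field names and types are assumptions.
    public static class LogEvent {
        public String traceId;
        public long timestamp;
        public String message;

        public LogEvent() {}
    }

    // Builds a POJO from positional values: fieldNames[i] names the public
    // field that receives values[i].
    public static <T> T toPojo(Object[] values, String[] fieldNames, Class<T> type) {
        if (values.length != fieldNames.length) {
            throw new IllegalArgumentException("values and fieldNames differ in length");
        }
        try {
            T pojo = type.getDeclaredConstructor().newInstance();
            for (int i = 0; i < values.length; i++) {
                Field field = type.getField(fieldNames[i]);
                // Field.set unboxes wrapper values (e.g. Long -> long) itself.
                field.set(pojo, values[i]);
            }
            return pojo;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot map row to " + type.getName(), e);
        }
    }

    // Reverse direction: reads the named public fields into a positional
    // array, e.g. as input for building a row to insert.
    public static Object[] toValues(Object pojo, String[] fieldNames) {
        try {
            Object[] values = new Object[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                values[i] = pojo.getClass().getField(fieldNames[i]).get(pojo);
            }
            return values;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read fields of " + pojo.getClass().getName(), e);
        }
    }
}
```

The field-name array has to follow the column order of the query's SELECT list, since the mapping is purely positional. As noted in the reply, collecting results to one machine can become a bottleneck for large data, so a helper like this is mainly suited to small result sets.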