How to register TableSinks

Eifler, Patrick

Hi and Happy New Year,

I’m currently removing deprecated API usages to prepare for the upgrade to Flink 1.12. We are currently running on 1.11.

Specifically, I need to update our code that registers table sinks in the StreamTableEnvironment. I’m maintaining jobs that use DataStreams with multiple sinks. Now I want to use the StatementSet to benefit from its DAG for multiple sinks.

So far I have added the code to add the sinks to the StatementSet:

statementSet.addInsert(sinkName, table)

and to execute the StatementSet:

statementSet.execute()

For this to work I need to register the sinks. I used to do that with the (now deprecated) method on the StreamTableEnvironment:

tableEnv.registerTableSink(
        sinkName,
        fieldNames,
        fieldTypes,
        tableSink
)

My question is: how do I register sinks so that they are discovered by the StatementSet? What is the proper replacement for the registerTableSink method?

executeSql(ddl), as suggested, does not apply to this use case. I did not find anything in the documentation either: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#translate-and-execute-a-query

When running the job I get an error saying that the sink could not be found in the catalog. That means I have to add the sink to the catalog, but how?

Which method should be used to register a table sink in the table environment’s catalog?

Thanks!

Kind Regards,

Patrick

-- 
Patrick Eifler
Senior Software Engineer (BI)
Cloud Gaming Engineering & Infrastructure
Sony Interactive Entertainment LLC
Wilhelmstraße 118, 10963 Berlin
Germany
E: [hidden email]

Re: How to register TableSinks

Dawid Wysakowicz-2

Hi Patrick.

Happy New Year to you too ;)

The method you are referring to was deprecated together with the TableSink interface as a whole, in favour of a much improved and feature-rich new Source & Sink API. You can find extensive documentation on this new API here [1].

Therefore, if you use the old TableSink interface, you must stick with the deprecated method.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html
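
To illustrate the pointer to [1]: with the new Source & Sink API, a sink is declared through a CREATE TABLE statement whose 'connector' option selects a sink factory, instead of being passed to the table environment as an instance. The following is a minimal sketch in plain Java (no Flink dependency) of how the sinkName, fieldNames and field types from the old registerTableSink call could map onto such a statement. The SinkDdl class and its build method are hypothetical helpers, the SQL type strings stand in for the old fieldTypes argument, and the 'print' connector is only a placeholder.

```java
import java.util.StringJoiner;

/** Hypothetical helper (not part of Flink): builds a CREATE TABLE statement for a sink table. */
final class SinkDdl {

    private SinkDdl() {}

    /**
     * Takes the sink name and field names as the old registerTableSink call did,
     * but expects SQL type strings (for example "BIGINT") for the field types.
     * The connector identifier is an assumption: it must match a sink factory
     * that is available on the job's classpath.
     */
    static String build(String sinkName, String[] fieldNames, String[] sqlTypes, String connector) {
        if (fieldNames.length != sqlTypes.length) {
            throw new IllegalArgumentException("fieldNames and sqlTypes must have the same length");
        }
        // One "`name` TYPE" entry per column, separated by commas and line breaks.
        StringJoiner columns = new StringJoiner(",\n");
        for (int i = 0; i < fieldNames.length; i++) {
            columns.add("  `" + fieldNames[i] + "` " + sqlTypes[i]);
        }
        return "CREATE TABLE `" + sinkName + "` (\n"
                + columns
                + "\n) WITH (\n"
                + "  'connector' = '" + connector + "'\n"
                + ")";
    }

    public static void main(String[] args) {
        // Example values only; a real job would use its own schema and connector.
        String ddl = build(
                "my_sink",
                new String[] {"id", "name"},
                new String[] {"BIGINT", "STRING"},
                "print");
        System.out.println(ddl);
    }
}
```

The resulting string would be passed to tableEnv.executeSql(ddl) before statementSet.addInsert(sinkName, table) is called, so that the sink exists in the catalog when the StatementSet is executed. A custom sink would need its own factory, as described in [1].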

On 04/01/2021 14:53, [hidden email] wrote:
> Hi and Happy New Year,
> [...]


Re: How to register TableSinks

Eifler, Patrick

Hey, thanks Dawid,

One more question: Is the StatementSet API supposed to work with the old sink interface?

I get the following error when I use it with the deprecated registerTableSink method:

The main method caused an error: requirement failed: operations should not be empty

Thanks!

Patrick

From: Dawid Wysakowicz <[hidden email]>
Date: Monday, 4. January 2021 at 15:50
To: <[hidden email]>, <[hidden email]>
Subject: Re: How to register TableSinks

 

> Hi Patrick.
> [...]