Hi all,
I’ve been playing around with a proof-of-concept application with Flink to assist a colleague of mine. The application is fairly simple (take in a single input and identify various attributes about it), with the goal of outputting those to separate tables in Postgres:

    object AttributeIdentificationJob {
        @JvmStatic
        fun main(args: Array<String>) {
            val stream = StreamExecutionEnvironment.getExecutionEnvironment()

            stream
                .addSource(ReadFromKafka())
                .process(IdentifyAttributesFunction())
                .addSink(DynamicJdbcHere())

            // execute program
            stream.execute("Attribute Identification")
        }
    }

Considering my attributes may be of varying types (all implementing an Attribute interface), I don't know whether the existing JdbcSink functionality, or some variant of it (i.e. one of the dynamic ones that I see listed), could handle this. Essentially, for a given "bundle" of records, I'd need to ensure that each respective type of attribute was upserted into its corresponding table within a Postgres database.

Is that something that the connector can handle on its own? Or would I need to implement my own RichSinkFunction<Collection<Attribute>> that could handle opening a connection to Postgres and dynamically generating the appropriate UPSERT statements to send the records? As a follow-up to that: if I did need to write my own RichSinkFunction, would I need to implement my own checkpointing for resilience purposes, or does that come along for the ride with RichSinkFunctions?

Any insight or approaches would be welcome!

Thanks,

Rion
Rion,

A given JdbcSink can only write to one table, but if the number of tables involved isn't unreasonable, you could use a separate sink for each table and use side outputs [1] from a process function to steer each record to the appropriate sink.

I suggest you avoid trying to implement a sink. In general, custom sinks need to implement their own checkpointing, though there is a generic two-phase commit sink you can use as a starting point for implementing a transactional sink. FYI, the JDBC sink has been reworked for 1.13 to include exactly-once guarantees based on the XA standard [2].

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
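To make the side-output approach concrete, here is a minimal Kotlin sketch of what David describes: one OutputTag and one plain JdbcSink per attribute type, with a ProcessFunction steering each record to the side output for its type. The attribute types, table names, SQL, and connection settings below are hypothetical, not from the thread.

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions
    import org.apache.flink.connector.jdbc.JdbcSink
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.functions.sink.SinkFunction
    import org.apache.flink.util.Collector
    import org.apache.flink.util.OutputTag

    // Assumed shape of the Attribute hierarchy from Rion's job.
    interface Attribute
    data class UserAttribute(val id: String, val value: String) : Attribute
    data class DeviceAttribute(val id: String, val value: String) : Attribute

    // One side-output tag per target table (explicit TypeInformation, since Kotlin erases generics).
    val userTag = OutputTag("user-attributes", TypeInformation.of(UserAttribute::class.java))
    val deviceTag = OutputTag("device-attributes", TypeInformation.of(DeviceAttribute::class.java))

    // Steers each attribute to the side output for its concrete type.
    class RouteAttributesFunction : ProcessFunction<Attribute, Attribute>() {
        override fun processElement(attr: Attribute, ctx: Context, out: Collector<Attribute>) {
            when (attr) {
                is UserAttribute -> ctx.output(userTag, attr)
                is DeviceAttribute -> ctx.output(deviceTag, attr)
                else -> out.collect(attr) // unrecognized types stay on the main stream
            }
        }
    }

    // A plain JdbcSink per table; Postgres ON CONFLICT provides the upsert semantics.
    fun userAttributeSink(): SinkFunction<UserAttribute> = JdbcSink.sink(
        "INSERT INTO user_attributes (id, value) VALUES (?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
        JdbcStatementBuilder<UserAttribute> { ps, attr ->
            ps.setString(1, attr.id)
            ps.setString(2, attr.value)
        },
        JdbcExecutionOptions.builder().withBatchSize(100).build(),
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:postgresql://localhost:5432/attributes")
            .withDriverName("org.postgresql.Driver")
            .withUsername("flink")
            .withPassword("secret")
            .build()
    )

    // Inside main(), replacing the single .addSink(DynamicJdbcHere()) call:
    val routed = stream
        .addSource(ReadFromKafka())
        .process(IdentifyAttributesFunction())
        .process(RouteAttributesFunction())

    routed.getSideOutput(userTag).addSink(userAttributeSink())
    // routed.getSideOutput(deviceTag).addSink(deviceAttributeSink()) // built the same way per type

Each JdbcSink batches its writes and flushes them on checkpoint (at-least-once delivery), which pairs well with idempotent ON CONFLICT upserts, so no hand-rolled RichSinkFunction or custom checkpointing is needed for this layout.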
Hey Rion,
I had exactly the same problem and implemented this functionality in my Flink fork, with the XA sink taken from the development branch. As I see that it's not only my problem, I've created a Jira task for it - FLINK-21643 - and will provide a draft PR for it.

@David - for traditional relational databases, even a relatively small number of connections can be unreasonable here.

Thanks,
Maciej
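For reference, the exactly-once JDBC sink that David and Maciej mention ([2], FLINK-21643) slots into the same per-table layout. Below is a rough sketch, reusing the hypothetical UserAttribute type and userTag from the earlier sketch and assuming the JdbcSink.exactlyOnceSink signature shown in the 1.13 docs linked above:

    import javax.sql.XADataSource
    import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions
    import org.apache.flink.connector.jdbc.JdbcSink
    import org.apache.flink.connector.jdbc.JdbcStatementBuilder
    import org.apache.flink.util.function.SerializableSupplier
    import org.postgresql.xa.PGXADataSource

    // Exactly-once upsert sink for one attribute table, backed by Postgres XA transactions.
    val userAttributeXaSink = JdbcSink.exactlyOnceSink(
        "INSERT INTO user_attributes (id, value) VALUES (?, ?) " +
            "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
        JdbcStatementBuilder<UserAttribute> { ps, attr ->
            ps.setString(1, attr.id)
            ps.setString(2, attr.value)
        },
        JdbcExecutionOptions.builder().build(),
        JdbcExactlyOnceOptions.defaults(),
        SerializableSupplier<XADataSource> {
            // The supplier must build a driver-specific XA data source.
            PGXADataSource().apply {
                setUrl("jdbc:postgresql://localhost:5432/attributes")
                setUser("flink")
                setPassword("secret")
            }
        }
    )

    // Attached exactly like the plain sink:
    // routed.getSideOutput(userTag).addSink(userAttributeXaSink)

Note that prepared (two-phase commit) transactions are disabled in Postgres by default, so max_prepared_transactions must be set to a non-zero value on the server, and Maciej's concern still applies: one XA connection per table per parallel sink instance can add up quickly.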