Dynamic JDBC Sink Support

Dynamic JDBC Sink Support

Rion Williams
Hi all,

I’ve been playing around with a proof-of-concept Flink application to assist a colleague of mine. The application is fairly simple (take in a single input and identify various attributes about it), with the goal of outputting those attributes to separate tables in Postgres:

object AttributeIdentificationJob {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        env
            .addSource(ReadFromKafka())
            .process(IdentifyAttributesFunction())
            .addSink(DynamicJdbcHere()) // placeholder for whatever the dynamic JDBC sink ends up being

        // execute program
        env.execute("Attribute Identification")
    }
}


Considering my attributes may be of varying types (all implementing an Attribute interface), I don't know whether the existing JdbcSink functionality, or some variant of it (e.g. one of the dynamic ones I see listed), could handle this. Essentially, for a given "bundle" of records, I'd need to ensure that each type of attribute was upserted into its corresponding table in a Postgres database.

Is that something that the connector can handle on its own? Or would I need to implement my own RichSinkFunction<Collection<Attribute>> that would handle opening a connection to Postgres and dynamically generating the appropriate UPSERT statements for the records? As a follow-up: if I did need to write my own RichSinkFunction, would I need to implement my own checkpointing for resilience purposes, or does that come along for the ride with RichSinkFunctions?
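
For concreteness, the kind of custom sink I have in mind would look roughly like the sketch below. It's only a sketch against the 1.12 DataStream API: the Attribute shape, table names, and columns are hypothetical, credentials are assumed to live in the JDBC URL, and it doesn't attempt any checkpoint-aware flushing, which is exactly the part I'm unsure about.

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import java.sql.Connection
import java.sql.DriverManager

// Hypothetical shape for the Attribute interface mentioned above.
interface Attribute {
    val id: String
    val value: String
    val tableName: String // which Postgres table this attribute belongs in
}

class PerTableUpsertSink(private val jdbcUrl: String) : RichSinkFunction<Collection<Attribute>>() {
    @Transient private var connection: Connection? = null

    override fun open(parameters: Configuration) {
        connection = DriverManager.getConnection(jdbcUrl)
    }

    override fun invoke(bundle: Collection<Attribute>) {
        val conn = connection ?: return
        // Group each bundle by target table and run a batched upsert per group.
        // Column names are made up; a real implementation would also validate table names.
        bundle.groupBy { it.tableName }.forEach { (table, attrs) ->
            conn.prepareStatement(
                "INSERT INTO $table (id, value) VALUES (?, ?) " +
                    "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value"
            ).use { stmt ->
                attrs.forEach {
                    stmt.setString(1, it.id)
                    stmt.setString(2, it.value)
                    stmt.addBatch()
                }
                stmt.executeBatch()
            }
        }
        // Nothing here ties writes to Flink checkpoints, so a failure could lose
        // or replay records -- hence the checkpointing question.
    }

    override fun close() {
        connection?.close()
    }
}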

Any insight or approaches would be welcome!

Thanks,

Rion

Re: Dynamic JDBC Sink Support

David Anderson
Rion,

A given JdbcSink can only write to one table, but if the number of tables involved isn't unreasonable, you could use a separate sink for each table, and use side outputs [1] from a process function to steer each record to the appropriate sink. 
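
As a rough sketch of that pattern (the attribute types, table names, and columns here are made up for illustration, and the routing function is assumed to sit downstream of your attribute-identification step):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.util.OutputTag

// Hypothetical attribute hierarchy for illustration.
interface Attribute { val id: String; val value: String }
data class UserAttribute(override val id: String, override val value: String) : Attribute
data class DeviceAttribute(override val id: String, override val value: String) : Attribute

// Anonymous subclasses so Flink can capture each tag's element type.
val userTag = object : OutputTag<UserAttribute>("user-attributes") {}
val deviceTag = object : OutputTag<DeviceAttribute>("device-attributes") {}

class RouteAttributes : ProcessFunction<Attribute, Attribute>() {
    override fun processElement(value: Attribute, ctx: Context, out: Collector<Attribute>) {
        when (value) {
            is UserAttribute -> ctx.output(userTag, value)
            is DeviceAttribute -> ctx.output(deviceTag, value)
            else -> out.collect(value) // anything unmatched stays on the main stream
        }
    }
}

fun addPerTableSinks(attributes: DataStream<Attribute>) {
    val routed = attributes.process(RouteAttributes())

    val connectionOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://localhost:5432/attributes") // placeholder connection details
        .withDriverName("org.postgresql.Driver")
        .withUsername("flink")
        .withPassword("secret")
        .build()

    // One JdbcSink per target table; the SQL is illustrative only.
    routed.getSideOutput(userTag).addSink(
        JdbcSink.sink<UserAttribute>(
            "INSERT INTO user_attributes (id, value) VALUES (?, ?) " +
                "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
            { stmt, attr ->
                stmt.setString(1, attr.id)
                stmt.setString(2, attr.value)
            },
            connectionOptions
        )
    )
    // ...and the same again for deviceTag -> device_attributes, and so on.
}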

I suggest you avoid trying to implement a sink.

In general, custom sinks need to implement their own checkpointing, though there is a generic two-phase commit sink you can use as a starting point for implementing a transactional sink. FYI, the JDBC sink has been reworked for 1.13 to include exactly-once guarantees based on the XA standard [2].

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
[2] https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/datastream/jdbc/#jdbcsinkexactlyoncesink
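
For reference, here is a sketch of what the reworked exactly-once sink from [2] looks like against the current 1.13 snapshot. The table, columns, and connection details are made up, and note that Postgres needs max_prepared_transactions set above zero before XA transactions will work:

import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions
import org.apache.flink.connector.jdbc.JdbcExecutionOptions
import org.apache.flink.connector.jdbc.JdbcSink
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.util.function.SerializableSupplier
import org.postgresql.xa.PGXADataSource
import javax.sql.XADataSource

// Hypothetical attribute type and table, as in the earlier sketch.
data class UserAttribute(val id: String, val value: String)

fun addExactlyOnceSink(userAttributes: DataStream<UserAttribute>) {
    userAttributes.addSink(
        JdbcSink.exactlyOnceSink<UserAttribute>(
            "INSERT INTO user_attributes (id, value) VALUES (?, ?) " +
                "ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value",
            { stmt, attr ->
                stmt.setString(1, attr.id)
                stmt.setString(2, attr.value)
            },
            // Retries are disabled: with XA, retrying a batch could otherwise produce duplicates.
            JdbcExecutionOptions.builder().withMaxRetries(0).build(),
            JdbcExactlyOnceOptions.defaults(),
            SerializableSupplier<XADataSource> {
                // Driver-specific XA data source for Postgres.
                PGXADataSource().apply {
                    setUrl("jdbc:postgresql://localhost:5432/attributes")
                    setUser("flink")
                    setPassword("secret")
                }
            }
        )
    )
}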



Re: Dynamic JDBC Sink Support

Maciej Obuchowski
Hey Rion,

I had exactly the same problem and implemented this functionality in my Flink fork, using the XA sink taken from the development branch. Since it's clearly not only my problem, I've created a Jira task for it - FLINK-21643 - and will provide a draft PR for it.

@David - for traditional relational databases, even a relatively small number of connections can be unreasonable here.

Thanks,
Maciej
