Enriching a tuple mapped from a datastream with data coming from a JDBC source


Enriching a tuple mapped from a datastream with data coming from a JDBC source

Philipp Bussche
Hi there,
I have a data stream (coming from Kafka) that contains information which I want to enrich with information that sits in a database before I hand over the enriched tuple to a sink.
How would I do that?
I was thinking of somehow combining my streaming job with a JDBC input but wasn't very successful in getting this going.
Thanks
Philipp

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

snntr
Hi Philipp,

The easiest way is a RichMapFunction. In the open() method you can load the
relevant database table into memory (e.g. into a HashMap). In the
map() method you then just look up the entry in the HashMap.

Of course, this only works if the dataset is small enough to fit in
memory. Is it?

Cheers,

Konstantin
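The pattern Konstantin describes can be sketched in plain Java. This is an illustrative simulation, not actual Flink API: the class name, field names, and the hardcoded lookup data are all made up, and in a real job the class would extend Flink's RichMapFunction and open() would run a JDBC query instead of filling the map by hand.

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichingMapper {
    private Map<String, String> lookup;

    // In Flink, open() is called once per parallel instance before any
    // map() call; here it stands in for a JDBC query like
    // "SELECT id, label FROM reference_table".
    public void open() {
        lookup = new HashMap<>();
        lookup.put("user-1", "premium");  // hypothetical reference rows
        lookup.put("user-2", "trial");
    }

    // Enrich each incoming record with the value from the lookup table.
    public String map(String userId) {
        return userId + "," + lookup.getOrDefault(userId, "unknown");
    }

    public static void main(String[] args) {
        EnrichingMapper m = new EnrichingMapper();
        m.open();
        System.out.println(m.map("user-1")); // user-1,premium
        System.out.println(m.map("user-9")); // user-9,unknown
    }
}
```

Since open() runs once per parallel instance, each instance holds its own copy of the table; that is fine as long as the table is small and read-only.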



--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082



Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Philipp Bussche
Thank you Konstantin, the amount of data I have to load into memory will be very small, so that should be alright.
When opening and querying the database, would I use any sort of Flink magic or just do plain JDBC?
I read about the JDBCInputFormat concept, which one could use with the DataSet API, and was wondering if I could use that somehow in my open method then?

Thanks
Philipp

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

snntr
You can just use plain JDBC. Just keep in mind that the function objects
will be serialized and sent through the cluster, so you probably want to
initialize all non-serializable database access objects in the open()
method itself (as opposed to in the constructor, which runs on the client side).

Cheers,

Konstantin
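Why the open() method matters here can be shown with a small stdlib-only sketch. The class and field names are hypothetical, and a StringBuilder stands in for a real java.sql.Connection (which is not Serializable): the function object is serialized on the client and deserialized on the workers, so a transient resource created in the constructor would arrive as null and must be re-created in open().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DatabaseMapper implements Serializable {
    // A real JDBC Connection is not Serializable; mark such fields
    // transient and create them in open(), not in the constructor.
    private transient StringBuilder connection;

    public void open() {
        // Stand-in for DriverManager.getConnection(...) plus the query.
        connection = new StringBuilder("connected");
    }

    public boolean isConnected() {
        return connection != null;
    }

    // Simulate Flink shipping the function object to a worker.
    public static DatabaseMapper roundTrip(DatabaseMapper m) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(m);
        ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return (DatabaseMapper) in.readObject();
    }

    public static void main(String[] args) throws Exception {
        DatabaseMapper m = new DatabaseMapper();
        m.open();                                   // exists on the "client"
        DatabaseMapper shipped = roundTrip(m);
        System.out.println(shipped.isConnected());  // false: transient field lost
        shipped.open();                             // re-create on the "worker"
        System.out.println(shipped.isConnected());  // true
    }
}
```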




Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Philipp Bussche
Hi again,
I implemented the RichMapFunction (its open method runs a JDBC query to populate a HashMap with data), which I am using in the map function.
Now there is another RichMapFunction whose map method would add entries to the HashMap that was initialized in the first function.
How would I share the HashMap between the two functions (I am using the DataStream API)?

Thanks
Philipp

Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Fabian Hueske-2
Hi Philipp,

If I got your requirements right you would like to:
1) load an initial hashmap via JDBC
2) update the hashmap from a stream
3) use the hashmap to enrich another stream.

You can use a CoFlatMap to do this:

stream1.connect(stream2).flatMap(new YourCoFlatMapFunction())

YourCoFlatMapFunction should implement RichCoFlatMapFunction. The initial HashMap load can be done in the open() method.
A CoFlatMapFunction has two inputs and two flatMap methods, one for each input. One method can update the HashMap; the other enriches the second stream with data from the HashMap.
The two methods are never called concurrently, and the order in which they are called depends on what data is available.

In general, it is not possible to share local operator state among different operators (or even among parallel instances of the same operator).

Hope this helps,
Fabian
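Fabian's suggestion can be sketched as a single-threaded, stdlib-only simulation. This is not Flink's RichCoFlatMapFunction API; the class and method names merely mirror its shape (open, flatMap1, flatMap2). Because Flink never calls the two flatMap methods concurrently, a plain HashMap is sufficient as the shared state.

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichingCoFlatMap {
    private final Map<String, String> lookup = new HashMap<>();

    // Stand-in for open(): initial load of the reference table via JDBC.
    public void open(Map<String, String> initial) {
        lookup.putAll(initial);
    }

    // Input 1: updates to the reference data mutate the map.
    public void flatMap1(String key, String value) {
        lookup.put(key, value);
    }

    // Input 2: records to enrich; returns the enriched record.
    public String flatMap2(String key) {
        return key + "," + lookup.getOrDefault(key, "unknown");
    }

    public static void main(String[] args) {
        EnrichingCoFlatMap fn = new EnrichingCoFlatMap();
        fn.open(Map.of("a", "1"));          // hypothetical initial JDBC load
        System.out.println(fn.flatMap2("a")); // a,1
        fn.flatMap1("b", "2");                // an update arrives on stream 1
        System.out.println(fn.flatMap2("b")); // b,2
    }
}
```

Note that in a real job each parallel instance of the operator holds its own map, so the update stream usually has to be broadcast (or the two streams keyed the same way) for the lookups to see the updates.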


Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

Philipp Bussche
Awesome, thanks Fabian!

I will give this a try.
