broadcasting a stream from a collection that is populated from a web service call

Sathi Chowdhury
It’s good to be here. I have a data stream coming from Kinesis. I also have a list of HashMaps holding metadata that needs to take part in the processing.
In my Flink processor class I construct this metadata (hardcoded):

public static void main(String[] args) throws Exception {
    // ...
    // env: the job's StreamExecutionEnvironment, created in the elided setup above
    DataStream<ObjectNode> jsonStream = env.addSource(new FlinkKinesisConsumer<>("int-harvester-kinesis-us-west-2", new MyJsonDeserializationSchema(), consumerConfig));

    // metadata: the hardcoded list of HashMaps
    DataStream<HashMap<String,String>> metaStream = env.fromCollection(metadata);
    DataStream<String> outStream = metaStream
        .broadcast()
        .connect(jsonStream)
        .flatMap(new SplitterCoFlatMapFunction());
    // ...
}

Here SplitterCoFlatMapFunction extends RichCoFlatMapFunction.

With this setup I can operate on my jsonStream and use the elements of the HashMap during that processing.
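
For readers unfamiliar with the pattern, here is a minimal plain-Java sketch of what the two callbacks of such a co-flat-map function might do: one input updates a local copy of the metadata, the other looks events up against it. MetadataJoiner, onMetadata, and onEvent are hypothetical names standing in for a RichCoFlatMapFunction and its flatMap1/flatMap2 methods. The actual SplitterCoFlatMapFunction is not shown in the post, the "payload|metadata" output format is an assumption, and the sketch has no Flink dependency so that it can be run on its own.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the logic of a two-input (co-flat-map) function. */
class MetadataJoiner {
    // Local copy of the metadata received so far on the broadcast input.
    private final Map<String, String> metadata = new HashMap<>();

    /** Plays the role of flatMap1: called once per broadcast metadata element. */
    void onMetadata(Map<String, String> update) {
        metadata.putAll(update);
    }

    /** Plays the role of flatMap2: called once per event, emits zero or one result. */
    List<String> onEvent(String eventKey, String payload) {
        List<String> out = new ArrayList<>();
        String meta = metadata.get(eventKey);
        if (meta != null) {
            // Assumed output format: payload and metadata joined with '|'.
            out.add(payload + "|" + meta);
        }
        return out;
    }
}
```

Because every parallel instance receives the broadcast elements, each instance ends up holding its own copy of the metadata map. Note that this sketch silently drops events whose key has no metadata yet.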
As a next step, suppose I am given a REST API that returns the latest version of that metadata collection. What is the right way to make that call, and from where? When the Flink job starts, the metadata is already prepared and propagated to the workers, so where can I refresh it?

What is the most correct way of doing it?
Thanks
Sathi
=============Notice to Recipient: This e-mail transmission, and any documents, files or previous e-mail messages attached to it may contain information that is confidential or legally privileged, and intended for the use of the individual or entity named above. If you are not the intended recipient, or a person responsible for delivering it to the intended recipient, you are hereby notified that you must not read this transmission and that any disclosure, copying, printing, distribution or use of any of the information contained in or attached to this transmission is STRICTLY PROHIBITED. If you have received this transmission in error, please immediately notify the sender by telephone or return e-mail and delete the original transmission and its attachments without reading or saving in any manner. Thank you. =============
Re: broadcasting a stream from a collection that is populated from a web service call

Till Rohrmann
Hi Sathi,

I would also ingest the metadata into a Kinesis stream and read it from there. Then you don't have to fiddle around with the REST API from within your Flink job.

If that is not feasible for you, you can also write your own custom source function that queries the REST endpoint and, whenever it receives new data, sends it to its downstream operators.
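
The core of such a custom source could look roughly like the plain-Java sketch below: poll, compare with the last value sent, and emit only on change. ChangePoller and its members are hypothetical names. The Supplier stands in for the REST call and the Consumer for the source context's collect call, so the sketch runs without Flink or a real endpoint. In an actual job the loop would presumably live in the run method of a SourceFunction, with cancel clearing the running flag.

```java
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: forwards a fetched value only when it differs from the last one sent. */
class ChangePoller<T> {
    private final Supplier<T> fetcher;  // stands in for the REST call
    private final Consumer<T> emitter;  // stands in for emitting to downstream operators
    private T last;                     // last value that was emitted, null before the first emit

    ChangePoller(Supplier<T> fetcher, Consumer<T> emitter) {
        this.fetcher = fetcher;
        this.emitter = emitter;
    }

    /** Performs a single poll; returns true if a new value was emitted. */
    boolean pollOnce() {
        T current = fetcher.get();
        if (current == null || Objects.equals(current, last)) {
            return false; // nothing fetched, or unchanged since the last emit
        }
        last = current;
        emitter.accept(current);
        return true;
    }

    /** Polls until the running flag turns false, sleeping between polls. */
    void run(BooleanSupplier running, long intervalMillis) throws InterruptedException {
        while (running.getAsBoolean()) {
            pollOnce();
            Thread.sleep(intervalMillis);
        }
    }
}
```

The change check relies on equals, which works for a List of HashMaps like the metadata in the original post as long as the fetcher returns a fresh collection on each call.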

Cheers,
Till

(In reply to Sathi Chowdhury's message of Thu, Feb 2, 2017 at 10:27 PM.)

Re: broadcasting a stream from a collection that is populated from a web service call

Sathi Chowdhury

Hi Till,

Thanks for your reply.

Putting a Kinesis stream in between probably makes a lot of sense from many angles. My assumption is that the code from my original post can be changed to read the second stream from Kinesis, so that it always reads the latest data from the second stream and broadcasts it, leaving no disparity between the event data being processed and the metadata arriving on the second stream.
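
One caveat on this assumption: the two inputs of a connected stream are consumed independently, so an event can reach the function before the metadata it needs. A minimal plain-Java sketch of one way to cope with that is to hold events back until the first metadata element has arrived. BufferingJoiner, its method names, and the "unknown" fallback value are all hypothetical, and a real Flink job would also need to keep the buffer in checkpointed state, which this sketch does not attempt.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical two-input logic that buffers events until metadata has been seen. */
class BufferingJoiner {
    private final Map<String, String> metadata = new HashMap<>();
    private final Queue<String[]> pending = new ArrayDeque<>(); // {key, payload} pairs
    private boolean metadataSeen = false;

    /** Metadata input: stores the update and releases any buffered events. */
    List<String> onMetadata(Map<String, String> update) {
        metadata.putAll(update);
        metadataSeen = true;
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty()) {
            String[] event = pending.poll();
            out.add(join(event[0], event[1]));
        }
        return out;
    }

    /** Event input: buffers before the first metadata element, joins afterwards. */
    List<String> onEvent(String key, String payload) {
        List<String> out = new ArrayList<>();
        if (!metadataSeen) {
            pending.add(new String[] {key, payload});
        } else {
            out.add(join(key, payload));
        }
        return out;
    }

    // Assumed output format; keys without metadata get the placeholder "unknown".
    private String join(String key, String payload) {
        return payload + "|" + metadata.getOrDefault(key, "unknown");
    }
}
```

Buffered events are released in arrival order as soon as the first metadata element shows up. Later metadata updates only affect events processed after them.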

Sathi

 

(In reply to Till Rohrmann's message of Thursday, February 2, 2017 at 2:10 PM.)