Batch job per stream message?

Tomas Mazukna
Trying to figure out the best design in Flink.
I am reading from a Kafka topic whose messages contain pointers to files to be processed.
I am thinking of somehow kicking off a batch job per file... unless there is an easy way to get a separate dataset per file.
I can do almost all of this in the stream: parse the file with a flat map -> explode its contents into multiple data elements -> map, etc...
One of these steps would be to grab another dataset from a JDBC source and join it with the stream's contents...
I think I am mixing the two concepts here, and the right approach would be to kick off a mini batch job per file,
where I have the file dataset + the JDBC dataset to join with.

So how would I go about kicking off a batch job from a streaming job?

Thanks,
Tomas

Re: Batch job per stream message?

Fabian Hueske-2
Hi Tomas,

Triggering a batch DataSet job from a DataStream program for each input record doesn't sound like a good idea to me.
You would have to make sure that the cluster always has sufficient resources and handle failures.

It would be preferable to have all data processing in a DataStream job. You mentioned that the challenge is to join the data of the files with a JDBC database.
I see two ways to do that in a DataStream program:
- replicate the JDBC table in a stateful operator. This means that you have to publish updates on the database to the Flink program.
- query the JDBC table with an AsyncFunction. This operator concurrently executes multiple calls to an external service which improves latency and throughput. The operator ensures that checkpoints and watermarks are correctly handled.
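A rough sketch of the first option, in plain Java rather than the Flink API: the operator keeps a local copy of the table, applies every published change to it, and joins incoming records against that copy. The class and method names (`ReplicatedTableJoin`, `onTableUpdate`, `onTableDelete`, `join`) are hypothetical, and in a real job the map would live in Flink managed state so that it is covered by checkpoints.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for a stateful operator that replicates a JDBC table.
// All names are hypothetical; this is not Flink API.
class ReplicatedTableJoin {
    // Local replica of the table, keyed by the join key.
    private final Map<String, String> replica = new HashMap<>();

    // Apply one insert or update published from the database.
    void onTableUpdate(String key, String row) {
        replica.put(key, row);
    }

    // Apply one delete published from the database.
    void onTableDelete(String key) {
        replica.remove(key);
    }

    // Join one stream record against the replica.
    // Returns an empty Optional when no row matches the key.
    Optional<String> join(String key, String payload) {
        String row = replica.get(key);
        return row == null ? Optional.empty() : Optional.of(payload + "|" + row);
    }
}
```

The trade-off is the one named above: lookups are local and fast, but the replica is only as fresh as the updates that reach the job.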

Best, Fabian

In reply to Tomas Mazukna's message of 2017-10-30 19:11 GMT+01:00.


Re: Batch job per stream message?

Tomas Mazukna
Hi Fabian,

Thanks for pointing me in the right direction.
Reading through the documentation here:

It seems I can accomplish what I need with an async call to a REST service or a JDBC query per stream item being processed.
The only confusion for me is this statement:

The AsyncCollector is completed with the first call of AsyncCollector.collect. All subsequent collect calls will be ignored.  

So basically there has to be an accumulator implemented inside the AsyncFunction to gather up all results and return them in a single .collect() call.
But how do I know when to do so? Or am I completely off track here?



In reply to Fabian Hueske's message of Wed, 1 Nov 2017 at 03:57.


Re: Batch job per stream message?

Fabian Hueske-2
Hi Tomas,

I'm not familiar with the details of the AsyncFunction, but I'd interpret this as follows:

- you can make one async call in the asyncInvoke method.
- this call will result in a callback and from that one callback you can emit a single result by calling AsyncCollector.collect()

The asyncInvoke method is called once per event in the stream, so each stream event results in one async call and one result.
It's kind of like a MapFunction that talks to an external service.

So if you need to make multiple calls per event, you need multiple AsyncFunctions.
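The one-call, one-completion pattern can be sketched in plain Java with a `CompletableFuture` standing in for the collector. This is not the Flink API, and `lookupRows` is a hypothetical placeholder for the JDBC query or REST call. It assumes, as in the Flink releases of that time, that `collect` accepts a collection, so a lookup returning several rows can hand them all over in that single call without a separate accumulator.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java stand-in for the asyncInvoke pattern; not Flink API.
class AsyncLookupSketch {
    // Hypothetical external lookup: returns every matching row for a key
    // in one response, the way a single JDBC query would.
    static List<String> lookupRows(String key) {
        if (key.equals("file-1")) {
            return Arrays.asList("row-a", "row-b");
        }
        return Collections.emptyList();
    }

    // One async call per stream event. The future is completed exactly once,
    // with the full list of rows from that call.
    static CompletableFuture<List<String>> asyncInvoke(String event) {
        return CompletableFuture.supplyAsync(() -> lookupRows(event));
    }
}
```

Here the callback knows it is done because the query itself has returned, which is the point at which the single completion happens.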

Best, Fabian

In reply to Tomas Mazukna's message of 2017-11-01 16:12 GMT+01:00.