Iterations vs. combo source/sink


Iterations vs. combo source/sink

Ken Krugler
Hi all,

I’ve got a very specialized DB (runs in the JVM) that I need to use to both keep track of state and generate new records to be processed by my Flink streaming workflow. Some of the workflow results are updates to be applied to the DB.

And the DB needs to be partitioned.

My initial approach is to wrap it in a regular operator, and have subsequent streams be inputs for updating state. So now I’ve got an IterativeDataStream, which should work.

But I imagine I could also wrap this DB in a source and a sink, yes? Though I’m not sure how I could partition it as a source, in that case.

If it is feasible to have a partitioned source/sink, are there general pros/cons to either approach?

Thanks,

— Ken


Re: Iterations vs. combo source/sink

Fabian Hueske
Hi Ken,

you can certainly have partitioned sources and sinks. You can control the parallelism by calling the .setParallelism() method.
If you need a partitioned sink, you can call .keyBy() before the sink to hash-partition the stream.
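
For example, here is a rough sketch (DbSink is just a stand-in for a DB-backed sink, and the key and parallelism are arbitrary):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PartitionedSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the update stream produced by the rest of the workflow.
        DataStream<String> updates = env.fromElements("http://a.com/1", "http://b.com/2");

        updates
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String url) {
                    return url;   // in a real job, key by domain (or whatever the DB is partitioned on)
                }
            })
            .addSink(new DbSink())     // stand-in for the DB-backed sink
            .setParallelism(4);        // number of parallel sink instances

        env.execute("partitioned sink sketch");
    }

    // Each parallel instance of this sink would own one partition of the DB.
    public static class DbSink extends RichSinkFunction<String> {
        @Override
        public void invoke(String value) throws Exception {
            int partition = getRuntimeContext().getIndexOfThisSubtask();
            System.out.println("partition " + partition + " <- " + value);
        }
    }
}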

I did not completely understand the requirements of your program. Could you maybe provide pseudo code showing how the program should look?

Some general comments:
- Flink's fault tolerance mechanism does not yet work with iterative data flows. This is work in progress; see FLINK-3257 [1].
- Flink's fault tolerance mechanism only works if all internal operator state is exposed to Flink. So you would need to put your Java DB into Flink state to have a recoverable job.
- Is the DB essential in your application? Could you use Flink's key-partitioned state interface instead? That would help to make your job fault-tolerant.
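
Regarding the last point, here is a minimal sketch of how the key-partitioned state interface could be used (CrawlStateFunction and the status values are made up):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical function that keeps per-URL status in Flink's keyed state
// instead of in an embedded DB. Must run on a keyed stream, e.g.:
//   urls.keyBy(...).flatMap(new CrawlStateFunction())
public class CrawlStateFunction extends RichFlatMapFunction<String, String> {

    private transient ValueState<String> urlStatus;

    @Override
    public void open(Configuration parameters) {
        urlStatus = getRuntimeContext().getState(
            new ValueStateDescriptor<>("url-status", String.class, null));
    }

    @Override
    public void flatMap(String url, Collector<String> out) throws Exception {
        if (urlStatus.value() == null) {
            // First time this URL is seen: record it and emit it for fetching.
            urlStatus.update("discovered");
            out.collect(url);
        } else {
            // Already known: just update the bookkeeping (made-up status value).
            urlStatus.update("seen-again");
        }
    }
}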

Best,
Fabian


Re: Iterations vs. combo source/sink

Ken Krugler
Hi Fabian,

Thanks for responding. Comments and questions inline below.

Regards,

— Ken


On Sep 29, 2016, at 6:10am, Fabian Hueske <[hidden email]> wrote:

Hi Ken,

you can certainly have partitioned sources and sinks. You can control the parallelism by calling the .setParallelism() method.

So I assume I’d implement the ParallelSourceFunction interface.
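
Something along these lines, I'd guess (a rough sketch; fetchBestUrls() is a made-up stand-in for the real crawl DB query):

import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class CrawlDbSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel instance of the source owns one partition of the crawl DB,
        // identified by its subtask index.
        int partition = getRuntimeContext().getIndexOfThisSubtask();
        int numPartitions = getRuntimeContext().getNumberOfParallelSubtasks();

        while (running) {
            // Made-up call into the partitioned crawl DB.
            for (String url : fetchBestUrls(partition, numPartitions)) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(url);
                }
            }
            Thread.sleep(1000);   // back off a bit when there is nothing to do
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private List<String> fetchBestUrls(int partition, int numPartitions) {
        return Collections.emptyList();   // placeholder
    }
}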

If you need a partitioned sink, you can call .keyBy() before the sink to hash-partition the stream.

I did not completely understand the requirements of your program. Could you maybe provide pseudo code showing how the program should look?

Just for grins, I’m looking at re-implementing the Bixo web crawler (built on top of Cascading/Hadoop MR) as a continuous crawler on top of Flink.

The main issue is the “crawl DB”, which has to maintain the state of every URL ever seen, and also provide a fast way to generate the “best” URLs to be fetched. The logic for figuring out the best URLs is complex, depending on factors like the anticipated value of the page, refetch rates for pages that have already been seen, the number of unique URLs per domain vs. the domain’s “rank”, etc.

And it has to scale to something like 30B+ URLs on a small cluster (e.g. 10 moderately big servers), so it needs to be very efficient in terms of memory/CPU usage.

An additional goal is to not require additional external infrastructure. That simplifies the operational overhead of running a continuous crawl.

So this “crawl DB” has to act as both a source (of the best URLs to fetch) and a sink (for updates to fetched URLs, and for newly discovered/injected URLs). The state is a mix of in-memory data and data spilled to disk.

Given what you mention below about iterative data flows not being fault tolerant, it seems like a combo source/sink (if possible) would be best.

Any guidance as to how to implement such a thing? I don’t know enough yet about Flink to determine if I can essentially have one task that’s acting as both the source & sink.

Some general comments:
- Flink's fault tolerance mechanism does not yet work with iterative data flows. This is work in progress; see FLINK-3257 [1].

OK, good to know.

- Flink's fault tolerance mechanism only works if all internal operator state is exposed to Flink. So you would need to put your Java DB into Flink state to have a recoverable job.

Yes.

- Is the DB essential in your application? Could you use Flink's key-partitioned state interface instead? That would help to make your job fault-tolerant.

Yes, as per above.






--------------------------
Ken Krugler
+1 530-210-6378
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr




Re: Iterations vs. combo source/sink

Till Rohrmann

Hi Ken,

you can have a class implement both the SourceFunction and the SinkFunction interfaces. However, when running a job, the source and the sink will be distinct instances. Thus, there is no way for them to share instance variables.

What you could do is write the updated and newly discovered URLs to a message queue like Kafka, from which your source function reads. That way you’ll have at-least-once processing guarantees. But then again, you’d be using external infrastructure.
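
A rough sketch of that setup (the topic name, broker address, and the trivial pass-through “workflow” are placeholders, and the connector class names depend on your Kafka version):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaFeedbackSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "crawler");

        // Read the URLs to fetch from a Kafka topic ("crawl-urls" is a made-up name).
        DataStream<String> urls = env.addSource(
            new FlinkKafkaConsumer09<>("crawl-urls", new SimpleStringSchema(), props));

        // Placeholder for the actual fetch/parse/update logic.
        DataStream<String> discovered = urls;

        // Write updated and newly discovered URLs back to the same topic, closing the loop.
        discovered.addSink(
            new FlinkKafkaProducer09<>("localhost:9092", "crawl-urls", new SimpleStringSchema()));

        env.execute("crawler with Kafka feedback");
    }
}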

If you really want to avoid this, then I guess you have to use iterations and do without the processing guarantees at the moment.
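
And for comparison, a minimal sketch of the iteration variant (the map step is a placeholder for the actual fetch/parse logic):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterationSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Seed URLs; in a real crawler these might come from an injection source.
        DataStream<String> seeds = env.fromElements("http://example.com/");

        // Close the iteration if no feedback arrives for 5 seconds, so the sketch terminates.
        IterativeStream<String> iteration = seeds.iterate(5000);

        // Placeholder for the real workflow: fetch the page, extract links, emit new URLs.
        DataStream<String> discovered = iteration.map(new MapFunction<String, String>() {
            @Override
            public String map(String url) {
                return url;
            }
        });

        // Feed the discovered URLs back to the head of the iteration.
        iteration.closeWith(discovered);

        discovered.print();
        env.execute("iteration sketch");
    }
}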

Cheers,
Till

