Kafka as source for batch job

Kafka as source for batch job

Marchant, Hayden
I know that traditionally Kafka is used as a source for a streaming job. In our particular case, we are looking at extracting records from a Kafka topic over a well-defined offset range per partition - i.e. from offset X to offset Y. We'd want the application to know that it has finished when it reaches offset Y. This basically turns the Kafka stream into bounded data, as opposed to the unbounded data of the usual streaming paradigm.

What would be the best approach to do this in Flink? I see a few options, though there might be more:

1. Use a regular streaming job, and have some external service monitor the current offsets of the topic's consumer group, manually stopping the job once the consumer group has reached the target offsets (a rough sketch of such a monitor appears below, after this list).
       Pros - simple w.r.t. Flink; Cons - hacky

2. Create a batch job, with a new InputFormat based on Kafka that reads the specified subset of the Kafka topic as the source.
       Pros - represents the bounded data from the Kafka topic as a batch source; Cons - requires implementing the source.

3. Dump the subset of the Kafka topic into a file and then trigger a more 'traditional' Flink batch job that reads from that file.
       Pros - simple; Cons - unnecessary I/O.

I personally prefer 1 and 3 for simplicity. Has anyone done anything like this before?
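
As a rough sketch of what the external monitor in option 1 might look like (illustrative only: the broker address, group id and end-offset map are placeholders, and it assumes the job commits its offsets back to Kafka, e.g. on checkpoints):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeMonitor {

    /** Returns true once the job's consumer group has committed offsets at or past every target end offset. */
    public static boolean hasFinished(Map<TopicPartition, Long> targetEndOffsets) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // placeholder
        props.put("group.id", "my-flink-job-group");      // the job's consumer group (placeholder)
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            for (Map.Entry<TopicPartition, Long> entry : targetEndOffsets.entrySet()) {
                OffsetAndMetadata committed = consumer.committed(entry.getKey());
                // No commit yet, or the commit is still before the target offset: not done.
                if (committed == null || committed.offset() < entry.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }
}

Once this returns true, the external service would cancel the job, e.g. via the Flink CLI or the REST API. Note that committed offsets only reflect Flink's progress as of the last commit (checkpoint or auto-commit), so the check is conservative.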

Thanks,
Hayden Marchant

Re: Kafka as source for batch job

Tzu-Li (Gordon) Tai
Hi Hayden,

Have you tried looking into `KeyedDeserializationSchema#isEndOfStream` [1]?
I think that could be what you are looking for: when the schema returns true for a record, the Kafka consumer treats the stream as finished and stops consuming.
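
For illustration only (assuming the Flink 1.4-era connector API; the record wrapper class and the single fixed end offset are made up for this example), a schema along these lines could signal end-of-stream once a target offset is reached:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Hypothetical wrapper that keeps the partition/offset next to the payload
// so that isEndOfStream can inspect them.
class OffsetAwareRecord {
    final int partition;
    final long offset;
    final byte[] payload;

    OffsetAwareRecord(int partition, long offset, byte[] payload) {
        this.partition = partition;
        this.offset = offset;
        this.payload = payload;
    }
}

public class BoundedKafkaSchema implements KeyedDeserializationSchema<OffsetAwareRecord> {

    private final long endOffset;   // target offset Y (same for all partitions in this sketch)

    public BoundedKafkaSchema(long endOffset) {
        this.endOffset = endOffset;
    }

    @Override
    public OffsetAwareRecord deserialize(byte[] messageKey, byte[] message,
                                         String topic, int partition, long offset) {
        // Keep the offset alongside the payload so isEndOfStream can check it.
        return new OffsetAwareRecord(partition, offset, message);
    }

    @Override
    public boolean isEndOfStream(OffsetAwareRecord nextElement) {
        // Returning true stops the Kafka consumer's fetch loop.
        return nextElement.offset >= endOffset;
    }

    @Override
    public TypeInformation<OffsetAwareRecord> getProducedType() {
        return TypeInformation.of(OffsetAwareRecord.class);
    }
}

One caveat, discussed further below: isEndOfStream sees one record at a time, so this stops the source as soon as any record crosses the threshold, rather than per partition.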

Cheers,
Gordon

RE: Kafka as source for batch job

Marchant, Hayden

Gordon,

Thanks for the pointer. I did some searching for usages of isEndOfStream, and it's a little confusing. I see that all implementers of DeserializationSchema must implement this method, but it isn't called from anywhere central in the Flink streaming engine; rather, each source can decide to use it in its own implementation. For example, the Kafka source stops processing the topic when isEndOfStream returns true. This is nice, but it localizes the treatment to that one operator, and even though it goes a long way toward ensuring that I get just my bounded data, it still does not give me the ability to stop my job when I have finished consuming the elements.

Also, in my case I need to ensure that I have reached a certain offset for each of the Kafka partitions assigned to the instance of the source function. It seems from the code that I would need a different implementation of KafkaFetcher.runFetchLoop, with slightly different logic for setting running to false.
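
For illustration of that per-partition bookkeeping (a sketch only, assuming a single topic; it has the caveat that a schema instance only learns about a partition once it has seen a record from it, so it cannot know its subtask's full assignment up front):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Illustrative only: tracks a configured end offset per partition and signals
// end-of-stream once every partition this instance has seen has crossed its
// end offset ('and' across partitions rather than 'or').
public class PerPartitionBoundedSchema implements KeyedDeserializationSchema<byte[]> {

    private final Map<Integer, Long> endOffsets;            // partition -> offset Y
    private final Set<Integer> seen = new HashSet<>();      // partitions seen so far
    private final Set<Integer> finished = new HashSet<>();  // partitions past offset Y

    public PerPartitionBoundedSchema(Map<Integer, Long> endOffsets) {
        this.endOffsets = new HashMap<>(endOffsets);
    }

    @Override
    public byte[] deserialize(byte[] messageKey, byte[] message,
                              String topic, int partition, long offset) {
        seen.add(partition);
        Long end = endOffsets.get(partition);
        if (end != null && offset >= end) {
            finished.add(partition);
        }
        // Records past the end offset are still emitted here; filtering them out
        // is left out of the sketch.
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        // 'And' predicate over the partitions observed so far.
        return !seen.isEmpty() && finished.containsAll(seen);
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}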

 

What would you recommend in this case?

 
RE: Kafka as source for batch job

Tzu-Li (Gordon) Tai
Hi Marchant,

Yes, I agree. In general, the isEndOfStream method has very ill-defined semantics, with different behaviors across the different Kafka connector versions.
This method will definitely need to be revisited in the future (we are thinking about a rework of the connector).

What is your target Kafka version? And do you know the ending offsets of _all_ the partitions for which you only want to consume a range?
Given that information, I can double-check whether your specific case is possible.

Cheers,
Gordon

RE: Kafka as source for batch job

Marchant, Hayden

Hi Gordon,

Actually, our use case is that we have a start/end timestamp, and we plan on calling KafkaConsumer.offsetsForTimes to get the offsets for each partition. So I guess our logic is different in that we need an 'and' predicate across all partitions reaching their end offsets, as opposed to the current 'or' behavior - i.e. any single partition fulfilling the condition is enough to stop the job.
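
For reference, a rough sketch of resolving a timestamp to per-partition offsets with KafkaConsumer.offsetsForTimes (the broker address and topic are placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class OffsetRangeResolver {

    /** Maps every partition of the topic to the earliest offset at or after the given timestamp. */
    public static Map<TopicPartition, Long> offsetsAt(String topic, long timestampMs) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");   // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor(topic)) {
                query.put(new TopicPartition(topic, p.partition()), timestampMs);
            }

            Map<TopicPartition, Long> result = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e
                    : consumer.offsetsForTimes(query).entrySet()) {
                // offsetsForTimes returns null for partitions with no message at/after the timestamp.
                if (e.getValue() != null) {
                    result.put(e.getKey(), e.getValue().offset());
                }
            }
            return result;
        }
    }
}

Calling this once with the start timestamp and once with the end timestamp would yield the X and Y offsets per partition.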

 

Either way, I’d still need to figure out when to stop the job.

 

Would it make more sense to implement an InputFormat that could wrap this 'bounded' Kafka source, and use the DataSet / Batch Table API?
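
To sketch what that might look like (illustrative only, not an existing Flink connector; the class names, the per-partition offset-range map and the poll timeout are placeholders), an InputFormat with one split per partition and a half-open [X, Y) offset range could be structured roughly like this:

import java.util.*;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// One split per Kafka partition, carrying its [startOffset, endOffset) range.
class KafkaRangeSplit implements InputSplit {
    final int splitNumber;
    final String topic;
    final int partition;
    final long startOffset;
    final long endOffset;

    KafkaRangeSplit(int splitNumber, String topic, int partition, long startOffset, long endOffset) {
        this.splitNumber = splitNumber;
        this.topic = topic;
        this.partition = partition;
        this.startOffset = startOffset;
        this.endOffset = endOffset;
    }

    @Override
    public int getSplitNumber() { return splitNumber; }
}

public class KafkaRangeInputFormat extends RichInputFormat<byte[], KafkaRangeSplit> {

    private final Properties kafkaProps;            // must include byte-array deserializers
    private final String topic;
    private final HashMap<Integer, long[]> ranges;  // partition -> {startOffset, endOffset}

    private transient KafkaConsumer<byte[], byte[]> consumer;
    private transient Iterator<ConsumerRecord<byte[], byte[]>> buffer;
    private transient TopicPartition tp;
    private transient long nextOffset;
    private transient long endOffset;

    public KafkaRangeInputFormat(Properties kafkaProps, String topic, Map<Integer, long[]> ranges) {
        this.kafkaProps = kafkaProps;
        this.topic = topic;
        this.ranges = new HashMap<>(ranges);
    }

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { return cachedStatistics; }

    @Override
    public KafkaRangeSplit[] createInputSplits(int minNumSplits) {
        List<KafkaRangeSplit> splits = new ArrayList<>();
        for (Map.Entry<Integer, long[]> e : ranges.entrySet()) {
            splits.add(new KafkaRangeSplit(splits.size(), topic, e.getKey(), e.getValue()[0], e.getValue()[1]));
        }
        return splits.toArray(new KafkaRangeSplit[0]);
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(KafkaRangeSplit[] splits) {
        return new DefaultInputSplitAssigner(splits);
    }

    @Override
    public void open(KafkaRangeSplit split) {
        consumer = new KafkaConsumer<>(kafkaProps);
        tp = new TopicPartition(split.topic, split.partition);
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, split.startOffset);
        nextOffset = split.startOffset;
        endOffset = split.endOffset;
        buffer = Collections.emptyIterator();
    }

    @Override
    public boolean reachedEnd() {
        return nextOffset >= endOffset;   // split is exhausted once offset Y is reached
    }

    @Override
    public byte[] nextRecord(byte[] reuse) {
        while (!buffer.hasNext()) {
            buffer = consumer.poll(1000).records(tp).iterator();   // Kafka 0.11-style poll(long)
        }
        ConsumerRecord<byte[], byte[]> record = buffer.next();
        nextOffset = record.offset() + 1;
        return record.value();
    }

    @Override
    public void close() { if (consumer != null) consumer.close(); }
}

A DataSet job could then read it with ExecutionEnvironment#createInput and deserialize the raw bytes in a subsequent map. The sketch assumes contiguous offsets within each partition (compacted topics would need an extra end-offset check).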

 

Thanks

Hayden

 
RE: Kafka as source for batch job

Marchant, Hayden
In reply to this post by Tzu-Li (Gordon) Tai

Forgot to mention that my target Kafka version is 0.11.x, with the aim of upgrading to 1.0 when a 1.0.x fixpack is released.