reading from latest kafka offset when flink starts

Balaji Rajagopalan
I am using the Flink Kafka connector to read from a Kafka stream. The Flink job went down due to an application error and was down for some time; meanwhile the Kafka queue kept growing, as expected, since no consumer was reading from the given group. When I restarted Flink it began consuming messages again with no problem, but the consumer lag was huge because the producer is fast, about 4500 events/sec. My question: is there any Flink connector configuration that can force it to read from the latest offset when the Flink application starts? In my application logic I do not care about older events.

balaji 

Re: reading from latest kafka offset when flink starts

Robert Metzger (rmetzger0)
Hi,

yes, you can use Kafka's configuration setting for that. It's called "auto.offset.reset". Setting it to "latest" makes the consumer start from the current (latest) offset when it starts ("earliest" is the opposite).

How heavy is the processing you are doing? 4500 events/second does not sound like a lot of throughput.
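
For illustration, a minimal sketch of passing that property to the Flink Kafka consumer, assuming the 0.8 connector that was current at the time; the topic name, group id, and broker/ZooKeeper addresses are placeholders. Note that the Kafka 0.8 consumer documents the values "largest"/"smallest" for this property, while "latest"/"earliest" are the names used by the 0.9+ consumer.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromLatestOffset {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092");  // placeholder broker list
        props.setProperty("zookeeper.connect", "zk1:2181");      // required by the 0.8 consumer
        props.setProperty("group.id", "my-flink-group");         // placeholder consumer group
        // Start from the newest data when there is no committed offset for this group.
        props.setProperty("auto.offset.reset", "largest");       // "latest" for the 0.9+ consumer

        DataStream<String> events = env.addSource(
                new FlinkKafkaConsumer08<>("events", new SimpleStringSchema(), props));

        events.print();
        env.execute("read-from-latest-offset");
    }
}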

Re: reading from latest kafka offset when flink starts

Balaji Rajagopalan
Thanks Robert, appreciate your help.

Re: reading from latest kafka offset when flink starts

Ufuk Celebi
Robert, what do you think about adding a note about this to the Kafka
consumer docs? This has come up a couple of times on the mailing list
already.

– Ufuk

Re: reading from latest kafka offset when flink starts

Balaji Rajagopalan
Robert,
  Regarding the event rate: 4500 events/sec may not be large, but I am seeing issues processing the events with the compute power I am using. I have deployed the Flink app on a 3-node YARN cluster: one master node and 2 slave nodes running the TaskManagers. Each machine has 2 cores and 4 GB of RAM, and default.parallelism is set to 4. I see a delay of 2 hours from the time an event enters the system to the time it reaches the sink; the events appear to be checkpointed but are processed with a huge delay inside Flink. Is there any recommendation (wiki write-up) for the number of TaskManager slots required to process a certain load of incoming data?

balaji 
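
On the slot question, a minimal sketch assuming the setup described above (2 TaskManagers with 2 cores each) and the usual rule of thumb of one slot per CPU core; the class name is a placeholder:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSizingSketch {
    public static void main(String[] args) throws Exception {
        // Slots are configured per TaskManager in flink-conf.yaml, e.g.
        //   taskmanager.numberOfTaskSlots: 2   (one slot per CPU core as a rule of thumb)
        // Total slots = 2 TaskManagers x 2 slots = 4, and a job's parallelism
        // cannot exceed the number of free slots in the cluster.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide parallelism, capped by the available slots (4 here).
        env.setParallelism(4);

        // ... add the Kafka source, the processing, and the sink here, then:
        // env.execute("slot-sized-job");
    }
}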

Re: reading from latest kafka offset when flink starts

Aljoscha Krettek
Hi,
are you by any chance using Kafka 0.9?

Cheers,
Aljoscha

Re: reading from latest kafka offset when flink starts

Balaji Rajagopalan
No, I am using Kafka 0.8.0.2. I did some experiments with changing the parallelism from 4 to 16, and the lag has come down from 2 hours to 20 minutes, while CPU utilization (load average) has gone up from 20-30% to 50-60%. So parallelism does seem to play a role in reducing the processing lag in Flink, as I expected.
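
A minimal sketch of applying that change in the program, assuming the 2-TaskManager setup described earlier; with parallelism 16, each TaskManager needs 8 slots (taskmanager.numberOfTaskSlots: 8 in flink-conf.yaml). The source and map operator below are placeholders standing in for the application's Kafka source and processing step:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Parallelism16Sketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default parallelism; the cluster must offer at least 16 slots.
        env.setParallelism(16);

        DataStream<String> events = env.fromElements("placeholder-event"); // stand-in for the Kafka source

        // The expensive step can also be scaled individually instead of the whole job.
        events.map(new MapFunction<String, String>() {
                  @Override
                  public String map(String value) {
                      return value.toUpperCase(); // placeholder processing
                  }
              })
              .setParallelism(16)
              .print();

        env.execute("parallelism-16");
    }
}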
