How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)


Jary Zhen
Hello everyone,

   First, a brief pipeline introduction:
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
      consume multiple Kafka topics
      -> union them
      -> assignTimestampsAndWatermarks
      -> keyBy
      -> window() and so on …
It's a very common way to use Flink to process data like this in a production environment.
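A minimal sketch of what I mean, in Java (Flink 1.10-style API; the topic names, bootstrap servers, the "key,timestampMillis,value" record format and the window size are just placeholders for illustration):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class PipelineSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "pipeline-test");

            // consume multiple Kafka topics
            FlinkKafkaConsumer<String> consumerA =
                    new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), props);
            FlinkKafkaConsumer<String> consumerB =
                    new FlinkKafkaConsumer<>("topicB", new SimpleStringSchema(), props);
            DataStream<String> streamA = env.addSource(consumerA);
            DataStream<String> streamB = env.addSource(consumerB);

            // union them -> assignTimestampsAndWatermarks -> keyBy -> window
            streamA.union(streamB)
                   .assignTimestampsAndWatermarks(
                           new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                               @Override
                               public long extractTimestamp(String record) {
                                   // event time assumed to be the second comma-separated field, in millis
                                   return Long.parseLong(record.split(",")[1]);
                               }
                           })
                   .keyBy(record -> record.split(",")[0])   // key assumed to be the first field
                   .timeWindow(Time.minutes(1))
                   .reduce((a, b) -> a + "|" + b)           // placeholder window function
                   .print();

            env.execute("pipeline-test");
        }
    }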
But if I want to test the pipeline above, I need to use the FlinkKafkaConsumer.setStartFromTimestamp(timestamp) API to consume 'past' data.
So my question is: how do I control the 'step' balance during replay, given that one topic produces 3 records per second while another produces 30000 per second?

I'm not sure whether I've described this clearly, so please let me know if anything needs clarification.

Thanks


Re: How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

dinesh
Hi Jary,

What do you mean by 'step balance'? Could you please provide a concrete example?



Re: How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

Jary Zhen
Hi Dinesh, thanks for your reply.

  For example, there are two topics: topic A produces 1 record per second and topic B produces 3600 records per second. If I set the Kafka consumer config like this:
     max.poll.records: "3600"
     max.poll.interval.ms: "1000"
then I can get all the records from these two topics every second in real time.
But if I want to consume data from the last day or earlier using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), within one second I will get 3600 records from topic A, which corresponds to an hour of data in the production environment, and at the same time 3600 records from topic B, which corresponds to only one second of data. So with event-time semantics, the watermark driven by topic A will always cause the data from topic B to be treated as 'late data' in the window operator. What I want is to get 1 record from A and 3600 records from B per second when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp), so that I can simulate consuming data as in the real production environment.
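To make it concrete, this is roughly the replay setup I mean (a sketch only; the bootstrap servers, group id, topic names and the 24-hour offset are placeholders):

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "replay-test");
    props.setProperty("max.poll.records", "3600");
    props.setProperty("max.poll.interval.ms", "1000");

    FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            java.util.Arrays.asList("topicA", "topicB"), new SimpleStringSchema(), props);
    // replay from 24 hours ago: from this point, topic A advances roughly one hour of
    // event time per wall-clock second, while topic B advances only about one second
    consumer.setStartFromTimestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000L);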


Best

   







Re: How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

dinesh
Hi Jary,

The easiest and simplest solution is to pass a different config to each consumer when you create it, based on your requirements.

Example:

When creating the consumer for topic A you can pass:
    max.poll.records: "1"
    max.poll.interval.ms: "1000"

When creating the consumer for topic B you can pass:
    max.poll.records: "3600"
    max.poll.interval.ms: "1000"
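A rough sketch of what I mean (assuming the properties are simply forwarded to the underlying Kafka consumer; the topic names, servers and the replay timestamp are placeholders, and the rest of the pipeline stays as in your sketch):

    Properties common = new Properties();
    common.setProperty("bootstrap.servers", "localhost:9092");
    common.setProperty("group.id", "replay-test");

    // consumer for topic A: at most 1 record per poll
    Properties propsA = new Properties();
    propsA.putAll(common);
    propsA.setProperty("max.poll.records", "1");
    propsA.setProperty("max.poll.interval.ms", "1000");
    FlinkKafkaConsumer<String> consumerA =
            new FlinkKafkaConsumer<>("topicA", new SimpleStringSchema(), propsA);

    // consumer for topic B: at most 3600 records per poll
    Properties propsB = new Properties();
    propsB.putAll(common);
    propsB.setProperty("max.poll.records", "3600");
    propsB.setProperty("max.poll.interval.ms", "1000");
    FlinkKafkaConsumer<String> consumerB =
            new FlinkKafkaConsumer<>("topicB", new SimpleStringSchema(), propsB);

    // replay both topics from the same point in time
    long startTimestamp = System.currentTimeMillis() - 24 * 60 * 60 * 1000L;
    consumerA.setStartFromTimestamp(startTimestamp);
    consumerB.setStartFromTimestamp(startTimestamp);

    DataStream<String> streamA = env.addSource(consumerA);
    DataStream<String> streamB = env.addSource(consumerB);
    // then union, assign watermarks, keyBy and window as before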


1. However, your configuration has a flaw when you use setStartFromTimestamp: if topic B generates 3600 events every second and you set setStartFromTimestamp to consume data from the last 24 hours, your second consumer will always lag by one day (it will never catch up to the real-time data), which is not what we want in streaming.

2. With Flink we don't need to pass these settings (max.poll.records, max.poll.interval.ms); Flink consumes the data in real time by design. If your job is consuming data slowly (back pressure), you have to increase parallelism, and there are several ways to do that (operator level, job level).
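For example (the values and the unioned stream variable are illustrative):

    // job level: default parallelism for every operator in the job
    env.setParallelism(4);

    // operator level: override parallelism for one operator only
    unioned.keyBy(record -> record.split(",")[0])
           .timeWindow(Time.minutes(1))
           .reduce((a, b) -> a + "|" + b)
           .setParallelism(8);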


I hope I explained it clearly. Please let me know if you need further clarification.

Thanks,
Dinesh




Re: How to control the balance when using FlinkKafkaConsumer.setStartFromTimestamp(timestamp)

dinesh
Hi Jary,

My first point was wrong: even with these settings, Flink will still consume all of the data from the last day. That is what we want, right?

Late data is determined by the window length and the watermark strategy. Are you combining your streams? Please provide these details so that we can understand the problem more clearly.

Thanks,
Dinesh.


