Time needed to read from Kafka source


Time needed to read from Kafka source

B.B.
Hi,

I am in the process of optimizing my job, which we currently think is too slow.

We are deploying the job on Kubernetes with one job manager (1 GB RAM, 1 CPU) and one task manager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a parallelism of two).

The main problem is one Kafka source with 3.8 million events that we have to process.
As a test we made a simple job that connects to Kafka using a custom implementation of KafkaDeserializationSchema. There we use an ObjectMapper that maps input values, e.g.

var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);

The result is then validated with Hibernate Validator, and the output of this source is printed to the console.

The job needed one and a half hours to consume all the events, which seems rather long.
Is there a way we can speed up this process?

Would more CPU cores or memory be the solution?
Should we switch to an Avro deserialization schema?




Re: Time needed to read from Kafka source

Piotr Nowojski-4
Hi,

That's a throughput of roughly 700 records/second, which should be well below the theoretical limits of any deserializer (from hundreds of thousands up to tens of millions of records per second per operator), unless your records are huge or very complex.

Long story short, I don't know of a magic bullet to help you solve your problem. As always you have two options: either optimise/speed up your code/job, or scale up.

If you choose the former, think of Flink as just another Java application. Check metrics and resource usage, and understand which resource is the problem (CPU? memory? is the machine swapping? I/O?). You might be able to guess the bottleneck (reading from Kafka? deserialisation? something else? Flink itself?) by looking at some of the metrics (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or you can simplify your job to the bare minimum and test the performance of independent components. You can also always attach a code profiler and simply look at what's happening. First identify the source of the bottleneck, then try to understand what's causing it.

Best,
Piotrek

[1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also comes with nice tools for analysing bottlenecks in the WebUI (colouring nodes in the job graph based on busy/back-pressured status, and flame graph support).
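
As a rough sketch of how to check those metrics outside the WebUI, the following probe queries them through Flink's REST API. The REST address, job id and vertex id are placeholders you would look up first (e.g. via GET /jobs and GET /jobs/<job-id>), and busyTimeMsPerSecond itself requires Flink 1.13+:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: poll busy/idle time of one source subtask via the Flink REST API.
// The endpoint and metric names assume Flink 1.13+; the ids below are placeholders.
public class SourceMetricsProbe {

  public static void main(String[] args) throws Exception {
    String rest = "http://localhost:8081";   // JobManager REST address (assumption)
    String jobId = "<job-id>";               // placeholder, from GET /jobs
    String vertexId = "<vertex-id>";         // placeholder, the Kafka source vertex from GET /jobs/<job-id>

    URI uri = URI.create(rest + "/jobs/" + jobId + "/vertices/" + vertexId
        + "/subtasks/0/metrics?get=busyTimeMsPerSecond,idleTimeMsPerSecond");

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());

    // Values close to 1000 ms/s mean the subtask spends almost all of its time in that
    // state (mostly busy => likely the bottleneck, mostly idle => waiting for input).
    System.out.println(response.body());
  }
}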





Re: Time needed to read from Kafka source

Arvid Heise-4
Could you share your KafkaDeserializationSchema? We might be able to spot some optimization potential. You could also try out enableObjectReuse [1], which avoids copying records between chained operators (I'm not sure whether you have any non-chained tasks).
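
For reference, a minimal sketch of what enabling object reuse looks like on the execution environment (nothing else about the job needs to change; it is only safe if no operator mutates or holds on to the records it receives):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: enable object reuse so Flink can hand the same record instance to
// chained operators instead of copying it between them.
public class ObjectReuseSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().enableObjectReuse();

    // ... add the Kafka source and the rest of the pipeline as before ...
    env.fromElements("a", "b", "c").print();
    env.execute("object-reuse-sketch");
  }
}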

If you are on 1.13, you could check out the flamegraph to see where the bottleneck occurs. [2]






Re: Time needed to read from Kafka source

B.B.

Hi,
I forgot to mention that we are running Flink 1.12.0.

This is the main function (some parts of the code are abbreviated; this is the main part). As you can see, the job was simplified to the minimum: just reading from the source and printing.

[code attachment did not come through]

And this is the deserializer:

[code attachment did not come through]
BR,

BB



Re: Time needed to read from Kafka source

B.B.
In reply to this post by Piotr Nowojski-4
Hi,
I forgot to mention that we are using Flink 1.12.0. This job has only the minimum components: reading from the source and printing it.
Profiling is my next step. Regarding memory, I didn't see any bottlenecks.
I guess I will have to do some investigating of Flink's metrics.

BR,
BB


Re: Time needed to read from Kafka source

B.B.
In reply to this post by Arvid Heise-4
I was having a problem with sending the code. So here it is; hope it now looks OK.

This is my main job (some parts of the code are abbreviated; this is the main part):

public class MyJob {

  private StreamExecutionEnvironment env;
  private static final Integer NUM_OF_PARALLEL_OPERATORS = 1;

  public static void main(String[] args) throws Exception {

   var myJob = new MyJob();
   myJob.withExecutionEnvironment();
   myJob.init();
  }

  public void init() throws Exception {

    var settings = getApplicationProperties();

    SourceFunction<ProcessingResult<MyEvent>> customerAccountSource =
        getSource(settings, settings.getProperty("my.topic"));

    DataStream<ProcessingResult<MyEvent>> mySource =
        env.addSource(customerAccountSource).setParallelism(NUM_OF_PARALLEL_OPERATORS);

    mySource.print();
    env.execute();
  }

  private static SourceFunction<ProcessingResult<MyEvent>> getSource(
      Properties settings, String topic) {

    var kafkaProps = getKafkaSourceProperties(settings);

    return new FlinkKafkaConsumer<>(topic, new MyKafkaDeserializer(), kafkaProps)
        .setStartFromEarliest()
        .assignTimestampsAndWatermarks(
            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(100)))
        .setCommitOffsetsOnCheckpoints(true);
  }
}

And this is MyKafkaDeserializer:


public class MyKafkaDeserializer implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

  private final ObjectMapper objectMapper;

  public MyKafkaDeserializer() {
    this.objectMapper = JsonUtils.objectMapper();
  }

  @Override
  public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
    return false;
  }

  @Override
  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
      throws Exception {

    try {
      var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
      var violation = ObjectValidator.of().getValidator().validate(event);

      if (!violation.isEmpty()) {
        return ProcessingResult.error(
            new ProcessingError(
                asTransformationError(
                    "constraint.violation", toCustomerAccountValidationErrorString(violation))));
      }

      return ProcessingResult.success(event);

    } catch (JsonProcessingException e) {
      return ProcessingResult.error(
          new ProcessingError(
              asTransformationError("constraint.violation", toStacktraceString(e))));
    }
  }

  @Override
  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
  }

  private static String toCustomerAccountValidationErrorString(
      Set<ConstraintViolation<MyEvent>> errors) {
    return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
  }

  // asTransformationError(...) and toStacktraceString(...) helpers are abbreviated away.
}





Re: Time needed to read from Kafka source

Arvid Heise-4
Hi,

The implementation looks good. I'd probably cache the result of ObjectValidator.of().getValidator() in a field, to be sure it isn't an expensive construction performed for every record.
Did you evaluate what happens to the records/s rate when you skip the validation entirely?
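
A minimal sketch of that caching suggestion, applied to the deserializer from the previous post, might look like the following. It assumes ObjectValidator.of().getValidator() returns a javax.validation.Validator; since validators are generally not Serializable, the cached instance is kept in a transient field and created lazily on the task manager. ProcessingResult, MyEvent, JsonUtils and the asTransformationError/toStacktraceString helpers are the (abbreviated) ones from the original post.

import java.util.Set;
import java.util.stream.Collectors;

import javax.validation.ConstraintViolation;
import javax.validation.Validator;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class MyKafkaDeserializer implements KafkaDeserializationSchema<ProcessingResult<MyEvent>> {

  private final ObjectMapper objectMapper;

  // Cached validator: built once per task instead of once per record.
  // Kept transient and lazily initialized because Validator is typically not Serializable.
  private transient Validator validator;

  public MyKafkaDeserializer() {
    this.objectMapper = JsonUtils.objectMapper();
  }

  private Validator validator() {
    if (validator == null) {
      validator = ObjectValidator.of().getValidator();
    }
    return validator;
  }

  @Override
  public boolean isEndOfStream(ProcessingResult<MyEvent> processingResult) {
    return false;
  }

  @Override
  public ProcessingResult<MyEvent> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord)
      throws Exception {
    try {
      var event = objectMapper.readValue(consumerRecord.value(), MyEvent.class);
      var violations = validator().validate(event);   // no per-record validator construction

      if (!violations.isEmpty()) {
        return ProcessingResult.error(
            new ProcessingError(
                asTransformationError(
                    "constraint.violation", toCustomerAccountValidationErrorString(violations))));
      }
      return ProcessingResult.success(event);

    } catch (JsonProcessingException e) {
      return ProcessingResult.error(
          new ProcessingError(
              asTransformationError("constraint.violation", toStacktraceString(e))));
    }
  }

  @Override
  public TypeInformation<ProcessingResult<MyEvent>> getProducedType() {
    return TypeInformation.of(new TypeHint<ProcessingResult<MyEvent>>() {});
  }

  private static String toCustomerAccountValidationErrorString(
      Set<ConstraintViolation<MyEvent>> errors) {
    return errors.stream().map(ConstraintViolation::getMessage).collect(Collectors.joining(";"));
  }
}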


Re: Time needed to read from Kafka source

B.B.
OMG! Thank you! Thank you! I didn't think this could be the problem. When I removed the validation, the time needed to ingest all the events dropped to 10 minutes.

BR,
BB
