How to get past "bad" Kafka message, restart, keep state

How to get past "bad" Kafka message, restart, keep state

chrisr123
This is my first time trying to get this to work, so bear with me. I'm trying to
learn checkpointing with Kafka, and how to handle "bad" messages and restart
without losing state.

Use Case:
Use checkpointing.
Read a stream of integers from Kafka, keep a running sum.
If a "bad" Kafka message is read, restart the app, skip the "bad" message, and
keep state.
My stream would look something like this:

set1,5
set1,7
set1,foobar
set1,6

I want my app to keep a running sum of the integers it has seen, and to restart
after a crash without losing state. So my running sum would be:
5,
12,
app crashes and restarts
18

However, I'm finding that when my app restarts, it keeps reading the bad "foobar"
message and doesn't get past it. Source code below. The mapper bombs when I
try to parse "foobar" as an Integer.
How can I modify the app to get past the "poison" message?

env.enableCheckpointing(1000L);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500L);
env.getCheckpointConfig().setCheckpointTimeout(10000);
env.setStateBackend(new FsStateBackend("hdfs://mymachine:9000/flink/checkpoints"));

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BROKERS);
properties.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
properties.setProperty("group.id", "consumerGroup1");

FlinkKafkaConsumer08<String> kafkaConsumer =
    new FlinkKafkaConsumer08<>(topicName, new SimpleStringSchema(), properties);
DataStream<String> messageStream = env.addSource(kafkaConsumer);

DataStream<Tuple2<String,Integer>> sums = messageStream
  .map(new NumberMapper())
  .keyBy(0)
  .sum(1);
sums.print();


private static class NumberMapper implements MapFunction<String,Tuple2<String,Integer>> {
    public Tuple2<String,Integer> map(String input) throws Exception {
        return parseData(input);
    }

    private Tuple2<String,Integer> parseData(String record) {

        String[] tokens = record.toLowerCase().split(",");

        // Get Key
        String key = tokens[0];

        // Get Integer Value
        String integerValue = tokens[1];
        System.out.println("Trying to Parse=" + integerValue);
        Integer value = Integer.parseInt(integerValue);

        // Build Tuple
        return new Tuple2<String,Integer>(key, value);
    }
}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to get past "bad" Kafka message, restart, keep state

gerryzhou
Hi,

On recovery, Flink resets the Kafka offset to the one stored in the latest successful checkpoint. But the "bad" message raises an exception every time it is read, which triggers another recovery, so it is never covered by a successful checkpoint and your job never gets past that record.

I think you need to handle the exception yourself with a try-catch block in {{parseData(input)}}, maybe as follows.

{code}
private Tuple2<String,Integer> parseData(String record) {
    String[] tokens = record.toLowerCase().split(",");

    // Get Key (declared outside the try block so the catch block can use it)
    String key = tokens[0];

    try {
        // Get Integer Value
        String integerValue = tokens[1];
        System.out.println("Trying to Parse=" + integerValue);
        Integer value = Integer.parseInt(integerValue);

        // Build Tuple
        return new Tuple2<String,Integer>(key, value);
    } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
        // "Bad" record: emit 0 so the running sum is unchanged
        return new Tuple2<String, Integer>(key, 0);
    }
}
{code}
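
To see what the zero fallback does to the running sum, here is a small Flink-free sketch. It is only a toy model: `ZeroFallbackDemo`, `parseValueOrZero` and `runningSums` are made-up names, and the `HashMap` stands in for the keyed state that `keyBy(0).sum(1)` would maintain in the real job.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy, Flink-free model of a map with a zero fallback followed by a keyed running sum. */
final class ZeroFallbackDemo {

    /** Parses "key,int"; returns 0 when the integer is missing or malformed. */
    static int parseValueOrZero(String record) {
        String[] tokens = record.toLowerCase().split(",");
        try {
            return Integer.parseInt(tokens[1]);
        } catch (NumberFormatException | ArrayIndexOutOfBoundsException e) {
            return 0; // "bad" record contributes nothing to the sum
        }
    }

    /** Emits one running total per input record, keyed by the first field. */
    static List<Integer> runningSums(List<String> records) {
        Map<String, Integer> totals = new HashMap<>();
        List<Integer> emitted = new ArrayList<>();
        for (String record : records) {
            String key = record.toLowerCase().split(",")[0];
            // merge() adds the parsed value to the key's total and returns the new total
            int total = totals.merge(key, parseValueOrZero(record), Integer::sum);
            emitted.add(total);
        }
        return emitted;
    }
}
```

For the sample stream in the question, this toy model emits 5, 12, 12, 18. The bad record no longer stops processing, but it still produces an output (the repeated 12), which may or may not be what you want.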

Best, Sihua

Re: How to get past "bad" Kafka message, restart, keep state

Kien Truong
In reply to this post by chrisr123
Hi,

You can use FlatMap instead of Map, and only collect valid elements.
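
A rough sketch of the idea, written so it runs without Flink on the classpath. The names `SkippingFlatMapper` and `applyAll` are made up for illustration, `java.util.function.Consumer` stands in for Flink's `Collector` (so `out.accept(...)` plays the role of `out.collect(...)`), and `Map.Entry` stands in for `Tuple2`. In the real job the same body would presumably go into a `FlatMapFunction<String, Tuple2<String,Integer>>`.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class SkippingFlatMapper {

    /**
     * flatMap-style function: emits one element for a valid "key,int" record
     * and nothing at all for a malformed one.
     */
    static void flatMap(String record, Consumer<Map.Entry<String, Integer>> out) {
        String[] tokens = record.toLowerCase().split(",");
        if (tokens.length < 2) {
            return; // no value field: drop the record
        }
        try {
            int value = Integer.parseInt(tokens[1]);
            out.accept(new SimpleEntry<String, Integer>(tokens[0], value));
        } catch (NumberFormatException e) {
            // "poison" record such as "set1,foobar": emit nothing
        }
    }

    /** Convenience driver for trying the function outside Flink. */
    static List<Map.Entry<String, Integer>> applyAll(List<String> records) {
        List<Map.Entry<String, Integer>> collected = new ArrayList<>();
        for (String record : records) {
            flatMap(record, collected::add);
        }
        return collected;
    }
}
```

With the sample stream from the question, only the three valid records come out, so a downstream sum never sees "foobar".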


Regards,

Kien


On 6/20/2018 7:57 AM, chrisr123 wrote:

> However, I'm finding when my app restarts, it keeps reading the bad "foobar"
> message and doesnt get past it. Source code below. The mapper bombs when I
> try to parse "foobar" as an Integer.
> How can I modify app to get past "poison" message?

Re: How to get past "bad" Kafka message, restart, keep state

Tzu-Li (Gordon) Tai
Hi,

You can “skip” the corrupted message by returning `null` from the deserialize method on the user-provided DeserializationSchema.
The Kafka connector then treats the record as processed and advances the offset, but doesn’t emit anything downstream for it.
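
As a minimal sketch of that `deserialize` logic, kept Flink-free so it can be run on its own: `SkippingDeserializer` is a made-up name and `Map.Entry` stands in for `Tuple2`. In an actual job this method body would sit in a class implementing `DeserializationSchema<Tuple2<String,Integer>>`, which also needs `isEndOfStream` and `getProducedType`; those are left out here.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

final class SkippingDeserializer {

    /**
     * Same shape as DeserializationSchema#deserialize(byte[]): returns the parsed
     * record, or null when the bytes are not a valid "key,int" line.
     */
    static Map.Entry<String, Integer> deserialize(byte[] message) {
        if (message == null) {
            return null;
        }
        String line = new String(message, StandardCharsets.UTF_8);
        String[] tokens = line.toLowerCase().split(",");
        if (tokens.length < 2) {
            return null; // no value field: skip
        }
        try {
            return new SimpleEntry<String, Integer>(tokens[0], Integer.parseInt(tokens[1]));
        } catch (NumberFormatException e) {
            return null; // corrupted value: signal "skip this record"
        }
    }
}
```

Since the schema would then produce the parsed tuple itself, the `NumberMapper` step in the original job would no longer be needed.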

Hope this helps!

Cheers,
Gordon

On 20 June 2018 at 3:59:47 PM, Kien Truong ([hidden email]) wrote:

> Hi,
>
> You can use FlatMap instead of Map, and only collect valid elements.
>
> Regards,
> Kien