Data loss when restoring from savepoint

Juho Autio
Some data is silently lost in my Flink streaming job when its state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with a savepoint and restore from that savepoint, some data is missed. Only a small amount seems to be lost, and the event time of the lost data is probably close to the time of the savepoint. In other words, the rest of the time window is not missed entirely: collection also works correctly for (most of) the events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But with production volumes the job seems to consistently miss at least something on every restore.

This has happened consistently since the job was first created. It initially ran on an older Flink 1.5-SNAPSHOT build, and the problem still occurs on both Flink 1.5.2 and 1.6.0.

I'm wondering whether this could be, for example, a synchronization issue between the Kafka consumer offsets and what has been written by BucketingSink.

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // Keep the first record seen for each key and discard later ones.
        return value1;
    }
}
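
For readers less familiar with keyed reduce, here is a minimal plain-Java sketch of what a reduce that always returns value1 amounts to: one surviving record per key. It does not use Flink at all; the class name KeyedDistinctSketch and the (key, value) string pairs are hypothetical stand-ins for the keyed stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class KeyedDistinctSketch {
    /** Keeps the first value seen per key, like a reduce that always returns value1. */
    static List<String> distinctByKey(List<String[]> keyedRecords) {
        Map<String, String> firstPerKey = new LinkedHashMap<>();
        for (String[] keyAndValue : keyedRecords) {
            // merge() stores the value for a new key; for an existing key it
            // applies the function, which here keeps the already stored value.
            firstPerKey.merge(keyAndValue[0], keyAndValue[1], (value1, value2) -> value1);
        }
        return new ArrayList<>(firstPerKey.values());
    }

    public static void main(String[] args) {
        List<String[]> records = Arrays.asList(
                new String[] {"k1", "first"},
                new String[] {"k2", "only"},
                new String[] {"k1", "second"});
        System.out.println(distinctByKey(records)); // prints [first, only]
    }
}
```

Under this reading, a key can only be absent from the output if none of its records ever reached the reduce step for that window.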

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink. I don't think there's anything special here, other than the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there. If something were, it could explain the data missing from the main output. (I'm also sure that our late-data writing works, because we previously had some actual late data which ended up there.)
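
As a rough illustration of the event-time rules involved, the sketch below reproduces in plain Java (no Flink dependency) how a bounded-out-of-orderness watermark and the lateness check for a tumbling window are commonly described: the watermark trails the highest timestamp seen by a fixed bound, and an element counts as late once the watermark has reached the last timestamp of its window plus the allowed lateness. The class and method names are hypothetical, and the formulas are a simplified reading of Flink's behaviour rather than its actual code.

```java
final class WatermarkSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    /** Watermark: highest timestamp seen so far minus the out-of-orderness bound. */
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /** True once the watermark has reached the window's last timestamp plus the allowed lateness. */
    static boolean isLate(long eventTimestampMs, long currentWatermarkMs,
                          long windowSizeMs, long allowedLatenessMs) {
        long windowMaxTimestamp = windowStart(eventTimestampMs, windowSizeMs) + windowSizeMs - 1;
        return windowMaxTimestamp + allowedLatenessMs <= currentWatermarkMs;
    }

    public static void main(String[] args) {
        System.out.println(windowStart(DAY_MS + 5, DAY_MS)); // prints 86400000
        // An event from day 0 arriving when the watermark is already 1 ms into day 1:
        System.out.println(isLate(10, DAY_MS + 1, DAY_MS, 0));      // prints true
        System.out.println(isLate(10, DAY_MS + 1, DAY_MS, 60_000)); // prints false
    }
}
```

With these rules, an element that is dropped as late should show up in the late-data side output, which is why an empty side output makes lateness an unlikely explanation.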

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness of 1 minute on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely. That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be ordered well enough not to be late, given the max out-of-orderness used in the Kafka consumer's timestamp extractor.

Thank you in advance!

Re: Data loss when restoring from savepoint

Juho Autio
I realized that BucketingSink cannot play any role in this problem, because it only gets a burst of input when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meantime, please let me know if you have any suggestions for debugging the lost data, for example which logs to enable.

We use FlinkKafkaConsumer010, btw. Are there any known issues with it that could contribute to lost data when restoring from a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:

Re: Data loss when restoring from savepoint

Juho Autio
I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:

Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi Juho,

Where exactly does the data go missing? When do you notice it?
Do you check it:
- by debugging `DistinctFunction.reduce` right after the resume in the middle of the day,
or
- by finding that some distinct records are missing in the final output of BucketingSink in S3, after the window result is actually triggered and saved to S3 at the end of the day? Is this the main output?

The late data around the time the savepoint is taken might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. It should then just arrive after the restore and be reduced, within the allowed lateness, into the final result that is saved to S3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window to S3? If so, what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:


Re: Data loss when restoring from savepoint

Juho Autio
Thanks for your answer!

I check for the missing data in the final output on S3: I wait until the next day, run the same logic re-implemented as a batch job, and compare the outputs.
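
A comparison of this kind could look roughly like the plain-Java sketch below. It assumes both outputs have already been loaded and reduced to sets of record identifiers; the class name OutputDiff and the sample ids are hypothetical and are not taken from the actual job.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

final class OutputDiff {
    /** Returns the ids present in the batch output but absent from the streaming output. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds); // copy, so the inputs stay untouched
        missing.removeAll(streamIds);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> batch = new TreeSet<>(Arrays.asList("a", "b", "c", "d"));
        Set<String> stream = new TreeSet<>(Arrays.asList("a", "c"));
        System.out.println(missingFromStream(batch, stream)); // prints [b, d]
    }
}
```

Looking up the Kafka offsets and event times of the ids returned here is one way to check whether the losses cluster around the savepoint.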

> The late data around the time the savepoint is taken might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> It should then just arrive after the restore and be reduced, within the allowed lateness, into the final result that is saved to S3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window to S3? If so, what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        // Build a tuple with one position per configured field; missing fields become "".
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example of how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())
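
One property of the key selector above that is easy to overlook: because of getOrDefault(field, ""), an event that lacks a field gets the same key as an event that carries that field with an empty value. The plain-Java sketch below illustrates this with a List<String> standing in for the Flink tuple; the class name CompositeKeySketch and the sample events are hypothetical, and nothing here suggests that this is the cause of the missing data.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompositeKeySketch {
    /** Builds a composite key from the named fields; missing fields become "". */
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>(fields.length);
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    public static void main(String[] args) {
        Map<String, String> withoutPlayer = new HashMap<>();
        withoutPlayer.put("ID", "42");

        Map<String, String> emptyPlayer = new HashMap<>();
        emptyPlayer.put("ID", "42");
        emptyPlayer.put("PLAYER_ID", "");

        // Both events map to the same key, so a distinct-by-key job keeps only one of them.
        System.out.println(keyOf(withoutPlayer, "ID", "PLAYER_ID")
                .equals(keyOf(emptyPlayer, "ID", "PLAYER_ID"))); // prints true
    }
}
```

If the batch re-implementation treats missing and empty fields differently, the two outputs could disagree for that reason alone, so it may be worth confirming that both sides build keys the same way.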

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:

Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi Juho,

So it is a per-key deduplication job.

Yes, I would wait and check the actual output in S3, because it is the main result of the job, and

> The late data around the time the savepoint is taken might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

is not a bug; it is possible behaviour.

The savepoint is a snapshot of the data in transit that has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and whatever comes after the savepointed offset in Kafka but before the window result is written to S3.

Allowed lateness should not affect it; I am just saying that the final result in S3 should include all records after it.
This is what should be guaranteed, not the contents of the intermediate savepoint.
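
The expected guarantee can be sketched in plain Java as a toy model: a list stands in for the Kafka partition, a set stands in for the window state, and a savepoint is a copy of that state together with the next offset to read. All names here (SavepointReplaySketch, Snapshot, runWithRestore) are hypothetical; this is a simplified model of the idea, not how Flink implements it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

final class SavepointReplaySketch {
    /** Copy of the in-flight window state plus the source offset it corresponds to. */
    static final class Snapshot {
        final Set<String> windowState;
        final int nextOffset;

        Snapshot(Set<String> windowState, int nextOffset) {
            this.windowState = new TreeSet<>(windowState);
            this.nextOffset = nextOffset;
        }
    }

    /** Consumes log[from, to) into the given state and returns that state. */
    static Set<String> consume(List<String> log, int from, int to, Set<String> state) {
        for (int i = from; i < to; i++) {
            state.add(log.get(i));
        }
        return state;
    }

    /** Runs up to savepointOffset, takes a snapshot, then restores and finishes the log. */
    static Set<String> runWithRestore(List<String> log, int savepointOffset) {
        Set<String> beforeCancel = consume(log, 0, savepointOffset, new TreeSet<>());
        Snapshot savepoint = new Snapshot(beforeCancel, savepointOffset);
        // Restore: start from the snapshotted state and resume at the snapshotted offset.
        Set<String> restored = new TreeSet<>(savepoint.windowState);
        return consume(log, savepoint.nextOffset, log.size(), restored);
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "a", "c", "d");
        Set<String> uninterrupted = consume(log, 0, log.size(), new TreeSet<>());
        System.out.println(runWithRestore(log, 2).equals(uninterrupted)); // prints true
    }
}
```

In this model the final result is the same wherever the savepoint falls, as long as the restored state and the restored offset belong to the same snapshot. Records can only go missing if the job resumes from an offset later than the one the state corresponds to, or if part of the state is not restored.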

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data miss? When do you notice that? 
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day 
or 
- some distinct records miss in the final output of BucketingSink in s3 after window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink cannot play any role in this problem. BucketingSink only gets a burst of input when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meantime, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)
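
For reference, the settings listed above roughly correspond to the following calls on Flink's `CheckpointConfig`. This is only a sketch assuming the Flink 1.5/1.6 DataStream API; the class name `CheckpointSettingsSketch` and the `configure` method are placeholders, not the original job's code, and it needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {

    // Applies the checkpoint settings shown in the list above to an environment.
    static void configure(StreamExecutionEnvironment env) {
        // Interval 1m, exactly-once mode.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointTimeout(600_000L);          // Timeout 10m
        config.setMinPauseBetweenCheckpoints(60_000L);  // Minimum pause 1m
        config.setMaxConcurrentCheckpoints(1);          // One checkpoint at a time
        // Keep externalized checkpoints when the job is cancelled.
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```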

3. BucketingSink configuration

We use BucketingSink. I don't think there's anything special here, apart from the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there. If something were, it could explain the missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
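
As a rough illustration of the event-time mechanics mentioned here, the following plain-Java sketch models a bounded out-of-orderness watermark per partition, the minimum taken across partitions, and the rule by which a window counts as late. It assumes the usual Flink semantics (watermark = highest timestamp seen minus the out-of-orderness bound; a window is late once the watermark reaches its last timestamp plus the allowed lateness). The class name and the 10-second bound are assumptions for the example, not values from the job.

```java
import java.util.Arrays;

public class WatermarkSketch {

    // Bounded out-of-orderness: the watermark trails the highest timestamp seen.
    public static long partitionWatermark(long maxTimestampSeen, long maxOutOfOrdernessMs) {
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }

    // With per-partition watermarks, the slowest partition determines the result.
    public static long combinedWatermark(long... partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // A window is late once the watermark has reached its last
    // timestamp (end - 1) plus the allowed lateness.
    public static boolean isWindowLate(long windowEnd, long allowedLatenessMs, long watermark) {
        return (windowEnd - 1) + allowedLatenessMs <= watermark;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        long minute = 60_000L;
        long outOfOrderness = 10_000L; // assumed value, not from the original job

        long watermark = combinedWatermark(
                partitionWatermark(day + 20_000L, outOfOrderness),
                partitionWatermark(day + 30_000L, outOfOrderness));

        // The watermark is 10 seconds past the end of the first daily window.
        System.out.println(isWindowLate(day, 0L, watermark));     // prints true
        System.out.println(isWindowLate(day, minute, watermark)); // prints false
    }
}
```

Under these rules an element for an already-late window would go to the late-data side output, so an empty side output suggests the missing records were not dropped for lateness.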

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out-of-orderness used on the Kafka consumer timestamp extractor.

Thank you in advance!






Re: Data loss when restoring from savepoint

Juho Autio
Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There always seems to be some data loss with the production data volumes if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit that has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it; I am just saying that the final result in s3 should include all records after it.
This is what should be guaranteed, but not the contents of the intermediate savepoint.
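
The guarantee described above can be sketched in plain Java as a toy model: state is snapshotted together with a source offset, anything processed after the snapshot is discarded on cancel, and the restored job replays from the stored offset. The class and method names are hypothetical and the "state" is just a set of distinct keys; this illustrates the expected behaviour, not how Flink implements it.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SavepointReplaySketch {

    // Consumes events[from, to) into the given distinct-key state.
    public static void consume(List<String> events, int from, int to, Set<String> state) {
        for (int i = from; i < to; i++) {
            state.add(events.get(i));
        }
    }

    // Simulates a savepoint at savepointOffset, a cancel at cancelOffset
    // (savepointOffset <= cancelOffset), then a restore that reads to the end.
    public static Set<String> runWithRestore(List<String> events, int savepointOffset, int cancelOffset) {
        Set<String> live = new LinkedHashSet<>();
        consume(events, 0, savepointOffset, live);
        Set<String> snapshot = new LinkedHashSet<>(live);     // state stored in the savepoint
        consume(events, savepointOffset, cancelOffset, live); // work after the snapshot, lost on cancel

        Set<String> restored = new LinkedHashSet<>(snapshot);
        consume(events, savepointOffset, events.size(), restored); // replay from the stored offset
        return restored;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "a", "c", "d", "b", "e");
        Set<String> uninterrupted = new LinkedHashSet<>();
        consume(events, 0, events.size(), uninterrupted);
        // The restored run should end with the same distinct set; prints true.
        System.out.println(uninterrupted.equals(runWithRestore(events, 3, 5)));
    }
}
```

If the real job's final output differs from the uninterrupted result, either the restored state or the restored offset does not match this model, which narrows down where to look.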

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
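
The comparison step described here can be sketched as a simple set difference. This is a minimal illustration assuming each output record has been reduced to a string form of its distinct key; the class name and the sample keys are made up.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class OutputDiffSketch {

    // Keys present in the batch (reference) output but absent from the streaming output.
    public static Set<String> missingFromStream(Set<String> batchKeys, Set<String> streamKeys) {
        Set<String> missing = new TreeSet<>(batchKeys);
        missing.removeAll(streamKeys);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> batch = new TreeSet<>(Arrays.asList("k1", "k2", "k3", "k4"));
        Set<String> stream = new TreeSet<>(Arrays.asList("k1", "k3"));
        System.out.println(missingFromStream(batch, stream)); // prints [k2, k4]
    }
}
```

Looking up the event timestamps of the missing keys and comparing them with the savepoint time would show whether the loss really clusters around the restore.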

> The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.








Re: Data loss when restoring from savepoint

Andrey Zagrebin
Ok, I think before further debugging the window reduced state,
could you try the new `StreamingFileSink` [1] introduced in Flink 1.6.0 instead of the previous `BucketingSink`?

Cheers,
Andrey










Re: Data loss when restoring from savepoint

Juho Autio
Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a Kafka sink instead, but I can't imagine how there could be any difference. The sink really doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because the distinct values don't amount to that much data in the end.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8))

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks accross all kafka partitions. We also write late data to side output, but nothing is written there – if it would, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).

5. allowedLateness

It may be or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink doesn't have a bug that's related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out of orderness used on kafka consumer timestamp extractor.

Thank you in advance!








Reply | Threaded
Open this post in threaded view
|

Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi,

True, StreamingFileSink does not support S3 in 1.6.0; support is planned for the 1.7 release. Sorry for the confusion.
The old BucketingSink has a general problem with S3. Internally, BucketingSink queries S3 as a file system to list the already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of file existence checks in S3 [1], BucketingSink can overwrite a previously written part and effectively lose it. This should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on S3 as a file system.
I am also including Kostas; he might add more details.

To keep this S3 problem in mind and rule it out for sure, I would also check whether the total size of the missing events is around the batch size of BucketingSink. You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would check it as well.
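
One way to make the 'probably' measurable is to diff the batch result against the streaming output and count how many of the missing keys have event times close to the savepoint. The sketch below is only an illustration in plain Java; the class name `MissingNearSavepoint`, the key-to-timestamp map and the tolerance parameter are all assumptions, not part of the job described in this thread.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical diagnostic: how many missing keys have event times near the savepoint?
class MissingNearSavepoint {
    // expected: key -> event timestamp (ms), taken from the batch re-computation.
    // actual: keys found in the streaming output.
    // Returns {total missing, missing within +/- toleranceMs of savepointMs}.
    static long[] count(Map<String, Long> expected, Set<String> actual,
                        long savepointMs, long toleranceMs) {
        long missing = 0;
        long near = 0;
        for (Map.Entry<String, Long> e : expected.entrySet()) {
            if (actual.contains(e.getKey())) {
                continue; // present in the streaming output, nothing to report
            }
            missing++;
            if (Math.abs(e.getValue() - savepointMs) <= toleranceMs) {
                near++;
            }
        }
        return new long[] {missing, near};
    }
}
```

If nearly all missing keys fall inside a small tolerance around the savepoint, that points at the savepoint/restore path; if they are spread over the whole day, the sink becomes the more likely suspect.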

Have you already checked the log files of the job manager and task managers for the job runs before and after the restore from the checkpoint? Is everything successful there, with no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are processed before or after the savepoint. The following log message marks the border between the events that should be included in the savepoint (logged before it) and those that should not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check whether the savepoint has completed overall:
"{} ({}, asynchronous part) in thread {} took {} ms."

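A minimal sketch of such logging, written against plain Java so that it runs without a Flink dependency. In the real job the same body would go into `DistinctFunction.reduce`; the class name `LoggingDistinct`, the logger name and the message format are assumptions made for illustration.

```java
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical stand-in for DistinctFunction: same reduce logic, plus logging.
class LoggingDistinct {
    private static final Logger LOG = Logger.getLogger("LoggingDistinct");

    // Keeps the first record seen for a key, as DistinctFunction does, and logs
    // both inputs with a wall-clock time so the entries can be lined up against
    // the savepoint log messages.
    static Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.info(() -> "reduce kept=" + value1 + " dropped=" + value2
                + " at=" + System.currentTimeMillis());
        return value1;
    }
}
```

As far as I understand, a reduce function is only invoked once a second record arrives for a key, so keys that occur exactly once in the window would not show up in this log; logging in the preceding flatMap may be needed to cover those.
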
Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
OK, I think that before further debugging the window's reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit that has already been consumed from Kafka.
Basically, the full content of the window result is split between the savepoint and whatever comes after the savepointed offset in Kafka but before the window result is written to S3.

Allowed lateness should not affect it; I am just saying that the final result in S3 should include all records after it.
This is what should be guaranteed, not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly is the data missing? When do you notice that?
Do you check it by:
- debugging `DistinctFunction.reduce` right after the resume in the middle of the day
or
- finding that some distinct records are missing in the final output of BucketingSink in S3, after the window result is actually triggered and saved into S3 at the end of the day? Is this the main output?

The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. It should then just arrive after the restore and be reduced, within the allowed lateness, into the final result that is saved into S3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in S3? Then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because BucketingSink gets a burst of input only when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

Re: Data loss when restoring from savepoint

Juho Autio
Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable suspect first. So what do you think about this, true or false: only when the 24-hour window triggers does BucketingSink get a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally misunderstood how Flink triggers window results? I would not expect there to be any optimization that speculatively emits early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
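
The three steps listed above could be sketched roughly as follows. This is plain Java with no S3 client involved: `visibleParts` stands in for the result of a LIST call and `exists` for a HEAD request, and both names (like the class `PartIndexChooser`) are hypothetical.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical helper: picks the next part index from what the store reports.
class PartIndexChooser {
    // visibleParts: part indices returned by a listing (possibly stale).
    // exists: stands in for a HEAD request on a single part index.
    static int nextIndex(Set<Integer> visibleParts, IntPredicate exists) {
        // Steps 1 and 2: take the highest listed index and add one.
        int candidate = visibleParts.isEmpty() ? 0 : Collections.max(visibleParts) + 1;
        // Step 3: keep probing until an index is reported as free.
        while (exists.test(candidate)) {
            candidate++;
        }
        return candidate;
    }
}
```

The result is only as good as the answers from the listing and the existence check: if both are stale for a freshly written part, the returned index can still collide with it.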

Cheers,
Juho


Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true; my understanding is the same. We cannot rule out a problem there for sure, but savepoints are used a lot without problem reports, while BucketingSink is known to be problematic with S3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

That said, BucketingSink might lose any data at the end of the day (also data from the middle of the day). The fact that the loss is always around the time of taking a savepoint, and not random, is certainly suspicious, and possible savepoint failures need to be investigated.

Regarding the S3 problem, the S3 documentation says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it may not yet appear in the listing, or the existence check (HEAD) may return false, so you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. S3 is currently used through a file system wrapper but does not always provide normal file system guarantees. See also the last example in [1].
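
To illustrate the overwrite risk, here is a toy model in plain Java. It is not the BucketingSink code and does not talk to S3; the class `StaleStore`, its methods and the `part-N` key naming are assumptions made up for this sketch. Writes take effect immediately, but existence checks only see keys after an explicit `propagate()` call, which stands in for the consistency delay.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of an eventually consistent store (hypothetical, for illustration).
class StaleStore {
    final Map<String, String> objects = new HashMap<>();
    final Set<String> visible = new HashSet<>();

    void put(String key, String body) { objects.put(key, body); }  // silently overwrites
    void propagate() { visible.addAll(objects.keySet()); }          // checks catch up
    boolean exists(String key) { return visible.contains(key); }    // may be stale

    // Probe for the first part index whose key does not appear to exist.
    String nextPartKey() {
        int i = 0;
        while (exists("part-" + i)) {
            i++;
        }
        return "part-" + i;
    }
}

class StaleStoreDemo {
    public static void main(String[] args) {
        StaleStore store = new StaleStore();
        store.put(store.nextPartKey(), "batch A"); // written as part-0
        // No propagate() yet, so the existence check still misses part-0.
        store.put(store.nextPartKey(), "batch B"); // picks part-0 again
        System.out.println(store.objects);         // batch A has been replaced
    }
}
```

A sink that records the indices it has already used in its own checkpointed state would not depend on the store's answers at all, which is the approach described for StreamingFileSink earlier in this thread.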

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable cause first. So what do you think about this, true or false: only when the 24-hour window triggers does BucketingSink get a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

True, StreamingFileSink does not support S3 in 1.6.0; that is planned for the upcoming 1.7 release. Sorry for the confusion.
The old BucketingSink has a general problem with S3. Internally, BucketingSink queries S3 as a file system to list the already written file parts (batches) and determine the index of the next part to start. Because checking file existence in S3 is only eventually consistent [1], BucketingSink can overwrite a previously written part and effectively lose it. This should be fixed by StreamingFileSink in 1.7, where Flink keeps its own track of the written parts and does not rely on S3 as a file system.
I am also including Kostas, who might add more details.

To keep this S3 problem in mind and rule it out for sure, I would also check whether the size of the missing events is around the batch size of BucketingSink. You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would check it as well.

Have you already checked the log files of the job manager and task managers for the job runs before and after the restore from the checkpoint? Is everything successful there, with no errors, relevant warnings, or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is feasible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message marks the border between the events that should be included in the savepoint (logged before it) and those that should not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check whether the savepoint as a whole completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

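The before/after check described above can be sketched as a small log filter. This is only an illustration under assumptions: the class and method names are hypothetical, every matching line is assumed to start with a log4j timestamp of the form `2018-09-18 09:14:29,085`, and the first "synchronous part" line in the input is taken as the border (with periodic checkpoints you would pick the marker that belongs to the savepoint in question).

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

class SavepointLogSplitter {
    // Assumed timestamp layout at the start of each log line, e.g. "2018-09-18 09:14:29,085".
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // The leading comma is deliberate: "asynchronous part" contains "synchronous part"
    // as a substring, but ", asynchronous part)" does not contain ", synchronous part)".
    private static final String SYNC_MARKER = ", synchronous part)";

    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), TS);
    }

    /** Returns the lines containing needle that were logged strictly before the first sync marker. */
    static List<String> loggedBeforeSnapshot(List<String> logLines, String needle) {
        LocalDateTime border = null;
        for (String line : logLines) {
            if (line.contains(SYNC_MARKER)) {
                border = timestampOf(line);
                break;
            }
        }
        if (border == null) {
            throw new IllegalArgumentException("no synchronous-part marker found");
        }
        List<String> result = new ArrayList<>();
        for (String line : logLines) {
            // Timestamps are parsed only for lines that mention the needle.
            if (line.contains(needle)
                    && !line.contains(SYNC_MARKER)
                    && timestampOf(line).isBefore(border)) {
                result.add(line);
            }
        }
        return result;
    }
}
```

Comparing timestamps rather than line positions keeps the filter usable when the lines come from several task manager log files concatenated in arbitrary order.
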
Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path, only to see an exception being thrown. In the source code I then indeed found an explicit check that the target path scheme is "hdfs://".

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
OK, I think that before debugging the reduced window state any further, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the in-flight data that has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and whatever arrives after the savepointed Kafka offset but before the window result is written to S3.

Allowed lateness should not affect it; I am just saying that the final result in S3 should include all records after it.
This is what should be guaranteed, not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

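import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    // Names of the map entries that together form the key
    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        // Build a TupleN whose arity equals the number of key fields
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            // Missing fields are keyed as the empty string
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}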

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly is the data missing? When do you notice that?
Do you check it by:
- debugging `DistinctFunction.reduce` right after resuming in the middle of the day
or
- finding that some distinct records are missing in the final output of BucketingSink in S3, after the window result is actually triggered and saved into S3 at the end of the day? Is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                // placeholder arguments; the real job passes field names as strings
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

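import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // Keep the first record seen for the key; later duplicates are dropped.
        return value1;
    }
}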
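
For readers unfamiliar with keyed reduce, the effect of this reducer within one window can be imitated in plain Java. This is a sketch of the semantics only, without Flink: `KeyedReduceSketch` and `reducePerKey` are hypothetical names, and a `LinkedHashMap` stands in for the per-key window state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class KeyedReduceSketch {
    /**
     * Folds events per key: the first event of a key becomes the state, and each
     * later event is combined with the state via reducer(state, event).
     */
    static <K, V> List<V> reducePerKey(List<V> events,
                                       Function<V, K> keySelector,
                                       BinaryOperator<V> reducer) {
        Map<K, V> state = new LinkedHashMap<>();
        for (V event : events) {
            state.merge(keySelector.apply(event), event, reducer);
        }
        // One result per distinct key, in order of first appearance.
        return new ArrayList<>(state.values());
    }
}
```

With a reducer of `(value1, value2) -> value1`, the result holds exactly one record per distinct key, namely the first one seen, which is why the job's output size equals the number of distinct keys in the window.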

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink. I don't think there's anything special here, other than the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there. If anything were, it could explain the missed data in the main output. (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there.)

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely. That would just be to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in good enough order not to be late, given the max out-of-orderness configured on the Kafka consumer's timestamp extractor.
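
As a rough illustration of when an element would count as late and be sent to the side output, the arithmetic can be written down in plain Java. This is a sketch under assumptions, not Flink code: the class and method names are hypothetical, windows are assumed to be aligned to the epoch with no offset, and the rules reflect my understanding of event-time tumbling windows (watermark = highest timestamp seen minus the out-of-orderness bound; a window is dropped once the watermark reaches its last timestamp plus the allowed lateness).

```java
class LatenessSketch {
    /** Watermark of a bounded-out-of-orderness extractor after seeing maxSeenTimestamp. */
    static long watermark(long maxSeenTimestamp, long maxOutOfOrdernessMs) {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    /** Start of the tumbling window that contains the timestamp (no offset assumed). */
    static long windowStart(long timestamp, long windowSizeMs) {
        return timestamp - Math.floorMod(timestamp, windowSizeMs);
    }

    /** True if the element's window has already been cleaned up at the given watermark. */
    static boolean isLate(long eventTimestamp, long windowSizeMs,
                          long allowedLatenessMs, long currentWatermark) {
        long windowEnd = windowStart(eventTimestamp, windowSizeMs) + windowSizeMs;
        long lastTimestampInWindow = windowEnd - 1;
        long cleanupTime = lastTimestampInWindow + allowedLatenessMs;
        return cleanupTime <= currentWatermark;
    }
}
```

Under these rules, an element of a 24-hour window is only treated as late after the watermark has passed the end of that day (plus the allowed lateness), so events around a mid-day savepoint should not be dropped for lateness.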

Thank you in advance!












Re: Data loss when restoring from savepoint

Juho Autio
Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I could only check whether some of the missing ids appear in the DEBUG logs, and indeed they did.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until the job had caught up with the Kafka offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.
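
The comparison itself amounts to a set difference between the ids in the batch output and the ids in the stream output. A minimal sketch, assuming both outputs have already been loaded as collections of id strings (`OutputDiff` and `missingFromStream` are hypothetical names, not part of the actual job):

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

class OutputDiff {
    /** Returns the ids present in the batch output but absent from the stream output, sorted. */
    static Set<String> missingFromStream(Collection<String> batchIds,
                                         Collection<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds);
        missing.removeAll(streamIds);
        return missing;
    }
}
```

A sorted result makes it easy to pick individual ids and look them up in the DEBUG logs afterwards.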

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next), interleaved with more occurrences of DistinctFunction.reduce)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

Note that there was high backpressure after restoring from the savepoint until the stream caught up with the Kafka offsets. However, our job assigns timestamps & watermarks on the Flink Kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude problem there for sure, just savepoints are used a lot w/o problem reports and BucketingSink is known to be problematic with s3. That is why, I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, bucketing sink might loose any data at the end of the day (also from the middle). The fact, that it is always around the time of taking a savepoint and not random, is surely suspicious and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is how it is roughly implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency’ means that even if you just created file (its name is key) it can be that you do not get it in the list or exists (HEAD) returns false and you risk to rewrite the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketinSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for confusion.
The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system 
to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its own track of written parts and does not rely on s3 as a file system. 
I also include Kostas, he might add more details. 

Just to keep in mind this problem with s3 and exclude it for sure  I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Have you already checked the log files of job manager and task managers for the job running before and after the restore from the check point? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in DistinctFunction.reduce if possible for production data and check whether the missed events are eventually processed before or after the savepoint. The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:
“{} ({}, synchronous part) in thread {} took {} ms” (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state, 
could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink’?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transient which is already consumed from Kafka.
Basically the full contents of the window result is split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. 

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data miss? When do you notice that? 
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day 
or 
- some distinct records miss in the final output of BucketingSink in s3 after window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketinSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8))

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink. I don't think there's anything special here, other than the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there; if something were, it could explain the missing data in the main output. (I'm also sure that our late-data writing works, because we previously had some actual late data that ended up there.)
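To make the lateness reasoning concrete, here is a minimal plain-Java sketch (no Flink dependency) of how a bounded-out-of-orderness watermark and an allowed lateness decide whether an element would be treated as late for a daily window. The class and method names are hypothetical, and the rule reflects my understanding of Flink's event-time windowing rather than code taken from the job.

```java
final class LatenessSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** The watermark trails the highest timestamp seen so far by the out-of-orderness bound. */
    static long watermark(long maxSeenTimestamp, long maxOutOfOrdernessMs) {
        return maxSeenTimestamp - maxOutOfOrdernessMs;
    }

    /** Exclusive end of the daily tumbling window that contains a non-negative epoch-millis timestamp. */
    static long windowEnd(long timestamp) {
        return timestamp - (timestamp % DAY_MS) + DAY_MS;
    }

    /** An element is late once the watermark has reached its window's last timestamp plus the allowed lateness. */
    static boolean isLate(long timestamp, long watermark, long allowedLatenessMs) {
        long lastWindowTimestamp = windowEnd(timestamp) - 1;
        return lastWindowTimestamp + allowedLatenessMs <= watermark;
    }
}
```

Under this model, an element can only be dropped as late if the watermark has already passed the end of its 24-hour window, which is consistent with the empty late-data side output described above.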

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely? That would just be to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in good enough order not to be late, given the max out-of-orderness used in the Kafka consumer's timestamp extractor.

Thank you in advance!

Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after the reduce and before the sink.
The map function would be called right after the window is triggered but before the result is flushed to S3.
The result of the reduce (the deduplicated record) could be logged there.
This should let us check whether the processed distinct records were buffered in the state after the restoration from the savepoint. If they were buffered, we should see an attempt to write them from the state to the sink.

Another suggestion is to write the records to some other sink, or to both sinks.
E.g. if you can access the file system of the workers, you could write to local files and check whether the records are also dropped there.
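The pass-through idea can be sketched as follows. In the job this logic would presumably live in a Flink map function placed between the reduce and the sink; the sketch below uses `java.util.function.Function` instead so that it runs without Flink on the classpath, and `LoggingTap` is a hypothetical name. It records each element it sees and returns it unchanged, so the downstream output is not affected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Pass-through step: remembers every record it sees and forwards it unchanged. */
final class LoggingTap<T> implements Function<T, T> {
    private final List<T> seen = new ArrayList<>();

    @Override
    public T apply(T record) {
        // In a real job this would be a log statement rather than an in-memory list.
        seen.add(record);
        return record;
    }

    /** Records observed so far, in arrival order. */
    List<T> seen() {
        return seen;
    }
}
```

If a missing id shows up in the tap's log but not in the sink output, the loss happened in the sink; if it never reaches the tap, the loss happened in the window state or earlier.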

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

I say "at least some" because I didn't have the job running with DEBUG logs for the full 24-hour window period, so I could only check whether some of the missing ids appear in the DEBUG logs. They do.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.
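The comparison itself is a set difference between the two outputs. A minimal sketch, assuming the ids from each output have already been loaded into sets; `OutputDiff` and `missingFromStream` are hypothetical names, not part of the job.

```java
import java.util.Set;
import java.util.TreeSet;

final class OutputDiff {
    /** Ids present in the batch output but absent from the stream output, sorted for stable reporting. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds);
        missing.removeAll(streamIds);
        return missing;
    }
}
```

Running the same difference in the other direction (stream minus batch) is a cheap sanity check that the batch job is a valid reference.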

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled
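To place a reducer log line relative to a checkpoint, the log4j timestamps can be compared directly. This is a minimal plain-Java sketch, assuming both lines come from the same task manager log and start with a 23-character `yyyy-MM-dd HH:mm:ss,SSS` timestamp as in the DEBUG line above; `LogTimeline` and its methods are hypothetical helper names.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class LogTimeline {
    private static final DateTimeFormatter LOG4J_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Parses the leading timestamp of a log line such as "2018-09-18 09:14:29,085 DEBUG ...". */
    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG4J_TS);
    }

    /** True if the event line was logged strictly before the checkpoint line. */
    static boolean loggedBefore(String eventLine, String checkpointLine) {
        return timestampOf(eventLine).isBefore(timestampOf(checkpointLine));
    }
}
```

Timestamps from different machines are only comparable up to clock skew, so a result that is within a few milliseconds of the checkpoint should be treated as inconclusive.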

Note that there was high backpressure after restoring from the savepoint until the stream caught up with the Kafka offsets. However, our job assigns timestamps & watermarks on the Flink Kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot rule out a problem there for sure; it's just that savepoints are used a lot without problem reports, and BucketingSink is known to be problematic with S3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Admittedly, the bucketing sink might lose any data at the end of the day (including data from the middle of the day). The fact that the lost data is always around the time of taking a savepoint, and not random, is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it may not show up in the listing, or the existence check (HEAD) may return false, so you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable suspect first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
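The overwrite risk discussed here can be illustrated with a toy model. This is a simplified sketch of the "probe for the next free part index" idea against a store whose existence checks may lag behind writes; it is not BucketingSink's actual code, and `PartIndexSketch`, `notYetVisible` and the `part-` prefix are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

final class PartIndexSketch {
    /** Keys that have actually been written. */
    final Set<String> stored = new HashSet<>();
    /** Written keys that LIST/HEAD do not report yet (toy model of eventual consistency). */
    final Set<String> notYetVisible = new HashSet<>();

    /** What an existence check would answer for this key. */
    boolean visible(String key) {
        return stored.contains(key) && !notYetVisible.contains(key);
    }

    /** Probes prefix0, prefix1, ... and returns the first index that appears to be free. */
    int nextPartIndex(String prefix) {
        int index = 0;
        while (visible(prefix + index)) {
            index++;
        }
        return index;
    }

    /** Writes a part and returns true if an existing object was overwritten. */
    boolean write(String key, boolean delayVisibility) {
        boolean overwrote = !stored.add(key);
        if (delayVisibility) {
            notYetVisible.add(key);
        }
        return overwrote;
    }
}
```

If `part-1` is written but not yet visible, the next probe returns index 1 again and the following write replaces the earlier part, which is the failure mode a sink avoids by tracking its own part counter in checkpointed state.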

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

True, StreamingFileSink does not support S3 in 1.6.0; it is planned for the next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with S3. Internally, BucketingSink queries S3 as a file system to list the already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of checking file existence in S3 [1], the BucketingSink can overwrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on S3 as a file system.
I have also included Kostas, he might add more details.

Just to keep this S3 problem in mind and rule it out for sure, I would also check whether the size of the missing events is around the batch size of BucketingSink. You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, with no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message marks the border between the events that should be included in the savepoint (logged before it) and those that should not:
“{} ({}, synchronous part) in thread {} took {} ms” (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
OK, I think before further debugging the window's reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the in-flight data that has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written to S3.

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())
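Taken together, the key selector and the reduce function amount to keeping the first record seen per key within the window. A minimal plain-Java sketch of that semantics, without Flink and ignoring windowing; `DistinctSketch` and `distinctByKey` are hypothetical names, and the key is built from the selected fields with `""` as the default for a missing field, as in MapKeySelector.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DistinctSketch {
    /** Keeps the first event per combination of key fields, like reduce((v1, v2) -> v1) after keyBy. */
    static Map<String, Map<String, String>> distinctByKey(
            List<Map<String, String>> events, String... keyFields) {
        Map<String, Map<String, String>> firstPerKey = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            StringBuilder key = new StringBuilder();
            for (String field : keyFields) {
                // A NUL separator keeps ("ab", "c") distinct from ("a", "bc").
                key.append(event.getOrDefault(field, "")).append('\0');
            }
            firstPerKey.putIfAbsent(key.toString(), event);
        }
        return firstPerKey;
    }
}
```

With this semantics, a key missing from the final output means that no record for that key survived in the window state, not that a duplicate was merged away.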

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly is the data missing? When do you notice that?
Do you check it by:
- debugging `DistinctFunction.reduce` right after the resume in the middle of the day
or
- finding that some distinct records are missing in the final output of BucketingSink in S3, after the window result is actually triggered and saved to S3 at the end of the day? Is this the main output?

The late data around the time of taking the savepoint might not be included in the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved to S3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window to S3? Then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

Re: Data loss when restoring from savepoint

Juho Autio
Thanks, Andrey.

> so it means that the savepoint does not loose at least some dropped records.

I'm not sure what you mean by that. I mean, it was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Based on the new DEBUG logs, though, it seems more like we lose some records that are seen soon after restoring. It seems as if Flink were somehow confused about the restored state vs. new inserts to state. This could also be somehow linked to the high backpressure on the Kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not loose at least some dropped records.

If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink. 
The map function should be called right after window is triggered but before flushing to s3.
The result of reduce (deduped record) could be logged there.
This should allow to check whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try to write records to some other sink or to both. 
E.g. if you can access file system of workers, maybe just into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up if I can find some of the missing ids in the DEBUG logs. Which I did indeed.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

To be noted, there was high backpressure after restoring from savepoint until the stream caught up with the kafka offsets. Although, our job uses assign timestamps & watermarks on the flink kafka consumer itself, so event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it's running without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude problem there for sure, just savepoints are used a lot w/o problem reports and BucketingSink is known to be problematic with s3. That is why, I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, bucketing sink might loose any data at the end of the day (also from the middle). The fact, that it is always around the time of taking a savepoint and not random, is surely suspicious and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is how it is roughly implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency’ means that even if you just created file (its name is key) it can be that you do not get it in the list or exists (HEAD) returns false and you risk to rewrite the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketinSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for confusion.
The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system 
to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its own track of written parts and does not rely on s3 as a file system. 
I also include Kostas, he might add more details. 

Just to keep this s3 problem in mind and rule it out for sure, I would also check whether the size of the missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of the lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would check it as well.

Have you already checked the log files of the job manager and task managers for the job runs before and after the restore from the checkpoint? Is everything successful there, with no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message marks the border between the events that should be included in the savepoint (logged before it) and those that should not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check whether the savepoint completed overall:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state,
could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
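
A minimal sketch of that comparison, assuming both outputs have already been loaded as sets of ids. The `OutputDiff` class and the ids are made up for illustration; how the ids are read from s3 is left out.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: ids present in the batch output but absent from the stream output. */
final class OutputDiff {

    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds); // copy, so the inputs stay untouched
        missing.removeAll(streamIds);
        return missing;
    }

    public static void main(String[] args) {
        // Made-up ids for illustration only.
        Set<String> batch = new TreeSet<>(Arrays.asList("id-1", "id-2", "id-3"));
        Set<String> stream = new TreeSet<>(Arrays.asList("id-1", "id-3"));
        System.out.println(missingFromStream(batch, stream)); // prints [id-2]
    }
}
```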

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        // Build a tuple with one position per key field; missing fields become "".
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data go missing? When do you notice that?
Do you check it by:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day
or
- finding that some distinct records are missing in the final output of BucketingSink in s3, after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // Keep the first record seen for the key and ignore later ones.
        return value1;
    }
}
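
To make the intended semantics concrete, here is a plain-Java stand-in for keyBy + reduce that does not use Flink at all. It is only a sketch: `FirstRecordWins`, the `event` helper and the `seq` field are made up for illustration, and a `List<String>` plays the role of the composite key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for keyBy(...).reduce(new DistinctFunction()) on a finite list. */
final class FirstRecordWins {

    /** Builds the composite key; a missing field is replaced by "". */
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>();
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    /** Per key, keeps the first event seen, like a reduce that always returns value1. */
    static Map<List<String>, Map<String, String>> distinct(
            List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> result = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            result.merge(keyOf(event, fields), event, (value1, value2) -> value1);
        }
        return result;
    }

    /** Test helper; "seq" is a made-up field that records arrival order. */
    static Map<String, String> event(String id, String field, String seq) {
        Map<String, String> e = new HashMap<>();
        e.put("ID", id);
        e.put("FIELD", field);
        e.put("seq", seq);
        return e;
    }

    public static void main(String[] args) {
        List<Map<String, String>> events = Arrays.asList(
                event("a", "x", "1"), event("a", "x", "2"), event("b", "x", "3"));
        Map<List<String>, Map<String, String>> out = distinct(events, "ID", "FIELD");
        System.out.println(out.size());                                // prints 2
        System.out.println(out.get(Arrays.asList("a", "x")).get("seq")); // prints 1
    }
}
```

With this semantics every key seen during the window should produce exactly one output record, so a key that the reducer has processed but that is absent from the output points at lost state rather than at the reduce logic.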

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there. If something were, it could explain the missed data in the main output. (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there.)
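
For reference, a small sketch of the watermark and lateness arithmetic involved, as far as I understand Flink's rules: the bounded-out-of-orderness watermark trails the highest timestamp seen by a fixed bound, and a window counts as late once its last timestamp plus the allowed lateness is at or behind the watermark. The `LatenessCheck` class and the 10-second bound are assumptions made for the example; the actual bound used in the job is not stated here.

```java
/** Hypothetical sketch of bounded-out-of-orderness watermarks and window lateness. */
final class LatenessCheck {

    /** Watermark trails the highest event timestamp seen by the out-of-orderness bound. */
    static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    /** A window [start, end) is late once (end - 1) + allowedLateness <= watermark. */
    static boolean isWindowLate(long windowEndMs, long allowedLatenessMs, long currentWatermarkMs) {
        return (windowEndMs - 1) + allowedLatenessMs <= currentWatermarkMs;
    }

    public static void main(String[] args) {
        long windowEnd = 86_400_000L;      // end of a 24-hour window starting at epoch 0
        long allowedLateness = 60_000L;    // 1 minute, as in the job
        long outOfOrderness = 10_000L;     // assumed bound, for illustration only

        long wmEarly = watermark(86_450_000L, outOfOrderness);
        long wmLater = watermark(86_480_000L, outOfOrderness);
        System.out.println(isWindowLate(windowEnd, allowedLateness, wmEarly)); // prints false
        System.out.println(isWindowLate(windowEnd, allowedLateness, wmLater)); // prints true
    }
}
```

Records for a late window would go to the late-data side output, so an empty side output suggests that lateness is not what drops the records.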

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out-of-orderness used on the Kafka consumer timestamp extractor.

Thank you in advance!

Re: Data loss when restoring from savepoint

Juho Autio
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <[hidden email]> wrote:
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that. It was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it, although based on the new DEBUG logs it seems more like we lose some records that are seen soon after restoring. It looks as if Flink were somehow confused about the restored state vs. new inserts to state. This could also be linked to the high back pressure on the Kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not loose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after the reduce and before the sink.
The map function should be called right after the window is triggered but before flushing to s3.
The result of the reduce (the deduped record) could be logged there.
This should allow us to check whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see an attempt to write them from the state to the sink.

Another suggestion is to try writing records to some other sink, or to both.
E.g. if you can access the file system of the workers, maybe just write into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I could only check whether some of the missing ids appear in the DEBUG logs, and they did.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(... more occurrences of checkpoints, each with ~1 min checkpointing time + ~1 min delay before the next, interleaved with more occurrences of DistinctFunction.reduce ...)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

Note that there was high backpressure after restoring from the savepoint until the stream caught up with the Kafka offsets. However, our job assigns timestamps & watermarks on the Flink Kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.
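
The ordering in the timeline above can also be checked mechanically. The sketch below assumes that every log line starts with a timestamp in the `yyyy-MM-dd HH:mm:ss,SSS` form seen in the DEBUG line quoted earlier; the `SnapshotBorder` class is made up for illustration, and the text after the timestamp of the checkpoint line is elided rather than reproduced.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: compares log lines by their leading timestamp. */
final class SnapshotBorder {

    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    /** Parses the first 23 characters of a log line, e.g. "2018-09-18 09:14:29,085". */
    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, 23), LOG_TIME);
    }

    /** True if the event line was logged before the "synchronous part" line of a snapshot. */
    static boolean loggedBeforeSnapshot(String eventLine, String syncPartLine) {
        return timestampOf(eventLine).isBefore(timestampOf(syncPartLine));
    }

    public static void main(String[] args) {
        String eventLine = "2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction"
                + " - DistinctFunction.reduce returns: s.aid1=AN12345";
        // Only the timestamp matters here; the rest of the real line is elided.
        String syncPartLine = "2018-09-18 09:15:14,264 (synchronous part line, rest elided)";
        System.out.println(loggedBeforeSnapshot(eventLine, syncPartLine)); // prints true
    }
}
```

For the id above this confirms that the first reduce call was logged before the first checkpoint after the restore, so the record should have been part of the checkpointed state from then on.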

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true; my understanding is the same. We cannot rule out a problem there for sure. It is just that savepoints are used a lot without problem reports, while BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

That said, the bucketing sink might lose any data at the end of the day (including data from the middle of the day). The fact that the loss is always around the time of taking a savepoint, and not random, is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it may not show up in the list, or the exists check (HEAD) may return false, and you risk overwriting the previous part.
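
The risk can be shown with a toy model. The sketch below is neither S3's real behaviour nor the code of BucketingSink.openNewPartFile; `StaleExistsDemo` is a made-up in-memory store in which a freshly written key stays invisible to the existence check until `becomeConsistent()` is called.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of a store whose existence check lags behind writes. */
final class StaleExistsDemo {

    private final Map<String, String> stored = new HashMap<>();
    private final Set<String> notYetVisible = new HashSet<>();

    void put(String key, String content) {
        stored.put(key, content);
        notYetVisible.add(key); // the write has happened, but exists() does not see it yet
    }

    boolean exists(String key) {
        return stored.containsKey(key) && !notYetVisible.contains(key);
    }

    void becomeConsistent() {
        notYetVisible.clear();
    }

    String get(String key) {
        return stored.get(key);
    }

    /** Picks the first index for which exists() is false and writes the part there. */
    String writeNextPart(String content) {
        int index = 0;
        while (exists("part-" + index)) {
            index++;
        }
        String key = "part-" + index;
        put(key, content);
        return key;
    }

    public static void main(String[] args) {
        StaleExistsDemo store = new StaleExistsDemo();
        System.out.println(store.writeNextPart("batch A")); // prints part-0
        // The second write happens before part-0 becomes visible, so index 0 is reused.
        System.out.println(store.writeNextPart("batch B")); // prints part-0
        System.out.println(store.get("part-0"));            // prints batch B
    }
}
```

In this model "batch A" is silently replaced; a sink that remembers the last index it used in its own state would not depend on the existence check at all.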

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable cause first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for confusion.
The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system 
to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically loose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its own track of written parts and does not rely on s3 as a file system. 
I also include Kostas, he might add more details. 

Just to keep in mind this problem with s3 and exclude it for sure  I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Have you already checked the log files of job manager and task managers for the job running before and after the restore from the check point? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in DistinctFunction.reduce if possible for production data and check whether the missed events are eventually processed before or after the savepoint. The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:
“{} ({}, synchronous part) in thread {} took {} ms” (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state, 
could you try the new ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink’?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transient which is already consumed from Kafka.
Basically the full contents of the window result is split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. 

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data miss? When do you notice that? 
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day 
or 
- some distinct records miss in the final output of BucketingSink in s3 after window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketinSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8))

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there. If something were, it could explain the missed data in the main output. (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there.)

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window.

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in good enough order to not be late, given the max out-of-orderness used in the Kafka consumer's timestamp extractor.
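
For readers less familiar with event time, here is a minimal sketch (plain Java, no Flink dependency) of how a bounded-out-of-orderness watermark and the allowedLateness setting interact for a 24-hour tumbling window. The class and method names are hypothetical, and the lateness rule is written the way Flink's window operator is commonly described to apply it, so treat this as an illustration under those assumptions rather than Flink's actual code.

```java
/** Illustrative event-time bookkeeping for a tumbling window; all names are hypothetical. */
final class LatenessSketch {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start of the tumbling window containing the timestamp (no offset, non-negative timestamps). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Largest timestamp that still belongs to the window starting at windowStartMs. */
    static long maxTimestamp(long windowStartMs, long windowSizeMs) {
        return windowStartMs + windowSizeMs - 1;
    }

    /** Watermark of a bounded-out-of-orderness extractor: highest timestamp seen minus the bound. */
    static long watermark(long maxSeenTimestampMs, long maxOutOfOrdernessMs) {
        return maxSeenTimestampMs - maxOutOfOrdernessMs;
    }

    /**
     * Assumed rule: an element is dropped as late (and goes to the late-data side output)
     * once the watermark has reached the window's last timestamp plus the allowed lateness.
     */
    static boolean isLate(long elementTimestampMs, long currentWatermarkMs,
                          long windowSizeMs, long allowedLatenessMs) {
        long start = windowStart(elementTimestampMs, windowSizeMs);
        return maxTimestamp(start, windowSizeMs) + allowedLatenessMs <= currentWatermarkMs;
    }
}
```

Under these assumptions, an element can only be discarded as late after the watermark has moved past the end of its own 24-hour window, which is why an empty late-data side output argues against lateness as the cause of the missing records.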

Thank you in advance!

Re: Data loss when restoring from savepoint

Juho Autio
Sorry to insist, but we seem to be blocked from any serious usage of state in Flink if we can't rely on it to not miss data in case of a restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function also processes the data that is missing from the window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <[hidden email]> wrote:
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <[hidden email]> wrote:
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that. It was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. However, based on the new DEBUG logs it seems more like we lose some records that are seen soon after restoring. It seems as if Flink is somehow confused about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the Kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest inserting one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after reduce and before sink.
The map function should be called right after the window is triggered but before flushing to s3.
The result of reduce (deduped record) could be logged there.
This should allow us to check whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try to write records to some other sink or to both.
E.g. if you can access the file system of the workers, maybe just write into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I could only check whether some of the missing ids can be found in the DEBUG logs, and indeed they can.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.
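
The comparison between the batch output and the stream output boils down to a set difference. A minimal sketch in plain Java follows; the class and method names are hypothetical, and loading the ids from the actual output files is left out.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for comparing the batch reference output with the stream output. */
final class OutputDiffSketch {

    /** Returns the ids present in the batch output but absent from the stream output, sorted. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds); // copy, so the input set is not modified
        missing.removeAll(streamIds);
        return missing;
    }
}
```

Each id returned by such a diff can then be searched for in the DEBUG logs, as done for AN12345 here.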

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next) / more occurrences of DistinctFunction.reduce)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled
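
The ordering in this timeline can be checked mechanically. The sketch below (plain Java; the class and method names are made up for illustration) parses log timestamps in the format shown above and tells whether a reduce log line precedes the synchronous part of a checkpoint, which is the border suggested earlier in the thread for deciding whether an event should be covered by that snapshot.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper for ordering log lines relative to a checkpoint's synchronous part. */
final class CheckpointBorderSketch {

    // Matches the timestamp layout seen in the logs, e.g. "2018-09-18 09:14:29,085".
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static LocalDateTime parse(String logTimestamp) {
        return LocalDateTime.parse(logTimestamp, LOG_TIME);
    }

    /** True if the event was logged strictly before the snapshot's synchronous part. */
    static boolean loggedBeforeSnapshot(String eventLogTime, String syncPartLogTime) {
        return parse(eventLogTime).isBefore(parse(syncPartLogTime));
    }
}
```

With the values above, the first reduce call for the missing id (09:14:29,085) precedes the first synchronous part (09:15:14,264), so one would expect that record to be covered by that checkpoint and by every later snapshot.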

Note that there was high backpressure after restoring from the savepoint until the stream caught up with the Kafka offsets. However, our job assigns timestamps & watermarks on the Flink Kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude a problem there for sure; it is just that savepoints are used a lot without problem reports, and BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

However, the bucketing sink might lose any data at the end of the day (also from the middle). The fact that it is always around the time of taking a savepoint and not random is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it may not appear in the list, or exists (HEAD) may return false, and you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also the last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meantime, two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has a general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
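
The three steps listed above can be sketched as follows. This is plain Java against a hypothetical `ObjectStore` interface (not a real S3 or Flink API), where `listKeys` stands in for LIST and `exists` for HEAD, and part files are assumed to be named prefix plus a numeric index.

```java
import java.util.Set;

/** Illustrative part-index selection on a store whose listing may lag behind writes. */
final class PartIndexSketch {

    /** Hypothetical stand-in for an object store client. */
    interface ObjectStore {
        Set<String> listKeys(String prefix); // LIST: may return a stale view
        boolean exists(String key);          // HEAD: checks a single key
    }

    static int nextPartIndex(ObjectStore store, String prefix) {
        // Step 1: LIST keys and find the current max index.
        int max = -1;
        for (String key : store.listKeys(prefix)) {
            max = Math.max(max, Integer.parseInt(key.substring(prefix.length())));
        }
        // Step 2: choose next index = max + 1.
        int next = max + 1;
        // Step 3: HEAD the candidate and keep adding 1 until the key doesn't exist.
        while (store.exists(prefix + next)) {
            next++;
        }
        return next;
    }
}
```

The sketch only helps when the HEAD check is reliable. If HEAD can also return a stale "not found" for a key that was just written, the chosen index may still collide with an existing part.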

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0; it is planned for the next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with s3. Internally, BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], the BucketingSink can overwrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on s3 as a file system.
I also include Kostas, he might add more details.

Just to keep this problem with s3 in mind and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if it is not yet for sure, I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message indicates the border between the events that should be included in the savepoint (logged before it) and those that should not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check whether the savepoint has been completed overall:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a Kafka sink instead, but I can't imagine how there could be any difference. The sink really doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because in the end the distinct values don't amount to that much data.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state,
could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per-key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might not be included in the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the in-flight data which has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it; I am just saying that the final result in s3 should include all records after it.
This is what should be guaranteed, but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might not be included in the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

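import java.util.Map;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        // Build a TupleN whose arity matches the number of key fields.
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            // Missing fields fall back to an empty string so the key is never null.
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}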

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())
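
To make the expected result concrete, here is a minimal sketch in plain Java (no Flink dependency) of what keying by a set of fields and then reducing with "keep the first value" yields for one window: exactly one record per distinct key. The class name and the use of a `LinkedHashMap` are illustrative assumptions; Flink keeps this state per key in its state backend instead.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative in-memory equivalent of keyBy(fields) followed by reduce((v1, v2) -> v1). */
final class DistinctPerKeySketch {

    static List<Map<String, String>> distinct(List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> firstPerKey = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            // Same key construction idea as the key selector: missing fields become "".
            List<String> key = new ArrayList<>();
            for (String field : fields) {
                key.add(event.getOrDefault(field, ""));
            }
            // Keep the value already stored for the key, mirroring reduce returning value1.
            firstPerKey.merge(key, event, (value1, value2) -> value1);
        }
        return new ArrayList<>(firstPerKey.values());
    }
}
```

A record that reaches the reduce step for a key should therefore always leave one output row for that key when the window fires, which is why ids that appear in the reducer's DEBUG log but not in the output point to lost window state rather than to the deduplication logic.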

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly is the data missing? When do you notice that?
Do you check it by:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day
or
- finding that some distinct records are missing from the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?

The late data around the time of taking savepoint might not be included in the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meantime, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

Re: Data loss when restoring from savepoint

Andrey Zagrebin
Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as csv
- a minimal deduplication job which processes them and misses records
- check if it happens for shorter windows, like 1h etc.
- the setup which you use for the job, ideally locally reproducible or cloud

Best,
Andrey

On 4 Oct 2018, at 11:13, Juho Autio <[hidden email]> wrote:

Sorry to insist, but we seem to be blocked for any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function gets to process also the data that is missing from window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <[hidden email]> wrote:
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <[hidden email]> wrote:
Thanks, Andrey.

> so it means that the savepoint does not loose at least some dropped records.

I'm not sure what you mean by that? I mean, it was known from the beginning, that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs it seems more like losing some records that are seen ~soon after restoring. It seems like Flink would be somehow confused either about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not loose at least some dropped records.

If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink. 
The map function should be called right after window is triggered but before flushing to s3.
The result of reduce (deduped record) could be logged there.
This should allow to check whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try to write records to some other sink or to both. 
E.g. if you can access file system of workers, maybe just into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up if I can find some of the missing ids in the DEBUG logs. Which I did indeed.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

To be noted, there was high backpressure after restoring from savepoint until the stream caught up with the kafka offsets. Although, our job uses assign timestamps & watermarks on the flink kafka consumer itself, so event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it's running without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude problem there for sure, just savepoints are used a lot w/o problem reports and BucketingSink is known to be problematic with s3. That is why, I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, bucketing sink might loose any data at the end of the day (also from the middle). The fact, that it is always around the time of taking a savepoint and not random, is surely suspicious and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is how it is roughly implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency’ means that even if you just created file (its name is key) it can be that you do not get it in the list or exists (HEAD) returns false and you risk to rewrite the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketinSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has a general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can overwrite the previously written part and basically lose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
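
The LIST-then-HEAD idea above can be illustrated with a small simulation. This is only a sketch in plain Java, not BucketingSink's actual code and not an S3 client: the class `NextPartIndexSketch` and its two input sets are hypothetical stand-ins for what a LIST call returns and what HEAD reports as existing. It shows that the HEAD probe helps when only the listing is stale, but cannot help when both views lag behind the real state.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical simulation of choosing the next part index; not a Flink or S3 API. */
final class NextPartIndexSketch {

    /**
     * Starts at max(listed) + 1, then probes forward while the
     * (possibly stale) HEAD view claims that index already exists.
     */
    static int chooseNextIndex(Set<Integer> listedIndexes, Set<Integer> visibleToHead) {
        int next = 0;
        for (int index : listedIndexes) {
            next = Math.max(next, index + 1);
        }
        while (visibleToHead.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Parts 0, 1 and 2 have really been written.
        Set<Integer> upToDate = new HashSet<>(Arrays.asList(0, 1, 2));
        // A stale view that does not yet show part 2.
        Set<Integer> stale = new HashSet<>(Arrays.asList(0, 1));

        System.out.println(chooseNextIndex(upToDate, upToDate)); // 3
        System.out.println(chooseNextIndex(stale, upToDate));    // 3: HEAD probe corrects the stale LIST
        System.out.println(chooseNextIndex(stale, stale));       // 2: collides with the existing part 2
    }
}
```

In the last case the writer would reuse index 2 and overwrite the existing part, which is the risk being discussed for a store that only offers eventual consistency for existence checks.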

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can overwrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its own track of written parts and does not rely on s3 as a file system.
I also include Kostas, he might add more details.

Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if it is not yet for sure I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in DistinctFunction.reduce if possible for production data and check whether the missed events are eventually processed before or after the savepoint. The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."
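
As a rough illustration of that check, the sketch below scans log lines for the synchronous-part message and tells whether a given record was logged before that border. It is plain Java with hypothetical names (`SavepointBorderSketch`, `firstTimestampContaining`, `seenBefore`), and it assumes every log line starts with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp, which sorts correctly as a string. Note that the marker includes the leading ", " because the text "synchronous part" is also a substring of "asynchronous part".

```java
import java.util.List;

/** Hypothetical log-scanning helper; assumes lines start with a sortable timestamp. */
final class SavepointBorderSketch {

    // The leading ", " keeps this from also matching ", asynchronous part)".
    static final String SYNC_MARKER = ", synchronous part)";

    private static final int TIMESTAMP_LENGTH = "yyyy-MM-dd HH:mm:ss,SSS".length();

    /** Returns the timestamp of the first line containing the marker, or null if none. */
    static String firstTimestampContaining(List<String> lines, String marker) {
        for (String line : lines) {
            if (line.length() >= TIMESTAMP_LENGTH && line.contains(marker)) {
                return line.substring(0, TIMESTAMP_LENGTH);
            }
        }
        return null;
    }

    /** True if some line containing the needle was logged strictly before the border. */
    static boolean seenBefore(List<String> lines, String needle, String border) {
        for (String line : lines) {
            if (line.length() >= TIMESTAMP_LENGTH
                    && line.contains(needle)
                    && line.substring(0, TIMESTAMP_LENGTH).compareTo(border) < 0) {
                return true;
            }
        }
        return false;
    }
}
```

A record that was only logged after the border should not be expected inside that savepoint; it should instead be re-read from Kafka after the restore.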

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state, 
could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which is already consumed from Kafka.
Basically the full contents of the window result is split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. 

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())
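
To make the "one record for each key" behaviour concrete, here is a minimal plain-Java simulation of the keyBy + reduce combination above. It does not use Flink; `KeyedDistinctSketch` and its methods are hypothetical names, the key is modelled as a list of field values with "" as the default (mirroring getOrDefault in MapKeySelector), and the merge function keeps the first value the same way DistinctFunction returns value1.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of per-key deduplication; not Flink code. */
final class KeyedDistinctSketch {

    /** Builds the key from the selected fields, using "" for missing fields. */
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>();
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    /** Keeps the first event seen for each key, like reduce((value1, value2) -> value1). */
    static Collection<Map<String, String>> distinct(List<Map<String, String>> events,
                                                    String... fields) {
        Map<List<String>, Map<String, String>> state = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            state.merge(keyOf(event, fields), event, (value1, value2) -> value1);
        }
        return state.values();
    }
}
```

With this model, an element can only be absent from the output if its whole key is absent, which is why a missing id points at lost window state rather than at the reduce logic itself.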

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data go missing? When do you notice that?
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day 
or 
- some distinct records are missing in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).

5. allowedLateness

It may be or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink doesn't have a bug that's related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out of orderness used on kafka consumer timestamp extractor.
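
For reference, the interplay between the watermark, the 24-hour window and allowedLateness can be written down as a few lines of arithmetic. This is a sketch in plain Java of the event-time rules as I understand them, not Flink code: `LatenessSketch` is a hypothetical name, window offsets are assumed to be zero, and timestamps are assumed to be non-negative milliseconds. The watermark trails the highest timestamp seen by the max out-of-orderness, and an element counts as late once the watermark has reached the last timestamp of its window plus the allowed lateness.

```java
/** Hypothetical arithmetic for event-time lateness; assumes zero window offset. */
final class LatenessSketch {

    /** Start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    /** Watermark of a bounded-out-of-orderness extractor. */
    static long watermark(long maxSeenTimestamp, long maxOutOfOrderness) {
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    /** True if the element's window can no longer accept it at the given watermark. */
    static boolean isLate(long eventTimestamp, long windowSize,
                          long allowedLateness, long currentWatermark) {
        long windowMaxTimestamp = windowStart(eventTimestamp, windowSize) + windowSize - 1;
        return windowMaxTimestamp + allowedLateness <= currentWatermark;
    }
}
```

Under these rules an element that is dropped as late should show up in the late-data side output, so an empty side output suggests the missing records were not dropped by this check.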

Thank you in advance!

Re: Data loss when restoring from savepoint

Till Rohrmann
Hi Juho,

another idea to further narrow down the problem could be to simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

In general, it would be tremendously helpful to have a minimal working example that reproduces the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

can you try to reduce the job to minimal reproducible example and share the job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as csv
- minimal deduplication job which processes them and misses records
- check if it happens for shorter windows, like 1h etc
- setup which you use for the job, ideally locally reproducible or cloud

Best,
Andrey

On 4 Oct 2018, at 11:13, Juho Autio <[hidden email]> wrote:

Sorry to insist, but we seem to be blocked for any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function gets to process also the data that is missing from window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <[hidden email]> wrote:
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <[hidden email]> wrote:
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that? I mean, it was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs it seems more like losing some records that are seen ~soon after restoring. It seems like Flink is somehow confused about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after reduce and before sink.
The map function should be called right after window is triggered but before flushing to s3.
The result of reduce (deduped record) could be logged there.
This should allow checking whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try to write records to some other sink or to both. 
E.g. if you can access file system of workers, maybe just into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up if I can find some of the missing ids in the DEBUG logs. Which I did indeed.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.
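
The comparison itself boils down to a set difference between the ids in the batch output and the ids in the stream output. A minimal sketch in plain Java, assuming both outputs have already been loaded into sets of id strings (the class and method names are hypothetical, and how the ids are read from S3 is left out):

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper for comparing batch and stream outputs. */
final class MissingIdsSketch {

    /** Returns the ids present in the batch output but absent from the stream output, sorted. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds);
        missing.removeAll(streamIds);
        return missing;
    }
}
```

The returned set can then be used to pick individual ids to look up in the DEBUG logs.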

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

To be noted, there was high backpressure after restoring from savepoint until the stream caught up with the kafka offsets. Although, our job uses assign timestamps & watermarks on the flink kafka consumer itself, so event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude a problem there for sure; it is just that savepoints are used a lot without problem reports and BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, bucketing sink might lose any data at the end of the day (also from the middle). The fact that it is always around the time of taking a savepoint, and not random, is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it may not appear in the list, or exists (HEAD) may return false, and you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has a general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can overwrite the previously written part and basically lose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can overwrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7 where Flink keeps its own track of written parts and does not rely on s3 as a file system.
I also include Kostas, he might add more details.

Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if it is not yet for sure I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in DistinctFunction.reduce if possible for production data and check whether the missed events are eventually processed before or after the savepoint. The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check if the savepoint has been overall completed:
"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state, 
could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which is already consumed from Kafka.
Basically the full contents of the window result is split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. 

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. 
This is what should be guaranteed but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

Where exactly does the data go missing? When do you notice that?
Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day 
or 
- some distinct records are missing in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <[hidden email]> wrote:

I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <[hidden email]> wrote:
I realized that BucketingSink must not play any role in this problem. This is because BucketingSink gets a burst of input only when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <[hidden email]> wrote:
Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode Exactly Once
Interval 1m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 1m 0s
Maximum Concurrent Checkpoints 1
Persist Checkpoints Externally Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. We also write late data to a side output, but nothing is written there; if anything were, it could explain the missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:

If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out a Flink bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out-of-orderness used on the Kafka consumer timestamp extractor.

Thank you in advance!

Re: Data loss when restoring from savepoint

Juho Autio
Thanks for the suggestions!

> In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Definitely. The problem with reproducing has been that this only seems to happen in the bigger production data volumes.

That's why I'm hoping to find a way to debug this with the production data. With that it seems to consistently cause some misses every time the job is killed/restored.

> check if it happens for shorter windows, like 1h etc

What would be the benefit of that compared to 24h window?

> simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

Hm, maybe, but not sure how useful that would be, because it wouldn't yet prove that it's related to reducing, because not having a reduce function could also mean smaller load on the job, which might alone be enough to make the problem not manifest.

Is there a way to debug what goes into the reducing state (including what gets removed or overwritten and what restored), if that makes sense..? Maybe some suitable logging could be used to prove that the lost data is written to the reducing state (or at least asked to be written), but not found any more when the window closes and state is flushed?

On configuration once more, we're using RocksDB state backend with asynchronous incremental checkpointing. The state is restored from savepoints though, we haven't been using those checkpoints in these tests (although they could be used in case of crashes – but we haven't had those now).

On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <[hidden email]> wrote:
Hi Juho,

another idea to further narrow down the problem could be to simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as CSV
- minimal deduplication job which processes them and misses records
- check if it happens for shorter windows, like 1h etc
- setup which you use for the job, ideally locally reproducible or cloud

Best,
Andrey

On 4 Oct 2018, at 11:13, Juho Autio <[hidden email]> wrote:

Sorry to insist, but we seem to be blocked for any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function gets to process also the data that is missing from window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <[hidden email]> wrote:
Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?

On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <[hidden email]> wrote:
Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that? I mean, it was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs, it seems more like we are losing some records that are seen ~soon after restoring. It seems as if Flink were somehow confused about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the Kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after the reduce and before the sink.
The map function should be called right after the window is triggered but before flushing to s3.
The result of the reduce (the deduped record) could be logged there.
This should allow checking whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try writing the records to some other sink, or to both.
E.g. if you can access the file system of the workers, maybe just write into local files and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <[hidden email]> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up if I can find some of the missing ids in the DEBUG logs. Which I did indeed.

I changed the DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until caught up offsets
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, let it keep running so that it will eventually write the output

Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(more occurrences of checkpoints, each with ~1 min checkpointing time + ~1 min delay before the next, interleaved with more occurrences of DistinctFunction.reduce)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18~10:20:00,000 savepoint created & job cancelled

To be noted, there was high backpressure after restoring from savepoint until the stream caught up with the kafka offsets. Although, our job uses assign timestamps & watermarks on the flink kafka consumer itself, so event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude a problem there for sure; it is just that savepoints are used a lot without problem reports, and BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, the bucketing sink might lose any data at the end of the day (also from the middle). The fact that it is always around the time of taking a savepoint and not random is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you have just created a file (its name is the key), it can happen that it does not show up in the list, or that exists (HEAD) returns false, and you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper at the moment but does not always provide normal file system guarantees. See also the last example in [1].

Cheers,
Andrey


On 29 Aug 2018, at 12:11, Juho Autio <[hidden email]> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure. I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this, true or false: BucketingSink gets a burst of input only when the 24-hour window triggers. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it.

I was wondering, what does S3's "read-after-write consistency" (mentioned on the page you linked) actually mean. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3

But definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
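
The three steps listed above can be sketched in plain Java as follows. This is only an illustration of the idea, not how BucketingSink or any S3 client is implemented: `ObjectStore` is a hypothetical stand-in interface, and the key layout (a prefix followed by a numeric part index) is an assumption made for the example.

```java
import java.util.Set;

/**
 * Sketch of "LIST, take max + 1, then HEAD until the key is free".
 * ObjectStore is a hypothetical stand-in, not an AWS or Flink API.
 */
final class NextPartIndex {

    /** Hypothetical minimal view of an object store. */
    interface ObjectStore {
        /** LIST: keys starting with the prefix; may be stale. */
        Set<String> list(String prefix);

        /** HEAD: whether the key exists; may also be stale. */
        boolean exists(String key);
    }

    /** Assumes every listed key is the prefix followed by a decimal index. */
    static int choose(ObjectStore store, String prefix) {
        int max = -1;
        for (String key : store.list(prefix)) {
            int index = Integer.parseInt(key.substring(prefix.length()));
            max = Math.max(max, index);
        }
        int next = max + 1;
        // Probe upwards until HEAD reports that the key does not exist.
        while (store.exists(prefix + next)) {
            next++;
        }
        return next;
    }
}
```

If LIST is stale but HEAD is not, the probing loop skips the part that LIST missed. If both are stale for the same key, the chosen index still collides with an existing part, which is why a sink that tracks its own written parts sounds safer.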

Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <[hidden email]> wrote:
Hi,

True, StreamingFileSink does not support s3 in 1.6.0; it is planned for the next 1.7 release, sorry for the confusion.
The old BucketingSink has a general problem with s3. Internally, BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to the eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on s3 as a file system.
I also include Kostas, he might add more details.

Just to keep this problem with s3 in mind and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if that is not yet certain, I would also check it.

Have you already checked the log files of job manager and task managers for the job running before and after the restore from the check point? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if that is possible for production data, and checking whether the missed events are eventually processed before or after the savepoint. The following log message marks the border between the events that should be included in the savepoint (logged before it) and those that should not:
"{} ({}, synchronous part) in thread {} took {} ms" (template)
Also check whether the savepoint has completed overall:
"{} ({}, asynchronous part) in thread {} took {} ms."
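
A minimal sketch of that comparison in plain Java, assuming each log line starts with a log4j-style timestamp such as `2018-09-18 09:14:29,085`. The class and method names are made up for illustration, and timestamps from different threads only give an approximate ordering.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: orders a reduce log line against the "synchronous part" line. */
final class SavepointBorder {

    // Assumed timestamp layout at the start of every log line.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Length of "yyyy-MM-dd HH:mm:ss,SSS".
    private static final int TS_LENGTH = 23;

    static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, TS_LENGTH), TS);
    }

    /** True if the event was logged before the border line, i.e. it should be in the savepoint. */
    static boolean loggedBeforeBorder(String eventLine, String syncPartLine) {
        return timestampOf(eventLine).isBefore(timestampOf(syncPartLine));
    }
}
```

Events logged before the border should be part of the savepointed state; events logged after it should be replayed from Kafka after the restore.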

Best,
Andrey


On 24 Aug 2018, at 20:41, Juho Autio <[hidden email]> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://". 

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <[hidden email]> wrote:
Ok, I think before further debugging the window reduced state,
could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey


On 24 Aug 2018, at 18:03, Juho Autio <[hidden email]> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <[hidden email]> wrote:
Hi Juho,

So it is a per key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job and

> The late data around the time of taking savepoint might not be included in the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which has already been consumed from Kafka.
Basically, the full contents of the window result are split between the savepoint and what can come after the savepointed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it; I am just saying that the final result in s3 should include all records after it.
This is what should be guaranteed, but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <[hidden email]> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
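
A rough sketch of that comparison in plain Java, assuming both outputs have first been reduced to one id per line in local text files. The file format and all names here are assumptions for illustration, not the actual tooling used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: ids present in the batch output but absent from the stream output. */
final class OutputDiff {

    /** Assumes one id per line. */
    static Set<String> load(Path file) throws IOException {
        return new TreeSet<>(Files.readAllLines(file));
    }

    /** Returns a new sorted set; neither input is modified. */
    static Set<String> missingFromStream(Set<String> batchIds, Set<String> streamIds) {
        Set<String> missing = new TreeSet<>(batchIds);
        missing.removeAll(streamIds);
        return missing;
    }
}
```

Running the same diff in the other direction (stream minus batch) would show whether the stream job also emits ids that the batch job does not.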

> The late data around the time of taking savepoint might not be included in the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())
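
To make the expected result concrete, here is a plain-Java model of what keyBy plus this reduce should produce within one window: exactly one record per distinct key, namely the first one seen. It does not use Flink at all; the class name and the use of a `List<String>` as the key (instead of a Flink Tuple) are assumptions for the sake of a self-contained example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of keyBy(fields) + reduce((value1, value2) -> value1). */
final class DistinctModel {

    /** Mirrors MapKeySelector: missing fields become empty strings. */
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>();
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    /** Keeps the first event per key, like a reduce that always returns value1. */
    static Collection<Map<String, String>> distinct(
            List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> state = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            state.merge(keyOf(event, fields), event, (value1, value2) -> value1);
        }
        return state.values();
    }
}
```

With this model the size of the window output equals the number of distinct keys in the input, which is the number the batch re-implementation is compared against.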
