Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

Somya Maithani
Hey Team,

Problem
Recently, we were trying to upgrade our Flink infrastructure to version 1.9.1, and we noticed that week-old offsets were consumed from Kafka even though the consumer configuration specifies latest.
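
For reference, the source is configured roughly like this (a simplified sketch, not our exact code; the class name, broker, group id, and topic names are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

class KafkaSourceFactory {
    // Builds the Kafka source, set up to start from the latest offsets when there is no restored state.
    static FlinkKafkaConsumer010<String> latestSource() {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        kafkaProps.setProperty("group.id", "our-consumer-group");         // placeholder
        kafkaProps.setProperty("auto.offset.reset", "latest");            // the "latest" configuration

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("source-topic", new SimpleStringSchema(), kafkaProps);
        consumer.setStartFromLatest(); // explicit startup-position API, available from Flink 1.3 onwards
        return consumer;
    }
}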

Context
1. Our current Flink version in production is 1.2.1.
2. We use RocksDB with HDFS (Hadoop) as our state backend / checkpoint store (a rough sketch of the setup follows this list).
3. We consume messages from, and produce messages to, Kafka.
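
Roughly, the checkpointing setup looks like this (a simplified sketch, not our actual job; the class name, HDFS path, and checkpoint interval are placeholders):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Local operator state lives in RocksDB; checkpoints and savepoints are written to HDFS.
        env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        env.enableCheckpointing(60_000L); // checkpoint every 60 seconds (interval is illustrative)

        env.fromElements("placeholder").print(); // stand-in for the real Kafka pipeline

        env.execute("checkpoint-setup-sketch");
    }
}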

Release Plan
1. Upgrade Flink 1.2.1 to 1.3.3.
2. Upgrade Flink 1.3.3 to 1.9.1.
Note: We go through an intermediate version (1.3.3) because of the serialisation change in checkpointing.

After performing step 1, the service was consuming the latest Kafka events, but after performing step 2 we noticed that it was consuming one-week-old Kafka messages from the source topic. We did not see any exceptions, but because the volume of consumed messages increased so much, our task managers eventually started crashing.

We did not change the service's Kafka configuration for the upgrade, but we did upgrade the Flink Kafka dependencies.

Old dependencies:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.10</artifactId>
      <version>${flink.version}</version>
    </dependency>


New dependencies:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
      <version>${flink.version}</version>
    </dependency>

Does anyone know why this would be happening?

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd
Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

Somya Maithani
Hey,

Any ideas on this? We are blocked on the upgrade because we need asynchronous timer checkpointing.

Regards,

Somya Maithani
Software Developer II
Helpshift Pvt Ltd


Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

Chesnay Schepler
@gordon Do you remember whether we changed any behavior of the Kafka 0.10 consumer after 1.3.3?

Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

Tzu-Li (Gordon) Tai
Hi Somya,

I'll have to take a closer look at the JIRA history to refresh my memory on past changes that could have caused this.

My first suspicion is this: it is expected that the Kafka consumer will *ignore* the configured startup position if the job is restored from a savepoint; it will always use the offsets that were persisted at the time of the savepoint. Could this already explain what you are seeing?

What I'm not sure of yet is whether this behaviour changed between 1.2.x and 1.3.x, or only in later versions.
I'll take a closer look once I'm back from travelling tomorrow and get back to you on that.

Cheers,
Gordon

Re: Old offsets consumed from Kafka after Flink upgrade to 1.9.1 (from 1.2.1)

Tzu-Li (Gordon) Tai
Update:
I can confirm my previous guess, based on the changes in FLINK-4280 (https://issues.apache.org/jira/browse/FLINK-4280), which were merged for Flink 1.3.0.
When upgrading from Flink 1.2.x to 1.3.0, the new startup position configuration was respected over the checkpointed offsets, but only once, for the first restore after the upgrade.
After that, all restores from savepoints only ever respect the checkpointed offsets (regardless of whether it is the first restore after an upgrade).
This would explain the behaviour you encountered.

If you would prefer not to carry the Kafka consumer's progress over after the upgrade and want to start consuming from the latest offset instead,
one way to achieve that is to assign a new uid to the Kafka consumer operator and allow non-restored state when restoring.
With this change, Flink should treat the Kafka consumer operator as having no prior snapshotted state (i.e. no offsets) and respect the startup configuration.
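
For example, something along these lines (a rough sketch; the class name, uid value, broker, group, and topic names are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class KafkaSourceWithNewUid {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092"); // placeholder
        kafkaProps.setProperty("group.id", "our-consumer-group");         // placeholder

        FlinkKafkaConsumer010<String> consumer =
            new FlinkKafkaConsumer010<>("source-topic", new SimpleStringSchema(), kafkaProps);
        consumer.setStartFromLatest(); // honoured, because no snapshotted offsets will match this operator

        env.addSource(consumer)
           .uid("kafka-source-v2") // any value different from the operator's previous uid
           .name("kafka-source")
           .print();               // stand-in for the real pipeline

        env.execute("kafka-source-new-uid-sketch");
    }
}

The job would then be submitted from the savepoint with non-restored state allowed, e.g. flink run -s <savepointPath> --allowNonRestoredState <job-jar>.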

Let me know if this works for you!

Cheers,
Gordon
