Developing Beam applications using Flink checkpoints

Developing Beam applications using Flink checkpoints

Ivan San Jose
Hi, we are starting to use Beam with Flink as the runner for our
applications, and recently we wanted to take advantage of what Flink
checkpointing provides, but it seems we are not understanding it
clearly.

Simplifying, our application does the following:
  - Read messages from a couple of Kafka topics
  - Combine them
  - Write combination result to a sink (Exasol DB)
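
A minimal Beam sketch of that shape (topic names, the combine, and the sink stand-in are assumptions, not code from this thread; the real job would configure an event-time timestamp policy on KafkaIO and an Exasol JDBC sink):

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class PipelineSketch {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply(KafkaIO.<String, Long>read()
                .withBootstrapServers("kafka:9092")              // placeholder
                .withTopics(Arrays.asList("topic-a", "topic-b")) // placeholders
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(LongDeserializer.class)
                .withoutMetadata())
         // Event-time windowing; the watermark comes from the source's timestamp policy.
         .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(5))))
         .apply(Sum.longsPerKey()) // stand-in for the real combine
         // Stand-in for the Exasol JDBC sink.
         .apply(MapElements.into(TypeDescriptors.strings())
                .via((KV<String, Long> kv) -> kv.getKey() + "=" + kv.getValue()));
        p.run().waitUntilFinish();
    }
}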

As the application processes messages using event time, and one of the
topics is almost idle, the first time the application is started
messages get stuck in the combiner because the watermark doesn't
advance until messages arrive on the idle topic (we know this, and it
is not a problem for us, though).

The problem is that we've observed that, if a checkpoint is triggered
while messages are still stuck in the combiner, the checkpoint,
surprisingly for us, finishes successfully (and offsets are committed
to Kafka) even though the messages haven't reached the sink yet. Is
this expected?

The thing is that if, in the future, we make state-incompatible
changes to the application source code, the checkpoint that was taken
couldn't be restored. So we would like to be able to start the
application without using any checkpoint, but without losing data.
The problem here is that data loss would happen: the messages stuck in
the combiner have already been committed to Kafka, and without a
checkpoint the application would start reading from the latest
committed offset, so those messages would never be read from the
source again.

So, I guess our question is: how do you avoid losing data when
developing applications? Sooner or later you are going to introduce
breaking changes...

For example, we've seen these two errors so far:
  - After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
...

  - After modifying a Java model class involved in a combine:
org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)


Apologies in advance, as we are new to Flink, so maybe we are missing
something obvious here.

Thanks


Re: Developing Beam applications using Flink checkpoints

Arvid Heise-3
Hi Ivan,

First, let's address the issue with idle partitions. The solution is to use a watermark assigner that also emits a watermark after some idle timeout [1].
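
A minimal sketch of such an assigner, against Flink 1.10's AssignerWithPeriodicWatermarks API (the event type MyEvent, its timestamp field, and the timeout values are assumptions, not code from this thread):

import javax.annotation.Nullable;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleAwareWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<IdleAwareWatermarkAssigner.MyEvent> {

    // Hypothetical event type carrying an event-time timestamp.
    public static class MyEvent {
        public long timestampMs;
    }

    private static final long MAX_OUT_OF_ORDERNESS_MS = 1_000; // assumption: 1s
    private static final long MAX_IDLE_TIME_MS = 60_000;       // assumption: 1min

    private long maxEventTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MS;
    private long lastEventSeenAt = System.currentTimeMillis();

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        maxEventTimestamp = Math.max(maxEventTimestamp, element.timestampMs);
        lastEventSeenAt = System.currentTimeMillis();
        return element.timestampMs;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        long now = System.currentTimeMillis();
        if (now - lastEventSeenAt > MAX_IDLE_TIME_MS) {
            // Source has been idle: advance the watermark from processing time so
            // downstream event-time operators (the combiner) are not blocked forever.
            return new Watermark(now - MAX_IDLE_TIME_MS);
        }
        return new Watermark(maxEventTimestamp - MAX_OUT_OF_ORDERNESS_MS);
    }
}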

Now the second question: why Kafka offsets are committed for in-flight, checkpointed data. The basic idea is that you don't lose data while also avoiding duplicated output.
If offsets were committed only after data had been fully processed, then upon a crash the same records would be reprocessed together with the restored in-flight data, so you would get duplicate messages in your system.
To avoid duplicates, data would need to be more or less completely flushed out of the system before each checkpoint, which would cause huge downtime.
Instead, we assume that you can always resume from the checkpoints.
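
A minimal sketch of that behaviour with Flink's Kafka consumer (broker address, group id, and topic are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CheckpointCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing on, offsets are committed back to Kafka only when a
        // checkpoint completes, not when records reach the sink. On restore, Flink
        // uses the offsets stored in the checkpoint itself; the offsets in Kafka
        // are informational (e.g. for monitoring consumer lag).
        env.enableCheckpointing(60_000); // checkpoint every 60s

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        source.setCommitOffsetsOnCheckpoints(true); // default when checkpointing is enabled

        env.addSource(source).print();
        env.execute("checkpoint-commit-sketch");
    }
}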

Which leads to the last question: what to do when your pipeline has breaking changes.
The first strategy is to avoid breaking changes as much as possible. State could, for example, be stored as Avro to allow schema evolution. Minor things like renamed operators stop happening with a bit more expertise.
The second strategy is to use state migration [2]. Alternatively, you can manually convert state with the State Processor API [3].
The last option is a full reprocessing of the data. This can be done on a non-production cluster, and a savepoint can then be used to bootstrap the production cluster quickly. You need this option anyway for the case where you find a logic error, but of course it has the highest implications (you may need to purge the sink beforehand).

[1] https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
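
For [3], a rough sketch of reading keyed state from an old savepoint with the State Processor API (the savepoint path, the operator uid "combine-uid", the state name, and the state types are assumptions):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateMigrationSketch {

    // Hypothetical reader: emits (key, value) for a ValueState<Long> named "combined-total".
    static class CombineStateReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        private transient ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("combined-total", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(key, state.value()));
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        ExistingSavepoint savepoint = Savepoint.load(
                env, "hdfs:///savepoints/savepoint-90ab28-bcc1f65a0986", new MemoryStateBackend());

        // Read the old state; from here it can be transformed and written into a new
        // savepoint (Savepoint.create + OperatorTransformation.bootstrapWith).
        DataSet<Tuple2<String, Long>> oldState =
                savepoint.readKeyedState("combine-uid", new CombineStateReader());

        oldState.print();
    }
}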


Re: Developing Beam applications using Flink checkpoints

Ivan San Jose
Thanks for your complete answer, Arvid. We will try to approach all
the things you mentioned, but take into account that we are using Beam
on top of Flink, so, to be honest, I don't know how we could implement
the custom serialization thing (
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
) there. Could you please give us some hints? Thanks

Re: Developing Beam applications using Flink checkpoints

Arvid Heise-3
Hi Ivan,

The easiest way is to use an implementation that's already there [1]. I already mentioned Avro and would strongly recommend giving it a go. If you make sure to provide a default value for as many fields as possible, you can always remove them later, which gives you great flexibility. I can give you more hints if you decide to go this route.

If you want a custom implementation, I'd start by looking at one of the simpler implementations, like MapSerializerSnapshot [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html (see known implementing classes)
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
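
A minimal sketch of the Avro route using SchemaBuilder (the record mirrors the WalletMetadata class from the stack trace above; the field names and defaults are assumptions). Giving every field a default keeps old state readable as the schema evolves:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class WalletMetadataSchemaSketch {
    public static Schema build() {
        // Every field carries a default, so a newer reader schema can drop fields
        // or add further defaulted fields without breaking restores from old state.
        return SchemaBuilder.record("WalletMetadata")
                .namespace("internal.holly.beatrix.wallet.walletfact.model")
                .fields()
                .name("walletId").type().stringType().stringDefault("")
                .name("balance").type().longType().longDefault(0L)
                .optionalString("currency") // nullable, defaults to null
                .endRecord();
    }
}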


Re: Developing Beam applications using Flink checkpoints

Ivan San Jose
Yep, sorry if I'm bothering you, but I think I'm still not getting
this: how could I tell Beam to tell Flink to use that serializer
instead of the standard Java one? I think Beam abstracts us from
Flink's checkpointing mechanism, so I'm afraid that if we use the
Flink API directly we might break other things that Beam is hiding
from us...
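
For reference, the Beam-side knob here is the Coder chosen for each type; the Flink runner wraps that coder for serialization instead of falling back to Java serialization. A minimal sketch (the model class and its fields are assumptions):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CoderRegistrationSketch {

    // Option 1: annotate the model class so Beam infers AvroCoder for it,
    // rather than SerializableCoder (plain Java serialization).
    @DefaultCoder(AvroCoder.class)
    public static class WalletMetadata {
        String walletId = "";
        long balance = 0L;
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Option 2: register the coder explicitly on the pipeline's coder registry.
        pipeline.getCoderRegistry()
                .registerCoderForClass(WalletMetadata.class, AvroCoder.of(WalletMetadata.class));
    }
}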

On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:

> Hi Ivan,
>
> The easiest way is to use some implementation that's already there
> [1]. I already mentioned Avro and would strongly recommend giving it
> a go. If you make sure to provide a default value for as many fields
> as possible, you can always remove them later giving you great
> flexibility. I can give you more hints if you decide to go this
> route.
>
> If you want to have a custom implementation, I'd start at looking of
> one of the simpler implementations like MapSerializerSnapshot [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> (see known implementing classes).
> [2]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
>
> On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <
> [hidden email]> wrote:
> > Thanks for your complete answer Arvid, we will try to approach all
> > things you mentioned, but take into account we are using Beam on
> > top of
> > Flink, so, to be honest, I don't know how could we implement the
> > custom
> > serialization thing (
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > ) there. Could you please give us some hints? Thanks
> >
> > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > Hi Ivan,
> > >
> > > First let's address the issue with idle partitions. The solution
> > is
> > > to use a watermark assigner that also emits a watermark with some
> > > idle timeout [1].
> > >
> > > Now the second question, on why Kafka commits are committed for
> > in-
> > > flight, checkpointed data. The basic idea is that you are not
> > losing
> > > data while avoiding replicated output.
> > > So if you commit offsets only after data has been fully
> > processed,
> > > upon crash the same data point would be reprocessed jointly with
> > the
> > > restored in-flight data, so you get duplicate messages in your
> > > system.
> > > To avoid duplicates data needs to be more or less completely
> > flushed
> > > out the system before a checkpoint is performed. That would
> > produce a
> > > huge downtime.
> > > Instead, we assume that we can always resume from the
> > checkpoints.
> > >
> > > Which leads to the last question on what to do when your pipeline
> > has
> > > breaking changes.
> > > First strategy is to avoid breaking changes as much as possible.
> > > State could for example also be stored as Avro to allow schema
> > > evolution. Minor things like renamed operators will not happen
> > with a
> > > bit more expertise.
> > > Second strategy is to use state migration [2]. Alternatively, you
> > can
> > > manually convert state with state processor API [3].
> > > Last option is to do a full reprocessing of data. This can be
> > done on
> > > a non-production cluster and then a savepoint can be used to
> > > bootstrap the production cluster quickly. This option needs to be
> > > available anyways for the case that you find any logic error. But
> > of
> > > course, this option has the highest implications (may need to
> > purge
> > > sink beforehand).
> > >
> > > [1]
> > >
> > https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > [2]
> > >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > [3]
> > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> > >
> > > On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <
> > > [hidden email]> wrote:
> > > > Hi, we are starting to use Beam with Flink as runner on our
> > > > applications, and recently we would like to get advantages that
> > > > Flink
> > > > checkpoiting provides, but it seems we are not understanding it
> > > > clearly.
> > > >
> > > > Simplifying, our application does the following:
> > > >   - Read meesages from a couple of Kafka topics
> > > >   - Combine them
> > > >   - Write combination result to a sink (Exasol DB)
> > > >
> > > > As application is processing messages using event time, and one
> > of
> > > > the
> > > > topics is almost idle, the first time application is started
> > > > messages
> > > > are stuck in the combiner because watermark don't advance until
> > we
> > > > have
> > > > messages arriving onto idled topic (we know this and is not a
> > > > problem
> > > > for us though).
> > > >
> > > > The problem is that we've observed, if a checkpoint is
> > triggered
> > > > when
> > > > messages are still stuck in the combiner, surprisingly for us,
> > the
> > > > checkpoint finishes successfully (and offsets committed to
> > Kafka)
> > > > even
> > > > messages haven't progressed to the sink yet. Is this expected?
> > > >
> > > > The thing is that, if in the future, we make not state
> > compatible
> > > > changes in application source code, checkpoint taken couldn't
> > be
> > > > restored. So we would like to start the application without
> > using
> > > > any
> > > > checkpoint but without losing data.
> > > > Problem here would be that data loss would happen because
> > messages
> > > > stuck in combiner are already committed to Kafka and
> > application
> > > > would
> > > > start to read from latest commited offset in Kafka if we don't
> > use
> > > > any
> > > > checkpoint, thus those messages are not going to be read from
> > the
> > > > source again.
> > > >
> > > > So, I guess our question is how are you doing in order to not
> > lose
> > > > data
> > > > when developing applications, because sooner or later you are
> > going
> > > > to
> > > > add breaking changes...
> > > >
> > > > For example, we've seen those two errors so far:
> > > >   - After changing an operator name:
> > > >
> > > > 2020-05-13 07:23:52,248 ERROR Fatal error occurred in the
> > cluster
> > > > entrypoint.
> > > > org.apache.flink.runtime.dispatcher.DispatcherException: Failed
> > to
> > > > take
> > > > leadership with session id 00000000-0000-0000-0000-
> > 000000000000.
> > > > ...
> > > > Caused by:
> > org.apache.flink.runtime.client.JobExecutionException:
> > > > Could
> > > > not set up JobManager
> > > >     at
> > > >
> > org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManag
> > > > erRu
> > > > nner.java:152)
> > > >     at
> > > >
> > org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.
> > > > crea
> > > > teJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
> > > >     at
> > > >
> > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobMana
> > > > gerR
> > > > unner$5(Dispatcher.java:375)
> > > >     at
> > > >
> > org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(C
> > > > heck
> > > > edSupplier.java:34)
> > > >     ... 7 more
> > > > Caused by: java.lang.IllegalStateException: Failed to rollback
> > to
> > > > checkpoint/savepoint
> > hdfs://RTDWLTDEV/data/lake/processing/flink-
> > > > savepoints/holly-reconciliation-fact/savepoint-90ab28-
> > bcc1f65a0986.
> > > > Cannot map checkpoint/savepoint state for operator
> > > > f476451c6210bd2783f36fa331b9da5e to the new program, because
> > the
> > > > operator is not available in the new program. If you want to
> > allow
> > > > to
> > > > skip this, you can set the --allowNonRestoredState option on
> > the
> > > > CLI.
> > > >     at
> > > >
> > org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateChec
> > > > kpoi
> > > > nt(Checkpoints.java:205)
> > > > ...
> > > >
> > > >   - After modifying a Java model class involved in a combine:
> > > > org.apache.flink.runtime.state.BackendBuildingException: Failed
> > > > when
> > > > trying to restore heap backend
> > > >     at
> > > >
> > org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.bu
> > > > ild(
> > > > HeapKeyedStateBackendBuilder.java:116)
> > > >     at
> > > >
> > org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeye
> > > > dSta
> > > > teBackend(FsStateBackend.java:529)
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > > Impl
> > > >
> > .lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:29
> > > > 1)
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.a
> > > > ttem
> > > > ptCreateAndRestore(BackendRestorerProcedure.java:142)
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.c
> > > > reat
> > > > eAndRestore(BackendRestorerProcedure.java:121)
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > > Impl
> > > > .keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializer
> > > > Impl
> > > >
> > .streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135
> > > > )
> > > >     at
> > > >
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.ini
> > > > tial
> > > > izeState(AbstractStreamOperator.java:253)
> > > >     at
> > > >
> > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState
> > > > (Str
> > > > eamTask.java:881)
> > > >     at
> > > >
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTa
> > > > sk.j
> > > > ava:395)
> > > >     at
> > > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > > >     at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > > >     at java.lang.Thread.run(Thread.java:748)
> > > > Caused by: java.io.InvalidClassException:
> > > > internal.holly.beatrix.wallet.walletfact.model.WalletMetadata;
> > > > local
> > > > class incompatible: stream classdesc serialVersionUID =
> > > > 8366890161513008789, local class serialVersionUID =
> > > > 174312384610985998
> > > >     at
> > > >
> > java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> > > >
> > > >
> > > > Apologies in advance as we are new to Flink, so may be we are
> > > > missing
> > > > something obvious here.
> > > >
> > > > Thanks
> > > >
> > > >
> > > > Este correo electrónico y sus adjuntos son de naturaleza
> > > > confidencial. A no ser que usted sea el destinatario, no puede
> > > > utilizar, copiar o desvelar tanto el mensaje como cualquier
> > > > información contenida en el mensaje. Si no es el destinatario,
> > debe
> > > > borrar este correo y notificar al remitente inmediatamente.
> > > > Cualquier punto de vista u opinión expresada en este correo
> > > > electrónico son únicamente del remitente, a no ser que se
> > indique
> > > > lo contrario. Todos los derechos de autor en cualquier material
> > de
> > > > este correo son reservados. Todos los correos electrónicos,
> > > > salientes o entrantes, pueden ser grabados y monitorizados para
> > uso
> > > > legítimo del negocio. Nos encontramos exentos de toda
> > > > responsabilidad ante cualquier perdida o daño que surja o
> > resulte
> > > > de la recepción, uso o transmisión de este correo electrónico
> > hasta
> > > > el máximo permitido por la ley.
> > > >
> > > > This email and any attachment to it are confidential. Unless
> > you
> > > > are the intended recipient, you may not use, copy or disclose
> > > > either the message or any information contained in the message.
> > If
> > > > you are not the intended recipient, you should delete this
> > email
> > > > and notify the sender immediately. Any views or opinions
> > expressed
> > > > in this email are those of the sender only, unless otherwise
> > > > stated. All copyright in any of the material in this email is
> > > > reserved. All emails, incoming and outgoing, may be recorded
> > and
> > > > monitored for legitimate business purposes. We exclude all
> > > > liability for any loss or damage arising or resulting from the
> > > > receipt, use or transmission of this email to the fullest
> > extent
> > > > permitted by law.
> > >
> > >
> >
> >
> > Este correo electrónico y sus adjuntos son de naturaleza
> > confidencial. A no ser que usted sea el destinatario, no puede
> > utilizar, copiar o desvelar tanto el mensaje como cualquier
> > información contenida en el mensaje. Si no es el destinatario, debe
> > borrar este correo y notificar al remitente inmediatamente.
> > Cualquier punto de vista u opinión expresada en este correo
> > electrónico son únicamente del remitente, a no ser que se indique
> > lo contrario. Todos los derechos de autor en cualquier material de
> > este correo son reservados. Todos los correos electrónicos,
> > salientes o entrantes, pueden ser grabados y monitorizados para uso
> > legítimo del negocio. Nos encontramos exentos de toda
> > responsabilidad ante cualquier perdida o daño que surja o resulte
> > de la recepción, uso o transmisión de este correo electrónico hasta
> > el máximo permitido por la ley.
> >
> > This email and any attachment to it are confidential. Unless you
> > are the intended recipient, you may not use, copy or disclose
> > either the message or any information contained in the message. If
> > you are not the intended recipient, you should delete this email
> > and notify the sender immediately. Any views or opinions expressed
> > in this email are those of the sender only, unless otherwise
> > stated. All copyright in any of the material in this email is
> > reserved. All emails, incoming and outgoing, may be recorded and
> > monitored for legitimate business purposes. We exclude all
> > liability for any loss or damage arising or resulting from the
> > receipt, use or transmission of this email to the fullest extent
> > permitted by law.
>
>


Este correo electrónico y sus adjuntos son de naturaleza confidencial. A no ser que usted sea el destinatario, no puede utilizar, copiar o desvelar tanto el mensaje como cualquier información contenida en el mensaje. Si no es el destinatario, debe borrar este correo y notificar al remitente inmediatamente. Cualquier punto de vista u opinión expresada en este correo electrónico son únicamente del remitente, a no ser que se indique lo contrario. Todos los derechos de autor en cualquier material de este correo son reservados. Todos los correos electrónicos, salientes o entrantes, pueden ser grabados y monitorizados para uso legítimo del negocio. Nos encontramos exentos de toda responsabilidad ante cualquier perdida o daño que surja o resulte de la recepción, uso o transmisión de este correo electrónico hasta el máximo permitido por la ley.

This email and any attachment to it are confidential. Unless you are the intended recipient, you may not use, copy or disclose either the message or any information contained in the message. If you are not the intended recipient, you should delete this email and notify the sender immediately. Any views or opinions expressed in this email are those of the sender only, unless otherwise stated. All copyright in any of the material in this email is reserved. All emails, incoming and outgoing, may be recorded and monitored for legitimate business purposes. We exclude all liability for any loss or damage arising or resulting from the receipt, use or transmission of this email to the fullest extent permitted by law.
Reply | Threaded
Open this post in threaded view
|

Re: Developing Beam applications using Flink checkpoints

Ivan San Jose
Actually, I'm also wondering how Beam coders relate to the runner's
serialization... I mean, in Beam you specify a coder for each Java
type so that Beam can serialize/deserialize that type, but is the
runner used under the hood then serializing/deserializing the result
again, that is, doing a double serialization? Or how does it work?
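
A small sketch of what a coder does at the byte level (illustration only; the Flink runner wraps the pipeline's coders in its own TypeSerializer and delegates to them, so a record is encoded once by the coder rather than by Beam and then again by Flink):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class CoderRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // A Coder is itself the byte-level serializer Beam hands to the runner.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        StringUtf8Coder.of().encode("hello", out);

        String back = StringUtf8Coder.of().decode(new ByteArrayInputStream(out.toByteArray()));
        System.out.println(back); // prints: hello
    }
}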

Re: Developing Beam applications using Flink checkpoints

Arvid Heise-3
In reply to this post by Arvid Heise-3
Hi Ivan,

I'm afraid only a few users on this mailing list actually have deeper
Beam experience; I only used it briefly three years ago. Most users
here use Flink directly to avoid exactly these kinds of
double-abstraction issues.

It might be better to switch to the Beam mailing list for Beam-specific
questions, including how the Flink runner actually translates a Beam
program to Flink.

Re: Developing Beam applications using Flink checkpoints

Ivan San Jose
Perfect, thank you so much, Arvid. I'd have expected more people to be
using Beam on top of Flink, but it seems it's not that popular.


This email and any attachment to it are confidential. Unless you are the intended recipient, you may not use, copy or disclose either the message or any information contained in the message. If you are not the intended recipient, you should delete this email and notify the sender immediately. Any views or opinions expressed in this email are those of the sender only, unless otherwise stated. All copyright in any of the material in this email is reserved. All emails, incoming and outgoing, may be recorded and monitored for legitimate business purposes. We exclude all liability for any loss or damage arising or resulting from the receipt, use or transmission of this email to the fullest extent permitted by law.
Reply | Threaded
Open this post in threaded view
|

Re: Developing Beam applications using Flink checkpoints

Eleanore Jin
In reply to this post by Arvid Heise-3
Hi Ivan, 

Beam coders are wrapped in Flink's TypeSerializers. So I don't think it will result in double serialization. 
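For intuition, here is a highly simplified sketch of that delegation (an illustrative class, not the runner's actual adapter, which I believe is CoderTypeSerializer in the Flink runner): the Flink-facing serializer simply forwards to the Beam coder, so each element is encoded exactly once.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.Coder;

// Illustrative only: a Flink-facing serializer that delegates to a Beam coder.
class BeamCoderDelegate<T> {
  private final Coder<T> coder;

  BeamCoderDelegate(Coder<T> coder) {
    this.coder = coder;
  }

  void serialize(T value, OutputStream flinkTarget) throws IOException {
    coder.encode(value, flinkTarget);  // single encoding pass, done by the coder
  }

  T deserialize(InputStream flinkSource) throws IOException {
    return coder.decode(flinkSource);  // single decoding pass
  }
}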

Thanks!
Eleanore

On Tue, May 19, 2020 at 4:04 AM Ivan San Jose <[hidden email]> wrote:
Perfect, thank you so much Arvid. I'd expect more people to be using
Beam on top of Flink, but it seems it's not that popular.

On Tue, 2020-05-19 at 12:46 +0200, Arvid Heise wrote:
> Hi Ivan,
>
> I fear that only a few mailing-list users actually have deeper Beam
> experience. I only used it briefly 3 years ago. Most users here are
> using Flink directly to avoid these kinds of double-abstraction
> issues.
>
> It might be better to switch to the Beam mailing list if you have
> Beam-specific questions, including how the Flink runner actually
> translates the Beam program to Flink.
>
> On Tue, May 19, 2020 at 11:38 AM Ivan San Jose <[hidden email]> wrote:
> > Actually I'm also wondering how Beam coders relate to the runner's
> > serialization... I mean, in Beam you specify a coder for each Java
> > type so that Beam can serialize/deserialize that type, but then, is
> > the runner used under the hood serializing/deserializing the result
> > again, so that it is doing a double serialization? Does that make
> > sense? Or how does it work?
> >
> > On Tue, 2020-05-19 at 08:54 +0000, Ivan San Jose wrote:
> > > Yep, sorry if I'm bothering you, but I think I'm still not getting
> > > this: how could I tell Beam to tell Flink to use that serializer
> > > instead of the standard Java one? I think Beam abstracts us from
> > > Flink's checkpointing mechanism, so I'm afraid that if we use the
> > > Flink API directly we might break other things that Beam is hiding
> > > from us...
> > >
> > > On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > > > Hi Ivan,
> > > >
> > > > The easiest way is to use some implementation that's already
> > > > there [1]. I already mentioned Avro and would strongly recommend
> > > > giving it a go. If you make sure to provide a default value for
> > > > as many fields as possible, you can always remove them later,
> > > > giving you great flexibility. I can give you more hints if you
> > > > decide to go this route.
> > > >
> > > > If you want to have a custom implementation, I'd start by looking
> > > > at one of the simpler implementations like MapSerializerSnapshot
> > > > [2].
> > > >
> > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html
> > > > (see known implementing classes).
> > > > [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
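A minimal sketch of the Avro route, assuming Beam's AvroCoder together with Avro's reflect annotations (the class and field names below are made up for illustration):

import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

// State model encoded with Avro so the schema can evolve between restarts.
@DefaultCoder(AvroCoder.class)
public class WalletMetadataState {
  String walletId = "";

  // A default lets state written by the old schema still decode after the
  // field is added.
  @AvroDefault("0")
  long retryCount;

  // Nullable fields default to null, so they can also be added safely.
  @Nullable
  String lastError;
}

Fields declared with defaults can later be removed again without breaking restore, which is where the flexibility comes from.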
> > > >
> > > > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <[hidden email]> wrote:
> > > > > Thanks for your complete answer Arvid. We will try to approach
> > > > > all the things you mentioned, but take into account that we are
> > > > > using Beam on top of Flink, so, to be honest, I don't know how
> > > > > we could implement the custom serialization thing (
> > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > ) there. Could you please give us some hints? Thanks
> > > > >
> > > > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > > > Hi Ivan,
> > > > > >
> > > > > > First let's address the issue with idle partitions. The
> > > > > > solution is to use a watermark assigner that also emits a
> > > > > > watermark with some idle timeout [1].
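A rough sketch of such an assigner against the pre-1.11 Flink API (the event type and timeout below are illustrative; [1] is the full version, and Flink 1.11+ offers WatermarkStrategy#withIdleness for the same purpose):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

class MyEvent { long timestampMs; }  // placeholder event type

// Emits event-time watermarks, but falls back to processing time once the
// source has been silent longer than the idle timeout, so an idle topic no
// longer holds back the combiner's watermark.
class IdleAwareAssigner implements AssignerWithPeriodicWatermarks<MyEvent> {
  private static final long IDLE_TIMEOUT_MS = 60_000;
  private long maxEventTime = Long.MIN_VALUE;
  private long lastSeenAt = System.currentTimeMillis();

  @Override
  public long extractTimestamp(MyEvent event, long previousElementTimestamp) {
    lastSeenAt = System.currentTimeMillis();
    maxEventTime = Math.max(maxEventTime, event.timestampMs);
    return event.timestampMs;
  }

  @Override
  public Watermark getCurrentWatermark() {
    boolean idle = System.currentTimeMillis() - lastSeenAt > IDLE_TIMEOUT_MS;
    // When idle, advance the watermark from wall-clock time instead of
    // waiting for the next event.
    return new Watermark(idle
        ? System.currentTimeMillis() - IDLE_TIMEOUT_MS
        : maxEventTime);
  }
}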
> > > > > >
> > > > > > Now the second question, on why Kafka offsets are committed
> > > > > > for in-flight, checkpointed data. The basic idea is that you
> > > > > > are not losing data while avoiding duplicated output.
> > > > > > If you committed offsets only after data had been fully
> > > > > > processed, then upon a crash the same data point would be
> > > > > > reprocessed jointly with the restored in-flight data, so you
> > > > > > would get duplicate messages in your system.
> > > > > > To avoid duplicates, data would need to be more or less
> > > > > > completely flushed out of the system before a checkpoint is
> > > > > > performed, which would produce huge downtime.
> > > > > > Instead, we assume that we can always resume from the
> > > > > > checkpoints.
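On the Beam side, this checkpoint-coupled committing is what KafkaIO opts into with commitOffsetsInFinalize; a minimal read sketch, assuming Beam's KafkaIO (servers and topic are placeholders):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafka {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Offsets are committed when the runner finalizes a checkpoint, not when
    // the elements have reached the sink, matching the behaviour above.
    PCollection<KV<String, String>> records = p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("kafka:9092")
            .withTopic("wallet-events")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .commitOffsetsInFinalize()
            .withoutMetadata());
    // ... combine and write to the sink ...
    p.run();
  }
}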
> > > > > > Which leads to the last question, on what to do when your
> > > > > > pipeline has breaking changes.
> > > > > > The first strategy is to avoid breaking changes as much as
> > > > > > possible. State could, for example, also be stored as Avro to
> > > > > > allow schema evolution. Minor things like renamed operators
> > > > > > will not happen with a bit more expertise.
> > > > > > The second strategy is to use state migration [2].
> > > > > > Alternatively, you can manually convert state with the state
> > > > > > processor API [3].
> > > > > > The last option is to do a full reprocessing of the data. This
> > > > > > can be done on a non-production cluster, and then a savepoint
> > > > > > can be used to bootstrap the production cluster quickly. This
> > > > > > option needs to be available anyway for the case that you find
> > > > > > any logic error. But of course, this option has the highest
> > > > > > implications (you may need to purge the sink beforehand).
> > > > > >
> > > > > > [1]
> > > > > >
> > > > >
> > https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > > > > [2]
> > > > > >
> > > > >
> > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > > > [3]
> > > > > >
> > > > >
> > https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
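For [3], a rough sketch of reading keyed state out of an existing savepoint with the State Processor API as it looks around Flink 1.9/1.10 (the operator uid, state name, and types are hypothetical):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class StateExport {
  // Reads one ValueState entry per key; names and types are hypothetical.
  static class ReaderFn extends KeyedStateReaderFunction<String, String> {
    ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
      state = getRuntimeContext().getState(
          new ValueStateDescriptor<>("combine-state", String.class));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<String> out)
        throws Exception {
      out.collect(key + "=" + state.value());
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    ExistingSavepoint savepoint = Savepoint.load(
        env, "hdfs:///path/to/savepoint", new MemoryStateBackend());
    DataSet<String> oldState =
        savepoint.readKeyedState("combiner-uid", new ReaderFn());
    oldState.print();  // inspect here, or transform and write a new savepoint
  }
}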