Hi,

We are starting to use Beam with Flink as the runner for our applications, and we would like to take advantage of Flink checkpointing, but it seems we do not understand it clearly.

Simplifying, our application does the following:
- Read messages from a couple of Kafka topics
- Combine them
- Write the combined result to a sink (Exasol DB)

Because the application processes messages in event time and one of the topics is almost idle, the first time the application starts, messages get stuck in the combiner: the watermark does not advance until messages arrive on the idle topic (we know this, and it is not a problem for us).

The problem is that, if a checkpoint is triggered while messages are still stuck in the combiner, the checkpoint finishes successfully (and the offsets are committed to Kafka) even though the messages have not reached the sink yet. This surprised us. Is it expected?

If we later make changes to the application source code that are not state compatible, the checkpoint can no longer be restored. In that case we would like to start the application without a checkpoint, but without losing data. The problem is that the messages stuck in the combiner have already been committed to Kafka, and without a checkpoint the application starts reading from the latest committed offset, so those messages are never read from the source again and are lost.

So I guess our question is: how do you avoid losing data while developing applications? Sooner or later you are going to introduce breaking changes...

For example, we have seen these two errors so far:

- After changing an operator name:

2020-05-13 07:23:52,248 ERROR Fatal error occurred in the cluster entrypoint.
org.apache.flink.runtime.dispatcher.DispatcherException: Failed to take leadership with session id 00000000-0000-0000-0000-000000000000.
...
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
    at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)
    at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
    at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
    at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
    ... 7 more
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://RTDWLTDEV/data/lake/processing/flink-savepoints/holly-reconciliation-fact/savepoint-90ab28-bcc1f65a0986. Cannot map checkpoint/savepoint state for operator f476451c6210bd2783f36fa331b9da5e to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
    ...
- After modifying a Java model class involved in a combine:

org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
    at org.apache.flink.runtime.state.filesystem.FsStateBackend.createKeyedStateBackend(FsStateBackend.java:529)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:881)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:395)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: internal.holly.beatrix.wallet.walletfact.model.WalletMetadata; local class incompatible: stream classdesc serialVersionUID = 8366890161513008789, local class serialVersionUID = 174312384610985998
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)

Apologies in advance: we are new to Flink, so maybe we are missing something obvious here.
Thanks

This email and any attachment to it are confidential. Unless you are the intended recipient, you may not use, copy or disclose either the message or any information contained in the message. If you are not the intended recipient, you should delete this email and notify the sender immediately. Any views or opinions expressed in this email are those of the sender only, unless otherwise stated. All copyright in any of the material in this email is reserved. All emails, incoming and outgoing, may be recorded and monitored for legitimate business purposes. We exclude all liability for any loss or damage arising or resulting from the receipt, use or transmission of this email to the fullest extent permitted by law.
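A note on the second stack trace in the message above: `java.io.InvalidClassException ... local class incompatible` is what Java serialization raises when the `serialVersionUID` recorded in the stream differs from the one of the class on the classpath. When a class does not declare the field, the JVM derives the value from the class structure, so editing the class can change it. The sketch below assumes the model class is written with plain Java serialization and pins the value explicitly. `WalletMetadataExample` and its fields are hypothetical stand-ins, not the real `WalletMetadata`.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionDemo {

    // Hypothetical stand-in for a model class that ends up in checkpointed state.
    static class WalletMetadataExample implements Serializable {
        // Pinned explicitly. Without this declaration the JVM computes the value
        // from the class structure, so it may change when the class is edited.
        private static final long serialVersionUID = 1L;

        String walletId;
        long balanceCents; // imagine this field was added in a later release
    }

    // Returns the serialVersionUID that Java serialization will use for the type.
    static long declaredUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(declaredUid(WalletMetadataExample.class)); // prints 1
    }
}
```

Pinning the value only helps for changes that Java serialization treats as compatible, such as adding a field. It does not make the state format evolvable in general, and it does not help with state that was already written under an auto-generated value unless the pinned value matches the one in the stream.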
Hi Ivan,

First, let's address the issue with idle partitions. The solution is to use a watermark assigner that also emits a watermark after some idle timeout [1].

Now the second question: why are Kafka offsets committed for in-flight, checkpointed data? The basic idea is that you do not lose data while also avoiding duplicate output. The in-flight data is part of the checkpoint, so it is restored together with the offsets. If offsets were committed only after data had been fully processed, then after a crash the same records would be read again and reprocessed together with the restored in-flight data, and you would get duplicate messages in your system. To avoid duplicates in that scheme, data would have to be more or less completely flushed out of the system before a checkpoint is performed, which would cause a huge downtime. Instead, we assume that we can always resume from the checkpoints.

Which leads to the last question: what to do when your pipeline has breaking changes.

The first strategy is to avoid breaking changes as much as possible. State could, for example, be stored as Avro to allow schema evolution. Minor things like renamed operators will not happen with a bit more expertise.

The second strategy is to use state migration [2]. Alternatively, you can manually convert state with the State Processor API [3].

The last option is a full reprocessing of the data. This can be done on a non-production cluster, and a savepoint can then be used to bootstrap the production cluster quickly. This option needs to be available anyway in case you find a logic error. But of course, it has the biggest implications (you may need to purge the sink beforehand).
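A minimal sketch of the idle-timeout idea from the first point of the reply, written in plain Java rather than against the Flink or Beam API. It assumes event timestamps are roughly wall-clock times, so that a source with no recent events can fall back to "now minus the idle timeout". The class name `IdleTimeoutWatermark` and all parameters are hypothetical and chosen for illustration only.

```java
public class IdleTimeoutWatermark {
    private final long idleTimeoutMs;
    private final long maxOutOfOrdernessMs;
    private long maxEventTimeMs = Long.MIN_VALUE;  // highest event time seen so far
    private long lastActivityMs;                   // wall-clock time of the last event
    private long lastWatermarkMs = Long.MIN_VALUE; // watermarks must never go backwards

    public IdleTimeoutWatermark(long idleTimeoutMs, long maxOutOfOrdernessMs, long startMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.lastActivityMs = startMs;
    }

    // Called for every record: eventTimeMs is its timestamp, nowMs the arrival time.
    public void onEvent(long eventTimeMs, long nowMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        lastActivityMs = nowMs;
    }

    // Called periodically. While events arrive, the watermark trails the highest
    // event time. Once the source has been silent for idleTimeoutMs, the watermark
    // is driven by the clock instead, so an idle source stops holding others back.
    public long currentWatermark(long nowMs) {
        long fromEvents = maxEventTimeMs == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxEventTimeMs - maxOutOfOrdernessMs;
        boolean idle = nowMs - lastActivityMs >= idleTimeoutMs;
        long fromClock = idle ? nowMs - idleTimeoutMs : Long.MIN_VALUE;
        lastWatermarkMs = Math.max(lastWatermarkMs, Math.max(fromEvents, fromClock));
        return lastWatermarkMs;
    }

    public static void main(String[] args) {
        IdleTimeoutWatermark busyTopic = new IdleTimeoutWatermark(1_000L, 100L, 0L);
        IdleTimeoutWatermark idleTopic = new IdleTimeoutWatermark(1_000L, 100L, 0L);
        busyTopic.onEvent(5_000L, 5_000L);
        long now = 5_500L;
        // A downstream operator sees the minimum over its inputs.
        long combined = Math.min(busyTopic.currentWatermark(now), idleTopic.currentWatermark(now));
        System.out.println(combined); // prints 4500
    }
}
```

The trade-off of this design is that records arriving on the idle source after the clock-driven watermark has passed their timestamp are late, so the timeout should be chosen with the expected delay of that topic in mind.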
[1] https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
[3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

On Fri, May 15, 2020 at 2:58 PM Ivan San Jose <[hidden email]> wrote:
> Hi, we are starting to use Beam with Flink as runner on our [...]

--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng

Thanks for your thorough answer, Arvid. We will try everything you mentioned, but bear in mind that we are using Beam on top of Flink, so, to be honest, I don't know how we could implement the custom serialization approach (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction) there. Could you please give us some hints?

Thanks

On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> Hi Ivan,
> [...]
Hi Ivan,

The easiest way is to use an implementation that already exists [1]. I already mentioned Avro and would strongly recommend giving it a go. If you make sure to provide a default value for as many fields as possible, you can always remove them later, which gives you great flexibility. I can give you more hints if you decide to go this route.

If you want a custom implementation, I'd start by looking at one of the simpler implementations, such as MapSerializerSnapshot [2].

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html (see known implementing classes)
[2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java

On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <[hidden email]> wrote:
> Thanks for your complete answer Arvid, we will try to approach all [...]

--
Arvid Heise
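To make the point about default values concrete, here is a small sketch that mimics, with plain Java maps, how a schema with defaults can read records written by an older version. It does not use the Avro API. The method `resolve` and the field names are hypothetical, and real Avro schema resolution has more rules (type promotion, aliases, unions) than shown here.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class DefaultsResolution {

    // Builds the record as the new code sees it. Every field of the reader schema
    // takes the stored value if one exists, otherwise its declared default.
    // Stored fields that the reader schema no longer declares are dropped.
    public static Map<String, Object> resolve(Map<String, Object> stored,
                                              Map<String, Object> readerDefaults) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            resolved.put(field.getKey(), stored.getOrDefault(field.getKey(), field.getValue()));
        }
        return resolved;
    }

    public static void main(String[] args) {
        // Record written by the old version of the job.
        Map<String, Object> stored = new HashMap<>();
        stored.put("walletId", "w-1");
        stored.put("legacyFlag", Boolean.TRUE);

        // New version: "legacyFlag" was removed, "currency" was added with a default.
        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("walletId", "");
        readerDefaults.put("currency", "EUR");

        System.out.println(resolve(stored, readerDefaults)); // prints {walletId=w-1, currency=EUR}
    }
}
```

Because every field has a default, both adding and removing a field leaves old records readable, which is the flexibility the reply refers to.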
Yep, sorry to bother you, but I think I'm still not getting this. How could I tell Beam to tell Flink to use that serializer instead of the standard Java one? I think Beam abstracts us away from Flink's checkpointing mechanism, so I'm afraid that if we use the Flink API directly we might break other things that Beam is hiding from us...

On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> Hi Ivan,
> [...]
Unless you are the intended recipient, you may not use, copy or disclose either the message or any information contained in the message. If you are not the intended recipient, you should delete this email and notify the sender immediately. Any views or opinions expressed in this email are those of the sender only, unless otherwise stated. All copyright in any of the material in this email is reserved. All emails, incoming and outgoing, may be recorded and monitored for legitimate business purposes. We exclude all liability for any loss or damage arising or resulting from the receipt, use or transmission of this email to the fullest extent permitted by law. |
In reply to this post by Arvid Heise-3
Actually, I'm also wondering how Beam coders relate to the runner's serialization. In Beam you specify a coder for each Java type so that Beam can serialize and deserialize that type. Does the runner then serialize and deserialize the result again under the hood, so that the data is serialized twice? Or how does it work?

On Tue, 2020-05-19 at 08:54 +0000, Ivan San Jose wrote:
> Yep, sorry if I'm bothering you, but I think I'm still not getting this. How can I tell Beam to tell Flink to use that serializer instead of the standard Java one? I think Beam abstracts us from Flink's checkpointing mechanism, so I'm afraid that if we use the Flink API directly we might break other things that Beam is hiding from us...
>
> On Tue, 2020-05-19 at 10:44 +0200, Arvid Heise wrote:
> > Hi Ivan,
> >
> > The easiest way is to use some implementation that's already there [1]. I already mentioned Avro and would strongly recommend giving it a go. If you make sure to provide a default value for as many fields as possible, you can always remove them later, giving you great flexibility. I can give you more hints if you decide to go this route.
> >
> > If you want to have a custom implementation, I'd start by looking at one of the simpler implementations like MapSerializerSnapshot [2].
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeutils/TypeSerializerSnapshot.html (see known implementing classes).
> > [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializerSnapshot.java
> >
> > On Tue, May 19, 2020 at 10:31 AM Ivan San Jose <[hidden email]> wrote:
> > > Thanks for your complete answer Arvid, we will try to approach all the things you mentioned, but take into account that we are using Beam on top of Flink, so, to be honest, I don't know how we could implement the custom serialization thing (https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction) there. Could you please give us some hints? Thanks
> > >
> > > On Mon, 2020-05-18 at 12:41 +0200, Arvid Heise wrote:
> > > > Hi Ivan,
> > > >
> > > > First let's address the issue with idle partitions. The solution is to use a watermark assigner that also emits a watermark after some idle timeout [1].
> > > >
> > > > Now the second question, on why Kafka offsets are committed for in-flight, checkpointed data. The basic idea is that you are not losing data while avoiding replicated output.
> > > > If you committed offsets only after data has been fully processed, then upon a crash the same data point would be reprocessed jointly with the restored in-flight data, so you would get duplicate messages in your system.
> > > > To avoid duplicates, data would need to be more or less completely flushed out of the system before a checkpoint is performed. That would produce a huge downtime.
> > > > Instead, we assume that we can always resume from the checkpoints.
> > > >
> > > > Which leads to the last question, on what to do when your pipeline has breaking changes.
> > > > The first strategy is to avoid breaking changes as much as possible. State could, for example, also be stored as Avro to allow schema evolution. Minor things like renamed operators will not happen with a bit more expertise.
> > > > The second strategy is to use state migration [2]. Alternatively, you can manually convert state with the State Processor API [3].
> > > > The last option is to do a full reprocessing of the data. This can be done on a non-production cluster, and then a savepoint can be used to bootstrap the production cluster quickly. This option needs to be available anyway for the case that you find a logic error. But of course, this option has the highest implications (you may need to purge the sink beforehand).
> > > >
> > > > [1] https://github.com/ververica/flink-training-troubleshooting/blob/master/src/main/java/com/ververica/flinktraining/solutions/troubleshoot/TroubledStreamingJobSolution2.java#L129-L187
> > > > [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/custom_serialization.html#the-typeserializersnapshot-abstraction
> > > > [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
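
To make the idle-timeout idea from the quoted answer concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink or Beam API: the class `IdleAwareWatermark`, its methods, and the partition names are hypothetical and only illustrate how a combined watermark can skip partitions that have been silent longer than a timeout.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of a combined watermark over several partitions (hypothetical,
 * not Flink or Beam API). A partition that has been silent for longer than
 * idleTimeoutMs is ignored, so one idle topic cannot hold back the combined
 * watermark forever.
 */
final class IdleAwareWatermark {
    private final long idleTimeoutMs;
    // Highest event timestamp seen so far, per partition.
    private final Map<String, Long> maxEventTime = new HashMap<>();
    // Wall-clock time at which each partition last produced a record.
    private final Map<String, Long> lastSeenAt = new HashMap<>();

    IdleAwareWatermark(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    /** Registers a partition so that it counts even before its first record. */
    void register(String partition, long nowMs) {
        maxEventTime.putIfAbsent(partition, Long.MIN_VALUE);
        lastSeenAt.putIfAbsent(partition, nowMs);
    }

    /** Records one event from a partition. */
    void onEvent(String partition, long eventTimeMs, long nowMs) {
        maxEventTime.merge(partition, eventTimeMs, Long::max);
        lastSeenAt.put(partition, nowMs);
    }

    /** Minimum event time over all partitions that are still active at nowMs. */
    long currentWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<String, Long> entry : maxEventTime.entrySet()) {
            boolean idle = nowMs - lastSeenAt.get(entry.getKey()) > idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, entry.getValue());
            }
        }
        // If every partition is idle there is nothing to advance on.
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

In this model, a partition that never receives a record holds the watermark at `Long.MIN_VALUE` only until the timeout expires; after that the busy partition alone determines progress. How a real watermark assigner exposes this in Flink, or in Beam's Kafka source, is outside the scope of this sketch.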
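
On the serialization question: the `InvalidClassException` in the original report is thrown from `java.io.ObjectStreamClass`, which suggests that the `WalletMetadata` state was written with plain Java serialization. Whether that happens through a Beam coder or elsewhere is an assumption here. The sketch below uses only the JDK; `WalletMetadataSketch` and `SerialUidDemo` are hypothetical stand-ins. It shows how pinning `serialVersionUID` keeps the identifier stable, so that compatible changes such as adding a field no longer change it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical stand-in for a model class that ends up in checkpointed state. */
final class WalletMetadataSketch implements Serializable {
    // Pinned explicitly. Without this field the JVM derives the UID from the
    // class structure, so adding or removing members changes it.
    private static final long serialVersionUID = 1L;

    final String walletId;
    final long balanceCents;

    WalletMetadataSketch(String walletId, long balanceCents) {
        this.walletId = walletId;
        this.balanceCents = balanceCents;
    }
}

/** Helpers for a Java-serialization round trip (illustrative names). */
final class SerialUidDemo {
    static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    /** The UID that Java serialization will write for this class. */
    static long effectiveUid(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }
}
```

This only helps with changes that Java serialization treats as compatible. Removing or retyping fields still breaks restores, which is one reason the Avro-based approach suggested earlier in the thread is the more flexible route.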
In reply to this post by Arvid Heise-3
Hi Ivan,

I'm afraid that only a few users on this mailing list have deeper Beam experience. I only used it briefly 3 years ago. Most users here use Flink directly to avoid these kinds of double-abstraction issues.

It might be better to switch to the Beam mailing list if you have Beam-specific questions, including how the Flink runner actually translates the Beam program to Flink.

--
Arvid Heise | Senior Java Developer
Perfect, thank you so much Arvid. I'd have expected more people to be using Beam on top of Flink, but it seems it is not so popular.
izeState(AbstractStreamOperator.java:253) > > > > > > > at > > > > > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeSta > > > > > te > > > > > > > (Str > > > > > > > eamTask.java:881) > > > > > > > at > > > > > > > > > > > > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(Stream > > > > > Ta > > > > > > > sk.j > > > > > > > ava:395) > > > > > > > at > > > > > > > > > org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705 > > > > > > > ) > > > > > > > at > > > > > org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) > > > > > > > at java.lang.Thread.run(Thread.java:748) > > > > > > > Caused by: java.io.InvalidClassException: > > > > > > > > > internal.holly.beatrix.wallet.walletfact.model.WalletMetadata > > > > > > > ; > > > > > > > local > > > > > > > class incompatible: stream classdesc serialVersionUID = > > > > > > > 8366890161513008789, local class serialVersionUID = > > > > > > > 174312384610985998 > > > > > > > at > > > > > > > > > > > > > > java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699 > > > > > ) > > > > > > > > > > > > > > Apologies in advance as we are new to Flink, so may be we > > are > > > > > > > missing > > > > > > > something obvious here. > > > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > > > > > > > Este correo electrónico y sus adjuntos son de naturaleza > > > > > > > confidencial. A no ser que usted sea el destinatario, no > > > > > > > puede > > > > > > > utilizar, copiar o desvelar tanto el mensaje como > > cualquier > > > > > > > información contenida en el mensaje. Si no es el > > > > > > > destinatario, > > > > > debe > > > > > > > borrar este correo y notificar al remitente > > inmediatamente. > > > > > > > Cualquier punto de vista u opinión expresada en este > > correo > > > > > > > electrónico son únicamente del remitente, a no ser que se > > > > > indique > > > > > > > lo contrario. 
Todos los derechos de autor en cualquier > > > > > > > material > > > > > de > > > > > > > este correo son reservados. Todos los correos > > electrónicos, > > > > > > > salientes o entrantes, pueden ser grabados y > > monitorizados > > > > > > > para > > > > > uso > > > > > > > legítimo del negocio. Nos encontramos exentos de toda > > > > > > > responsabilidad ante cualquier perdida o daño que surja o > > > > > resulte > > > > > > > de la recepción, uso o transmisión de este correo > > electrónico > > > > > hasta > > > > > > > el máximo permitido por la ley. > > > > > > > > > > > > > > This email and any attachment to it are confidential. > > Unless > > > > > you > > > > > > > are the intended recipient, you may not use, copy or > > disclose > > > > > > > either the message or any information contained in the > > > > > > > message. > > > > > If > > > > > > > you are not the intended recipient, you should delete > > this > > > > > > > and notify the sender immediately. Any views or opinions > > > > > expressed > > > > > > > in this email are those of the sender only, unless > > otherwise > > > > > > > stated. All copyright in any of the material in this > > email is > > > > > > > reserved. All emails, incoming and outgoing, may be > > recorded > > > > > and > > > > > > > monitored for legitimate business purposes. We exclude > > all > > > > > > > liability for any loss or damage arising or resulting > > from > > > > > > > the > > > > > > > receipt, use or transmission of this email to the fullest > > > > > extent > > > > > > > permitted by law. > > > > > > > > > > Este correo electrónico y sus adjuntos son de naturaleza > > > > > confidencial. A no ser que usted sea el destinatario, no > > puede > > > > > utilizar, copiar o desvelar tanto el mensaje como cualquier > > > > > información contenida en el mensaje. Si no es el > > destinatario, > > > > > debe > > > > > borrar este correo y notificar al remitente inmediatamente. 
> > > > > Cualquier punto de vista u opinión expresada en este correo > > > > > electrónico son únicamente del remitente, a no ser que se > > indique > > > > > lo contrario. Todos los derechos de autor en cualquier > > material > > > > > de > > > > > este correo son reservados. Todos los correos electrónicos, > > > > > salientes o entrantes, pueden ser grabados y monitorizados > > para > > > > > uso > > > > > legítimo del negocio. Nos encontramos exentos de toda > > > > > responsabilidad ante cualquier perdida o daño que surja o > > resulte > > > > > de la recepción, uso o transmisión de este correo electrónico > > > > > hasta > > > > > el máximo permitido por la ley. > > > > > > > > > > This email and any attachment to it are confidential. Unless > > you > > > > > are the intended recipient, you may not use, copy or disclose > > > > > either the message or any information contained in the > > message. > > > > > If > > > > > you are not the intended recipient, you should delete this > > > > > and notify the sender immediately. Any views or opinions > > > > > expressed > > > > > in this email are those of the sender only, unless otherwise > > > > > stated. All copyright in any of the material in this email is > > > > > reserved. All emails, incoming and outgoing, may be recorded > > and > > > > > monitored for legitimate business purposes. We exclude all > > > > > liability for any loss or damage arising or resulting from > > the > > > > > receipt, use or transmission of this email to the fullest > > extent > > > > > permitted by law. > > > > > > Este correo electrónico y sus adjuntos son de naturaleza > > > confidencial. A no ser que usted sea el destinatario, no puede > > > utilizar, copiar o desvelar tanto el mensaje como cualquier > > > información contenida en el mensaje. Si no es el destinatario, > > debe > > > borrar este correo y notificar al remitente inmediatamente. 
> > Cualquier > > > punto de vista u opinión expresada en este correo electrónico son > > > únicamente del remitente, a no ser que se indique lo contrario. > > Todos > > > los derechos de autor en cualquier material de este correo son > > > reservados. Todos los correos electrónicos, salientes o > > entrantes, > > > pueden ser grabados y monitorizados para uso legítimo del > > negocio. > > > Nos encontramos exentos de toda responsabilidad ante cualquier > > > perdida o daño que surja o resulte de la recepción, uso o > > transmisión > > > de este correo electrónico hasta el máximo permitido por la ley. > > > > > > This email and any attachment to it are confidential. Unless you > > are > > > the intended recipient, you may not use, copy or disclose either > > the > > > message or any information contained in the message. If you are > > not > > > the intended recipient, you should delete this email and notify > > the > > > sender immediately. Any views or opinions expressed in this email > > are > > > those of the sender only, unless otherwise stated. All copyright > > in > > > any of the material in this email is reserved. All emails, > > incoming > > > and outgoing, may be recorded and monitored for legitimate > > business > > > purposes. We exclude all liability for any loss or damage arising > > or > > > resulting from the receipt, use or transmission of this email to > > the > > > fullest extent permitted by law. > > > > > > Este correo electrónico y sus adjuntos son de naturaleza > > confidencial. A no ser que usted sea el destinatario, no puede > > utilizar, copiar o desvelar tanto el mensaje como cualquier > > información contenida en el mensaje. Si no es el destinatario, debe > > borrar este correo y notificar al remitente inmediatamente. > > Cualquier punto de vista u opinión expresada en este correo > > electrónico son únicamente del remitente, a no ser que se indique > > lo contrario. 
Todos los derechos de autor en cualquier material de > > este correo son reservados. Todos los correos electrónicos, > > salientes o entrantes, pueden ser grabados y monitorizados para uso > > legítimo del negocio. Nos encontramos exentos de toda > > responsabilidad ante cualquier perdida o daño que surja o resulte > > de la recepción, uso o transmisión de este correo electrónico hasta > > el máximo permitido por la ley. > > > > This email and any attachment to it are confidential. Unless you > > are the intended recipient, you may not use, copy or disclose > > either the message or any information contained in the message. If > > you are not the intended recipient, you should delete this email > > and notify the sender immediately. Any views or opinions expressed > > in this email are those of the sender only, unless otherwise > > stated. All copyright in any of the material in this email is > > reserved. All emails, incoming and outgoing, may be recorded and > > monitored for legitimate business purposes. We exclude all > > liability for any loss or damage arising or resulting from the > > receipt, use or transmission of this email to the fullest extent > > permitted by law. > > Este correo electrónico y sus adjuntos son de naturaleza confidencial. A no ser que usted sea el destinatario, no puede utilizar, copiar o desvelar tanto el mensaje como cualquier información contenida en el mensaje. Si no es el destinatario, debe borrar este correo y notificar al remitente inmediatamente. Cualquier punto de vista u opinión expresada en este correo electrónico son únicamente del remitente, a no ser que se indique lo contrario. Todos los derechos de autor en cualquier material de este correo son reservados. Todos los correos electrónicos, salientes o entrantes, pueden ser grabados y monitorizados para uso legítimo del negocio. 
Nos encontramos exentos de toda responsabilidad ante cualquier perdida o daño que surja o resulte de la recepción, uso o transmisión de este correo electrónico hasta el máximo permitido por la ley. This email and any attachment to it are confidential. Unless you are the intended recipient, you may not use, copy or disclose either the message or any information contained in the message. If you are not the intended recipient, you should delete this email and notify the sender immediately. Any views or opinions expressed in this email are those of the sender only, unless otherwise stated. All copyright in any of the material in this email is reserved. All emails, incoming and outgoing, may be recorded and monitored for legitimate business purposes. We exclude all liability for any loss or damage arising or resulting from the receipt, use or transmission of this email to the fullest extent permitted by law. |
In reply to this post by Arvid Heise-3
Hi Ivan,

Beam coders are wrapped in Flink's TypeSerializers. So I don't think it will result in double serialization.

Thanks!
Eleanore

On Tue, May 19, 2020 at 4:04 AM Ivan San Jose <[hidden email]> wrote:
> Perfect, thank you so much Arvid, I'd expect more people using Beam on