Hi,
I'm using Flink 1.4.2 release. My Flink streaming job needs a way to make a singleton object available throughout the job graph which consists of Kafka Source, ProcessFunction, RichAsyncFunction, RichSinkFunction. What's the best way to achieve this? As an attempt, I have tried anchoring my object in a jar that is included as part of the TaskManager JVM class path. But it does not seem to work in the following scenario: 1. When the job manager process was restarted (due to a pod failure) while my job is running 2. When the job manager comes back, it attempts to restart my job, which seems a bit odd. 3. But, my job kept failing with a ClassCastException where it attempts to retrieve the singleton from TaskManager class loader. 2018-06-02 01:58:56,614 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: monstor -> Process -> async wait operator -> P rocess -> Sink: deadletter (10/12) (289e6ee24be520d81890dfe5a1a164fa) switched from RUNNING to FAILED. java.lang.ClassCastException: org.monstor.flink.coordination.Coordinator cannot be cast to org.monstor.flink.coordination.Coordinator at org.monstor.flink.coordination.CoordinatorHolder.<init>(CoordinatorHolder.java:48) at org.monstor.myjob.flink.MyAsyncWriter.open(GsiStoreAsyncWriter.java:102) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:164) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) at java.lang.Thread.run(Thread.java:748) 2018-06-02 01:58:56,614 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: monstor -> Process -> async wait operator -> P rocess -> Sink: deadletter (2/12) (e55b878f5c9d91a0dc557dd60b80e867) switched from RUNNING to CANCELING. Thanks, Connie |
BTW, the ClassCastException error does not happen when the job was first cancelled and started. My Flink cluster is setup with Job Manager HA enabled. So, what's the difference between the job restart when the Job Manager was bounced and when the usual job "cancel" and "start" combination. On Sun, Jun 3, 2018 at 3:44 PM, Connie Yang <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |