How to anchor a job level singleton object as part of TaskManager JVM?

Connie Yang
Hi,

I'm using Flink 1.4.2 release.

My Flink streaming job needs a singleton object that is available throughout the job graph, which consists of a Kafka source, a ProcessFunction, a RichAsyncFunction, and a RichSinkFunction.  What's the best way to achieve this?
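
To make "singleton" concrete, here is a minimal plain-Java sketch of the kind of holder I mean. `SharedCoordinator` and its `CREATED` counter are hypothetical names for illustration only, not the actual classes in my job. The static instance is created once per loaded copy of the class, so it is only a true per-JVM singleton if every operator resolves the class through the same class loader.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical singleton used only for illustration. */
public final class SharedCoordinator {

    /** Counts constructor calls so the "created once" property can be checked. */
    static final AtomicInteger CREATED = new AtomicInteger();

    private SharedCoordinator() {
        CREATED.incrementAndGet();
    }

    /** Initialization-on-demand holder: the JVM initializes Holder lazily and thread-safely. */
    private static final class Holder {
        static final SharedCoordinator INSTANCE = new SharedCoordinator();
    }

    /** Returns the one instance owned by the class loader that loaded this class. */
    public static SharedCoordinator getInstance() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        SharedCoordinator first = SharedCoordinator.getInstance();
        SharedCoordinator second = SharedCoordinator.getInstance();
        System.out.println("same instance: " + (first == second));
    }
}
```

In the job, each rich function would call something like `SharedCoordinator.getInstance()` from its `open()` method.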

As an attempt, I tried anchoring my object in a jar that is included in the TaskManager JVM classpath.  But it does not seem to work in the following scenario:
1. The JobManager process was restarted (due to a pod failure) while my job was running.
2. When the JobManager came back, it attempted to restart my job, which seems a bit odd.
3. My job then kept failing with a ClassCastException when it attempted to retrieve the singleton from the TaskManager class loader.

2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (10/12) (289e6ee24be520d81890dfe5a1a164fa) switched from RUNNING to FAILED.
java.lang.ClassCastException: org.monstor.flink.coordination.Coordinator cannot be cast to org.monstor.flink.coordination.Coordinator
        at org.monstor.flink.coordination.CoordinatorHolder.<init>(CoordinatorHolder.java:48)
        at org.monstor.myjob.flink.MyAsyncWriter.open(GsiStoreAsyncWriter.java:102)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:164)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (2/12) (e55b878f5c9d91a0dc557dd60b80e867) switched from RUNNING to CANCELING.
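
A "Coordinator cannot be cast to Coordinator" message generally means the JVM sees two classes with the same name that were defined by different class loaders. The following is a minimal, self-contained sketch that reproduces that effect outside Flink. `ClassLoaderIdentityDemo`, `IsolatingLoader`, and the nested `Coordinator` are hypothetical names, and the loader is only a stand-in for any loader that defines its own copy of a class instead of delegating to its parent. It assumes the compiled class file is readable as a classpath resource.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for the shared singleton type. */
    public static class Coordinator {}

    /** Defines one named class itself; every other class is delegated to the parent. */
    static final class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(ClassLoader parent, String isolatedName) {
            super(parent);
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    byte[] bytes = readClassBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(loaded);
                }
                return loaded;
            }
        }

        /** Reads the .class file of the named class through the parent loader. */
        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buffer = new byte[4096];
                int read;
                while ((read = in.read(buffer)) != -1) {
                    out.write(buffer, 0, read);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Returns true if casting an instance from the second loader to our Coordinator fails. */
    static boolean castFails() {
        String name = Coordinator.class.getName();
        ClassLoader child = new IsolatingLoader(ClassLoaderIdentityDemo.class.getClassLoader(), name);
        try {
            Object foreign = child.loadClass(name).getDeclaredConstructor().newInstance();
            Coordinator ignored = (Coordinator) foreign; // same name, different defining loader
            return false;
        } catch (ClassCastException e) {
            return true;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails: " + castFails());
    }
}
```

If the same thing is happening in my job, the `Coordinator` class would be reachable both from the TaskManager classpath and from the job's user-code class loader, but I have not confirmed that this is the cause.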

Thanks,
Connie
Re: How to anchor a job level singleton object as part of TaskManager JVM?

Connie Yang
BTW, the ClassCastException does not occur when the job is first cancelled and then started again.  My Flink cluster is set up with JobManager HA enabled.

So, what is the difference between the job restart that happens after the JobManager is bounced and the usual job "cancel" and "start" combination?

On Sun, Jun 3, 2018 at 3:44 PM, Connie Yang <[hidden email]> wrote:
> [quoted text trimmed; identical to the original message above]