PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Oytun Tez
Hi all,

We are migrating from Flink 1.6 to 1.8. I ran into a serialization error that, if memory serves, we did not have before: "The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields."

The method below takes a PatternStream from CEP.pattern() and uses a side output for timed-out events. Can you see anything odd here? (WorkerEvent is the input type, but the collectors emit Project objects.)

protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag = new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

JingsongLee
Hi @Oytun Tez,
It looks like your PatternFlatSelectFunction is not serializable.
Because you use anonymous inner classes, check the class that getPending belongs to; maybe that class is not serializable?
Alternatively, avoid inner classes and use static nested classes instead.

Best, JingsongLee
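
The general concern with anonymous inner classes can be reproduced with plain Java serialization, without Flink. The sketch below is only an illustration: Job, SelectFunction and StaticSelect are hypothetical stand-ins, not Flink types. It shows that an anonymous class which reads a field of its enclosing object carries a hidden reference to that object, so it cannot be serialized when the enclosing class is not Serializable, while a static nested class has no such reference.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for illustration; none of these are Flink types.
class InnerClassCaptureDemo {

    /** A function type that a framework would need to serialize. */
    interface SelectFunction extends Serializable {
        String select(String event);
    }

    /** Deliberately not Serializable, like a class that wires up a job. */
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        SelectFunction anonymous() {
            // Reads this.prefix, so the anonymous instance keeps a hidden
            // reference to the enclosing Job.
            return new SelectFunction() {
                @Override
                public String select(String event) {
                    return prefix + event;
                }
            };
        }
    }

    /** Static nested class: no reference to any enclosing instance. */
    static class StaticSelect implements SelectFunction {
        private final String prefix;

        StaticSelect(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String select(String event) {
            return prefix + event;
        }
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous:     " + serializes(new Job("pending:").anonymous()));
        System.out.println("static nested: " + serializes(new StaticSelect("pending:")));
    }
}
```

Both functions compute the same result; only the anonymous one drags the enclosing Job into the serialized object graph.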

------------------------------------------------------------------
From: Oytun Tez <[hidden email]>
Send Time: Friday, April 19, 2019, 03:38
To: user <[hidden email]>
Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade


Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Oytun Tez
Hey JingsongLee!

Here are some findings:
  • flatSelect without a timeout selector, i.e. patternStream.flatSelect(PatternFlatSelectFunction), works fine.
  • Converting both the timeout and select selectors to (non-static) inner classes gives the same result: it fails with the same error.
  • flatSelect without a timeout selector, but with an inner class for PatternFlatSelectFunction, works (same as the first bullet).
  • Both selectors with empty bodies, just skeleton classes, fail as well. The empty-body example is in my first email.
  • Making both selectors public static nested classes fails as well.
  • Extracting both the timeout and select selectors into their own independent classes in separate files also fails.
  • Without the timeout selector, flatSelect works in any shape (class or lambda, empty or full body).
Do these findings help? Any ideas?

Here is the stack trace:

09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     - 
org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
    at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
    at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
    at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
    at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
    at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
    at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
    at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
    at com.motaword.ipm.kernel.Application.main(Application.java:63)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 9 more
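
When reading a trace like this, the innermost "Caused by" is usually the most useful part: the NotSerializableException message is the name of the class that could not be written, here SingleOutputStreamOperator. The three repeated writeObject0 / writeOrdinaryObject / writeSerialData / defaultWriteFields groups suggest that the operator was reached through three levels of field references below the object being checked. Below is a minimal plain-Java sketch of pulling that class name out of a wrapped exception; Adapter, Holder and Operator are hypothetical stand-ins, not Flink classes, and the wrapping in check() only imitates what a framework might do.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for illustration; none of these are Flink types.
class RootCauseDemo {

    /** Not Serializable; plays the role of the class named in "Caused by". */
    static class Operator { }

    /** Serializable itself, but reaches the operator through a field. */
    static class Holder implements Serializable {
        final Operator operator = new Operator();
    }

    /** The object handed to the serializability check. */
    static class Adapter implements Serializable {
        final Holder holder = new Holder();
    }

    /** Serializes o and wraps any failure, roughly as a framework might. */
    static void check(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new IllegalStateException("The object is not serializable.", e);
        }
    }

    /** Follows getCause() down to the innermost exception. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Returns the class name reported by NotSerializableException, or null if o serializes. */
    static String offendingClass(Object o) {
        try {
            check(o);
            return null;
        } catch (IllegalStateException e) {
            Throwable root = rootCause(e);
            return root instanceof NotSerializableException ? root.getMessage() : null;
        }
    }

    public static void main(String[] args) {
        // Prints the binary name of the nested Operator class.
        System.out.println(offendingClass(new Adapter()));
    }
}
```

On OpenJDK-based JVMs, starting the program with -Dsun.io.serialization.extendedDebugInfo=true should additionally make the exception message list the chain of fields that led to the offending object, which can save some guessing.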







In reply to JingsongLee's message of Fri, Apr 19, 2019 at 3:14 AM.

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Oytun Tez
Forgot to answer one of your points: the parent class works fine without this CEP selector (the flatSelect variant with the timeout signature).

In reply to Oytun Tez's message of Fri, Apr 19, 2019 at 9:40 AM.

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Guowei Ma
I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag). 


In reply to Oytun Tez's message of Fri, Apr 19, 2019 at 9:58 PM.
--
Best,
Guowei

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Dawid Wysakowicz-2

Hi Oytun,

I think 1.8 introduced a regression in how we handle output tags: we do not call the ClosureCleaner on the OutputTag.

There are two ways to work around this issue:

1. Declare the OutputTag static.

2. Clean the closure explicitly, as Guowei suggested: StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a Jira issue to fix this (FLINK-12297 [1]).

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297
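
Why declaring the tag static helps can be seen without Flink. In the original method the tag is an anonymous subclass (note the trailing {}) created inside an instance method, so the tag object holds a reference to the enclosing object, and Java serialization then tries to write that object as well. A static field initializer has no enclosing instance to capture. The sketch below uses hypothetical stand-ins (Tag for OutputTag, Job for the class that owns getPending) and plain Java serialization; it illustrates the mechanism only and is not Flink code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for illustration; none of these are Flink types.
class TagCaptureDemo {

    /** A generic, serializable tag that callers subclass anonymously. */
    static class Tag<T> implements Serializable {
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    /** Created in a static initializer: there is no enclosing instance. */
    static final Tag<String> STATIC_TAG = new Tag<String>("pending") {};

    /** Deliberately not Serializable, like a class that wires up a job. */
    static class Job {
        Tag<String> localTag() {
            // Anonymous subclass created in an instance method: the new
            // object keeps a hidden reference to this Job.
            return new Tag<String>("pending") {};
        }
    }

    /** Returns true if Java serialization accepts the whole object graph. */
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static tag: " + serializes(STATIC_TAG));
        System.out.println("local tag:  " + serializes(new Job().localTag()));
    }
}
```

Applied to the code from the first message, option 1 would presumably mean moving the tag out of getPending into a field such as private static final OutputTag<Project> PENDING_PROJECTS_TAG = new OutputTag<Project>("invitation-pending-projects"){}; (the constant name is made up here).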

In reply to Guowei Ma's message of 22/04/2019 03:06.

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Oytun Tez
Thank you Guowei and Dawid! I am trying your suggestions today and will report back.

- I assume the cleaning only needs to be done once because of the upgrade. Or should I run it every time the application starts?
- `static` sounds like a very simple fix for this. Any drawbacks?

In reply to Dawid Wysakowicz's message of Tue, Apr 23, 2019 at 2:56 AM.

Hi Oytun,

I think there is a regression introduced in 1.8 how we handle output tags. The problem is we do not call ClosureCleaner on OutputTag.

There are two options how you can workaround this issue:

1. Declare the OutputTag static

2. Clean the closure explicitly as Guowei suggested: StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a jira issue to fix this (FLINK-12297[1])

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297

On 22/04/2019 03:06, Guowei Ma wrote:
I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag). 


Oytun Tez <[hidden email]>于2019年4月19日 周五下午9:58写道:
Forgot to answer one of your points: the parent class compiles well without this CEP selector (with timeout signature)...


---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.


On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <[hidden email]> wrote:
Hey JingsongLee!

Here are some findings...
  • flatSelect without timeout works normally: patternStream.flatSelect(PatternFlatSelectFunction), this compiles well.
  • Converted the both timeout and select selectors to an inner class (not static), yielded the same results, doesn't compile.
  • flatSelect without timeout, but with an inner class for PatternFlatSelectFunction, it compiles (same as first bullet).
  • Tried both of these selectors with empty body. Just a skeleton class. Doesn't compile either. Empty body example is in my first email.
  • Tried making both selectors static public inner classes, doesn't compile either.
  • Extracted both timeout and flat selectors to their own independent classes in separate files. Doesn't compile.
  • I am putting the error stack below.
  • Without the timeout selector in any class or lambda shape, with empty or full body, flatSelect compiles well.
Would these findings help? Any ideas?

Here is an error stack:

09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler     - 
org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
at com.motaword.ipm.kernel.Application.main(Application.java:63)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 9 more

On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <[hidden email]> wrote:
Hi @Oytun Tez
It looks like your PatternFlatSelectFunction is not serializable.
Because you use an anonymous inner class, check the class to which getPending belongs; maybe that class is not serializable?
Alternatively, avoid inner classes and use a static nested class instead.

Best, JingsongLee

------------------------------------------------------------------
From: Oytun Tez <[hidden email]>
Send Time: April 19, 2019 (Friday) 03:38
To: user <[hidden email]>
Subject: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Hi all,

We are just migrating from 1.6 to 1.8. I encountered a serialization error which we didn't have before, if memory serves: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.

The method below simply takes a PatternStream from CEP.pattern() and uses the side output for timed-out events. Can you see anything weird here (WorkerEvent is the input, but the collectors collect Project objects)?

protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag = new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}

--
Best,
Guowei

Re: PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

Oytun Tez
Hi all,

Making the tag a static element worked out, thank you!

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.


On Tue, Apr 23, 2019 at 10:37 AM Oytun Tez <[hidden email]> wrote:
Thank you Guowei and Dawid! I am trying your suggestions today and will report back.

- I assume the cleaning operation only needs to be done once because of the upgrade, or should I run it every time the application starts?
- `static` sounds like a very simple fix to get rid of this. Any drawbacks here?

On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <[hidden email]> wrote:

Hi Oytun,

I think there is a regression introduced in 1.8 in how we handle output tags. The problem is that we do not call ClosureCleaner on the OutputTag.

There are two ways to work around this issue:

1. Declare the OutputTag static

2. Clean the closure explicitly, as Guowei suggested: StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a Jira issue to fix this (FLINK-12297 [1]).

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297
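
As a rough illustration of why option 1 helps, the plain-Java sketch below (no Flink involved) shows that an anonymous subclass created inside an instance method keeps a hidden reference to the enclosing object, while one created in a static initializer does not. Tag and Job are hypothetical stand-ins for OutputTag and for the class that owns getPending; they are not Flink types, and the actual failure path inside Flink may differ in detail.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class OuterCaptureDemo {

    /** Hypothetical stand-in for a serializable tag type such as OutputTag. */
    static class Tag implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        Tag(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for the job class; deliberately NOT Serializable. */
    static class Job {
        // Anonymous subclass created in a static context: no enclosing instance to capture.
        static final Tag STATIC_TAG = new Tag("pending") {};

        // Anonymous subclass created in an instance method: it holds a reference to this Job.
        Tag localTag() {
            return new Tag("pending") {};
        }
    }

    /** Returns true if plain Java serialization of the object succeeds. */
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException; it is thrown when the
            // object graph reaches something that is not Serializable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("static tag serializes:   " + serializes(Job.STATIC_TAG));
        System.out.println("instance tag serializes: " + serializes(new Job().localTag()));
    }
}
```

Under these assumptions, the static tag should serialize and the one created in the instance method should not, because serializing it drags in the enclosing Job. That is consistent with the stack trace above, where the NotSerializableException names a type other than the tag itself.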

On 22/04/2019 03:06, Guowei Ma wrote:
I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag). 

