Hi all,

We are just migrating from 1.6 to 1.8. I encountered a serialization error which, if memory serves, we didn't have before:

The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.

The method below simply takes a PatternStream from CEP.pattern() and uses the side output for timed-out events. Can you see anything weird here (WorkerEvent is the input, but the collectors collect Project objects)?

```java
protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag = new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map, long l, Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern, Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}
```

---
Oytun Tez
M O T A W O R D
The World's Fastest Human Translation Platform.
Hi
@Oytun Tez It looks like your PatternFlatSelectFunction is not serializable, because you use an anonymous inner class. Check whether the class that getPending belongs to is serializable. Alternatively, instead of anonymous inner classes, you could use static nested classes.

Best,
JingsongLee
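JingsongLee's point can be checked without Flink on the classpath. The sketch below uses plain java.io serialization, which is what the ClosureCleaner ends up calling according to the stack trace in this thread (InstantiationUtil.serializeObject, then ObjectOutputStream.writeObject). CaptureDemo, SelectFn, PrefixFn and the stream field are hypothetical stand-ins for the job class that owns getPending, PatternFlatSelectFunction, a static nested replacement, and a non-serializable member such as the SingleOutputStreamOperator from the trace. It is an illustration under those assumptions, not Flink's actual code path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class CaptureDemo implements Serializable {

    // Hypothetical stand-in for a Flink user function such as
    // PatternFlatSelectFunction, which is itself Serializable.
    interface SelectFn extends Serializable {
        String select(String event);
    }

    // Stand-in for a non-serializable field on the enclosing job class,
    // such as the SingleOutputStreamOperator named in the stack trace.
    private final Object stream = new Object();
    private final String prefix;

    CaptureDemo(String prefix) {
        this.prefix = prefix;
    }

    // Anonymous inner class: it reads the outer field 'prefix', so javac stores a
    // hidden reference to CaptureDemo.this, and serializing the function also
    // tries to serialize the whole CaptureDemo instance, including 'stream'.
    SelectFn anonymousFn() {
        return new SelectFn() {
            @Override
            public String select(String event) {
                return prefix + event;
            }
        };
    }

    // Static nested class: no hidden outer reference; it copies only the
    // value it needs through its constructor.
    static final class PrefixFn implements SelectFn {
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String select(String event) {
            return prefix + event;
        }
    }

    SelectFn staticNestedFn() {
        return new PrefixFn(prefix);
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo("pending-");
        System.out.println("anonymous inner class serializable: " + isSerializable(demo.anonymousFn()));
        System.out.println("static nested class serializable: " + isSerializable(demo.staticNestedFn()));
    }
}
```

The anonymous function deliberately reads an outer field so the hidden outer reference is guaranteed on every JDK; the Java 8 javac of that era emitted it even for empty bodies like the ones in the original snippet.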
Hey JingsongLee! Here are some findings...
Would these findings help? Any ideas? Here is the stack trace:

```text
09:36:51,925 ERROR com.motaword.ipm.kernel.error.controller.ExceptionHandler - org.apache.flink.api.common.InvalidProgramException: The implementation of the PatternFlatSelectAdapter is not serializable. The object probably contains or references non serializable fields.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1558)
	at org.apache.flink.cep.PatternStreamBuilder.clean(PatternStreamBuilder.java:86)
	at org.apache.flink.cep.PatternStream.process(PatternStream.java:114)
	at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:451)
	at org.apache.flink.cep.PatternStream.flatSelect(PatternStream.java:408)
	at com.motaword.ipm.business.invitation.controller.PendingProjects.getPending(PendingProjects.java:89)
	at com.motaword.ipm.business.invitation.controller.PendingProjects.run(PendingProjects.java:45)
	at com.motaword.ipm.business.invitation.boundary.InvitationJob.run(InvitationJob.java:31)
	at com.motaword.ipm.kernel.Application.main(Application.java:63)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:576)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
	... 9 more
```

On Fri, Apr 19, 2019 at 3:14 AM JingsongLee <[hidden email]> wrote:
Forgot to answer one of your points: the parent class compiles fine without this CEP selector (the one with the timeout signature)...

On Fri, Apr 19, 2019 at 9:40 AM Oytun Tez <[hidden email]> wrote:
I think you could try StreamExecutionEnvironment.clean(pendingProjectsTag).

Best,
Guowei

On Fri, Apr 19, 2019 at 9:58 PM Oytun Tez <[hidden email]> wrote:
Hi Oytun,

I think there is a regression introduced in 1.8 in how we handle output tags. The problem is that we do not call the ClosureCleaner on OutputTag. There are two options to work around this issue:

1. Declare the OutputTag static.
2. Clean the closure explicitly, as Guowei suggested: StreamExecutionEnvironment.clean(pendingProjectsTag)

I also opened a JIRA issue to fix this (FLINK-12297 [1]).

Best,
Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12297

On 22/04/2019 03:06, Guowei Ma wrote:
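Dawid's first workaround can be sketched in the same plain-JDK way. Tag is a hypothetical stand-in for OutputTag (a serializable id usually created as an anonymous subclass with {}), and stream is a hypothetical stand-in for the non-serializable SingleOutputStreamOperator. The sketch assumes one plausible reading of the stack trace: the tag, created inside an instance method, drags the enclosing job object into serialization. A tag held in a static field is created in a static context, so it has no outer-instance field at all.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.reflect.Field;

public class StaticTagDemo implements Serializable {

    // Hypothetical stand-in for Flink's OutputTag: a serializable id that is
    // usually instantiated as an anonymous subclass ({}) to keep its type argument.
    static class Tag<T> implements Serializable {
        private final String id;

        Tag(String id) {
            this.id = id;
        }

        String id() {
            return id;
        }
    }

    // Stand-in for a non-serializable field on the job class, such as the
    // SingleOutputStreamOperator named in the stack trace.
    private final Object stream = new Object();

    // Workaround 1 from the thread: declare the tag static. The anonymous
    // subclass is created in a static context, so it has no enclosing instance.
    static final Tag<String> PENDING_TAG = new Tag<String>("invitation-pending-projects") {};

    // The original pattern: an anonymous subclass created in an instance method.
    Tag<String> tagFromInstanceMethod() {
        return new Tag<String>("invitation-pending-projects") {};
    }

    // True if the object's class has a compiler-generated outer-instance field (this$0, ...).
    static boolean hasOuterInstanceField(Object o) {
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                return true;
            }
        }
        return false;
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static tag: outer field = " + hasOuterInstanceField(PENDING_TAG)
                + ", serializable = " + isSerializable(PENDING_TAG));
        Object local = new StaticTagDemo().tagFromInstanceMethod();
        // This line's result depends on the javac version (see the note below).
        System.out.println("instance-method tag: outer field = " + hasOuterInstanceField(local)
                + ", serializable = " + isSerializable(local));
    }
}
```

With the Java 8 javac of that era, the instance-method variant always carries a this$0 field and fails to serialize, mirroring the original error; newer javac releases may omit an unused outer reference, so the second printed line can differ across JDKs.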
Thank you Guowei and Dawid! I am trying your suggestions today and will report back.

- I assume the cleaning operation only needs to be done once because of the upgrade, or should I run it every time the application starts?
- `static` sounds like a very simple fix for this. Any drawbacks here?

On Tue, Apr 23, 2019 at 2:56 AM Dawid Wysakowicz <[hidden email]> wrote:
Hi all,

Making the tag a static element worked out, thank you!

On Tue, Apr 23, 2019 at 10:37 AM Oytun Tez <[hidden email]> wrote: