Problem :
I have created a PatternStream with a custom type and added an event pattern. This works fine in both local and cluster setups. But when I took down one of the taskmanagers (the one the task was executing on), Flink tried to restart the job and the restart failed with the exception "Could not restore checkpointed state to operators and functions", caused by a "ClassNotFoundException".

I then copied the application jar into the lib directory of all the nodes (to avoid any class loader related issues), but the restart still fails with the same exception, now with the cause being "java.lang.IllegalArgumentException in PriorityQueue init of AbstractCEPOperator class".

My CEP implementation doesn't use any state API, so I don't understand why it tries to restore state. Does it use state internally anywhere? Detailed stacktraces are attached.

Setup Details
Flink version : 1.1.2
Source : Kafka Consumer with custom schema
Sink : print()
Using fixed delay restart
Using default (in memory) checkpointing
CEP implementation doesn't use state anywhere
CEP pattern :
    Pattern.<CustomClass>begin("add message")
        .where(new EventFilterFunction(4))
        .next("edit issue")
        .where(new EventFilterFunction(2))
        .next("view issue")
        .where(new EventFilterFunction(1));

stacktraces.stacktraces

Please point me towards what the problem could be, and let me know if any other information is needed.
|
Updated attachment containing exceptions stacktrace
|
Hello, this looks like a bug: when a PriorityQueue is created with an initialCapacity of zero (see PriorityQueue.java), an IllegalArgumentException is thrown. The fix would be trivial: when numberPriorityQueueEntries equals zero, create the PriorityQueue with capacity 1 instead of 0. Greetings, Frank
|
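For readers who want to see the behaviour Frank describes in isolation, here is a minimal plain-Java sketch. It does not use Flink at all; the class name PriorityQueueCapacityDemo and the helper rejectsCapacity are made up for illustration. It only relies on the documented java.util.PriorityQueue constructor contract, which rejects an initial capacity smaller than 1.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class PriorityQueueCapacityDemo {

    // Hypothetical helper: returns true if constructing a PriorityQueue with
    // the given initial capacity throws IllegalArgumentException.
    public static boolean rejectsCapacity(int initialCapacity) {
        try {
            new PriorityQueue<Long>(initialCapacity, Comparator.<Long>naturalOrder());
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Capacity 0 is what a restore with zero queued entries would request.
        System.out.println("capacity 0 rejected: " + rejectsCapacity(0));
        // Capacity 1 is the workaround suggested in the post above.
        System.out.println("capacity 1 rejected: " + rejectsCapacity(1));
    }
}
```

This would explain why a job with an empty queue at checkpoint time can fail on restore even though the pattern code itself never touches the state API.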
Thanks for looking into this, Frank! I opened FLINK-4636 [1] to track the issue.
|
@Frank: Thanks. I verified that change and it works fine. (I still need to copy the user jar to the lib dir, which ideally shouldn't be required, but I think that is because of a userCodeClassLoader() related problem. Will look into that.)

About the implementation: instead of initializing the priority queue with capacity 1 when numberPriorityQueueEntries = 0, I feel we should add a condition check so that the subsequent processing (creating the priority queue object and the deserialization calls in a loop) is performed only if numberPriorityQueueEntries > 0, since numberPriorityQueueEntries = 0 means nothing needs to be done in restoreState.

@Fabian: I can add a patch for it, but as Frank found the resolution it is his call to decide.
|
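The guard proposed in this post can be sketched roughly as follows. This is not the Flink operator code and not the patch that was eventually submitted; the class RestoreGuardSketch, the snapshot format (an int count followed by long values) and both method names are assumptions made purely for illustration. Only the variable name numberPriorityQueueEntries is taken from the discussion.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.PriorityQueue;

public class RestoreGuardSketch {

    // Hypothetical snapshot format: entry count, then that many long values.
    public static byte[] snapshot(long... entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(entries.length);
            for (long entry : entries) {
                out.writeLong(entry);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Hypothetical restore: only size and fill the queue when there is
    // something to read, so a capacity of 0 is never requested.
    public static PriorityQueue<Long> restore(byte[] state) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int numberPriorityQueueEntries = in.readInt();
            PriorityQueue<Long> queue = new PriorityQueue<>(); // default capacity
            if (numberPriorityQueueEntries > 0) {
                queue = new PriorityQueue<>(numberPriorityQueueEntries);
                for (int i = 0; i < numberPriorityQueueEntries; i++) {
                    queue.offer(in.readLong());
                }
            }
            return queue;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("empty snapshot restored size: " + restore(snapshot()).size());
        System.out.println("smallest restored entry: " + restore(snapshot(3L, 1L, 2L)).peek());
    }
}
```

Compared with always passing a capacity of at least 1, the guard also skips the deserialization loop entirely when the count is zero, which is the point made above.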
Fabian
Had a discussion with Frank. As he has no means to reproduce/test the bug I will submit a patch for this. |
Great! If you send me your JIRA user I can add you as a contributor and assign the issue to you. Best, Fabian
|
Hi Fabian
My JIRA user is: jaxbihani I have created a pull request for the fix : https://github.com/apache/flink/pull/2568 |
Great, thanks! I gave you contributor permissions in JIRA. You can now also assign issues to yourself if you decide to continue to contribute.
|