Hi,
I faced an issue on Flink 1.11. So far it has happened only once and I cannot reproduce it; however, I think something is lurking there. I cannot post the full stack trace or the user code, but I will try to describe the problem.

The setup has no resource groups and only the one operator-chain restriction mentioned below.

Chained task #1: AsyncOperator with orderedWait calling a 3rd-party system. It forwards to chained task #2, which consists of:
a) ProcessFunction A, calling a multi-threaded library. In this ProcessFunction we do CompletableFuture.allOf(..userCode..).thenAccept(v -> collector.collect(message))
b) ProcessFunction B (no multi-threaded operations)
c) AsyncOperator with orderedWait calling a 3rd-party system
d) ProcessFunction

Between task #1 and task #2 there is a .startNewChain() to separate the two tasks.

During load tests we got:

Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!

The question is: what does this actually mean, and when can it happen?
The "full" stack trace, from which I had to remove user code:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    user---Code---calls
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    user---Code---calls
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    user---Code---calls
    at java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:135) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addToWorkQueue(AsyncWaitOperator.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:180) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
    ... 35 more

-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
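The pattern in a) can be reproduced outside Flink with a minimal JDK-only sketch. ThreadConfinedCollector is a hypothetical stand-in that mimics the mailbox-thread check; it is not Flink code, and the thread pool stands in for the multi-threaded library. A non-async callback such as thenAccept runs on whichever thread completes the future, here a library thread, so the check fails. If the futures are already complete when thenAccept is registered, the callback runs on the registering thread instead and nothing fails (the latch forces the failing order), which is one plausible reason the error is hard to reproduce. In the trace above, the ProcessFunction frames pass through without complaint and the check fires only in AsyncWaitOperator.addToWorkQueue via MailboxExecutorImpl.yield; as far as I can tell from the 1.11 sources, yield is only reached there when the async operator's queue is full, which would fit the error appearing under load.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackOnForeignThreadDemo {
    /** Hypothetical collector that, like the mailbox, rejects calls from non-owner threads. */
    static final class ThreadConfinedCollector {
        private final Thread owner;

        ThreadConfinedCollector(Thread owner) {
            this.owner = owner;
        }

        void collect(String record) {
            // Same idea as the mailbox-thread check in the trace (not Flink's actual code).
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("Illegal thread detected. This method must be called"
                        + " from inside the mailbox thread!");
            }
            System.out.println("emitted " + record + " on " + owner.getName());
        }
    }

    /** Returns the failure raised inside the thenAccept callback, or null if there was none. */
    static Throwable run() {
        ExecutorService library = Executors.newFixedThreadPool(2); // hypothetical multi-threaded library
        CountDownLatch release = new CountDownLatch(1);
        try {
            // The calling thread plays the role of the task (mailbox) thread.
            ThreadConfinedCollector collector = new ThreadConfinedCollector(Thread.currentThread());
            CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> awaitThen(release, "a"), library);
            CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> awaitThen(release, "b"), library);
            // Same shape as in a): emit from a callback on the combined future.
            CompletableFuture<Void> done = CompletableFuture.allOf(a, b)
                    .thenAccept(ignored -> collector.collect("message"));
            // Let the library finish only after the callback is registered, so the
            // callback is run by the library thread that completes the futures.
            release.countDown();
            done.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        } finally {
            library.shutdown();
        }
    }

    private static String awaitThen(CountDownLatch latch, String value) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new CompletionException(e);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(run()); // the IllegalStateException thrown by collect()
    }
}
```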
Hi KristoffSC,

sorry for the confusing error message. In short: mailbox thread = task thread.

Your operator a) calls collector.collect from a different thread (the one in which the CompletableFuture is completed). However, all of Flink's APIs, including the Collector, must always be used from the task thread. The only way to cross thread boundaries is AsyncIO, so a) must also be implemented with AsyncIO.

On Tue, Nov 24, 2020 at 5:53 PM KristoffSC <[hidden email]> wrote:
Hi,

--
Arvid Heise | Senior Java Developer
Follow us @VervericaData
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
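In Flink, the AsyncIO route means moving a) into an AsyncFunction: inside asyncInvoke, the callback calls resultFuture.complete(Collections.singleton(message)) instead of the collector, and the operator created by AsyncDataStream.orderedWait (or unorderedWait) emits the result from the task thread. The JDK-only sketch below models just that hand-off. Mailbox is a hypothetical, much simplified stand-in for the task's mailbox, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class MailboxHandoffDemo {
    /** Hypothetical minimal mailbox: any thread may enqueue, only the owner thread runs mails. */
    static final class Mailbox {
        private final Thread owner = Thread.currentThread();
        private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();

        /** Thread-safe: only enqueues the action, never runs it on the caller's thread. */
        void execute(Runnable action) {
            mails.add(action);
        }

        /** Blocks until one mail is available and runs it; owner thread only. */
        void runOne() throws InterruptedException {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("must be called from the mailbox thread");
            }
            mails.take().run();
        }
    }

    /** Returns the emitted records; the list is only ever touched by the task thread. */
    static List<String> run() {
        ExecutorService library = Executors.newFixedThreadPool(2); // hypothetical multi-threaded library
        try {
            Mailbox mailbox = new Mailbox(); // owned by the calling "task" thread
            Thread taskThread = Thread.currentThread();
            List<String> emitted = new ArrayList<>();
            CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "part-1", library);
            CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "part-2", library);
            CompletableFuture.allOf(a, b).thenAccept(ignored -> {
                // May run on a library thread: do not emit here, hand the result over instead.
                String message = a.join() + "+" + b.join();
                mailbox.execute(() -> {
                    // Runs inside runOne(), i.e. on the task thread.
                    if (Thread.currentThread() != taskThread) {
                        throw new IllegalStateException("emitting outside the task thread");
                    }
                    emitted.add(message);
                });
            });
            mailbox.runOne(); // the task thread picks up the completed result and emits it
            return emitted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            library.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [part-1+part-2]
    }
}
```

The design point is that the library thread only enqueues; everything that touches operator output runs inside runOne() on the owning thread, whichever thread happened to complete the futures.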
Hi Arvid,
Thank you for your answer. What if a) blocked the task thread instead? Let's say I'm OK with making the entire task thread wait on this third-party lib. In that case, would I be safe from this exception even without using AsyncIO?
Hi KristoffSC,

I'd strongly suggest not blocking the task thread if it involves external services. While the task thread is blocked, RPC notifications cannot be processed and checkpoints are delayed. That's what AsyncIO is for.

If your third-party library just takes a few ms to finish its computation without calling any external service, that would be totally fine, though. That is the normal case for compute-intensive tasks, where few events come in and still saturate a cluster quickly (e.g., text mining, data cleansing, or even evaluation of a complex model).

On Tue, Nov 24, 2020 at 10:39 PM KristoffSC <[hidden email]> wrote:
Hi Arvid,
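On the blocking question: a JDK-only sketch, again with hypothetical names, of a) waiting on the task thread itself. Because collect is then called from the thread that owns the collector, the thread check passes. The price, as described above, is that the task thread can do nothing else while join() waits, so this only fits short, purely computational library calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingJoinDemo {
    /** Hypothetical collector that, like the mailbox check in the trace, rejects other threads. */
    static final class ThreadConfinedCollector {
        private final Thread owner = Thread.currentThread();
        final List<String> emitted = new ArrayList<>();

        void collect(String record) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("called from outside the mailbox thread");
            }
            emitted.add(record);
        }
    }

    /** Processes one element by blocking the calling ("task") thread on the library's futures. */
    static void processElement(String input, ExecutorService library, ThreadConfinedCollector out) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> input + "-a", library);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> input + "-b", library);
        // Wait on the task thread. In a real Flink task, nothing else from the mailbox
        // (e.g. RPC notifications, checkpoints) gets processed while this call blocks.
        CompletableFuture.allOf(a, b).join();
        // Emitting here happens on the task thread, so the thread check passes.
        out.collect(a.join() + "+" + b.join());
    }

    static List<String> run() {
        ExecutorService library = Executors.newFixedThreadPool(2); // hypothetical multi-threaded library
        try {
            ThreadConfinedCollector out = new ThreadConfinedCollector();
            processElement("x", library, out);
            processElement("y", library, out);
            return out.emitted;
        } finally {
            library.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [x-a+x-b, y-a+y-b]
    }
}
```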