Exception: This method must be called from inside the mailbox thread


KristoffSC
Hi,
I hit an issue on Flink 1.11. So far it has happened only once and I cannot
reproduce it. However, I think something is lurking there...

I cannot post the full stack trace or the user code, but I will try to describe
the problem.

The setup has no resource groups and only one operator chain restriction,
mentioned below.

chained task #1 - AsyncOperator with orderedWait calling a 3rd party system
forwards to
chained task #2 - with:
a) ProcessFunction A calling a multi-threaded library. In this ProcessFunction
we do
CompletableFuture.allOf(..userCode..).thenAccept(v -> collector.collect(message))
b) ProcessFunction B (no multi-threaded operations)
c) AsyncOperator with orderedWait calling a 3rd party system
d) ProcessFunction

Between task #1 and #2 there is a .startNewChain() to separate those two
tasks.

During load tests we got:
Caused by: java.lang.IllegalStateException: Illegal thread detected. This
method must be called from inside the mailbox thread!

The question is: what does it actually mean, and when can it happen?

The "full" stack trace, from which I had to remove user code:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:734) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        user---Code---calls
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:787) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:740) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        user---Code---calls
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        user---Code---calls
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: Illegal thread detected. This method must be called from inside the mailbox thread!
        at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.checkIsMailboxThread(TaskMailboxImpl.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:135) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.yield(MailboxExecutorImpl.java:78) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.addToWorkQueue(AsyncWaitOperator.java:258) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.processElement(AsyncWaitOperator.java:180) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
        ... 35 more

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Exception: This method must be called from inside the mailbox thread

Arvid Heise-3
Hi KristoffSC,

Sorry for the confusing error message. In short, mailbox thread = task thread.

Your operator a) calls collector.collect from a different thread (the one in which the CompletableFuture is completed). However, all APIs must always be used from the task thread.

The only way to cross thread boundaries is AsyncIO, so a) must also be an AsyncIO operator.
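
To make the thread rule concrete, here is a minimal plain-Java sketch. It deliberately does not use any Flink classes: ThreadConfinedCollector, MailboxThreadSketch and the two executors are hypothetical stand-ins, with a single-threaded executor playing the task (mailbox) thread and a small pool playing the multi-threaded library. It only illustrates why a thenAccept callback that fires on a pool thread trips an owner-thread check, and that the same callback passes once it is handed back to the owning thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class MailboxThreadSketch {

    /** Hypothetical stand-in for a collector that only its creating thread may use. */
    static final class ThreadConfinedCollector {
        private final Thread owner = Thread.currentThread();
        private final List<String> records = new ArrayList<>();

        void collect(String record) {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("Illegal thread detected.");
            }
            records.add(record);
        }
    }

    /** Emits one record; if handBack is true the callback is re-dispatched to the task thread. */
    static List<String> run(boolean handBack) {
        ExecutorService taskThread = Executors.newSingleThreadExecutor(); // plays the task thread
        ExecutorService workers = Executors.newFixedThreadPool(2);        // plays the library's pool
        try {
            // Create the collector on the task thread so that thread becomes its owner.
            ThreadConfinedCollector collector =
                    CompletableFuture.supplyAsync(ThreadConfinedCollector::new, taskThread).join();

            // The latch keeps the future incomplete until the callback is registered,
            // so the callback is guaranteed to fire on a worker thread.
            CountDownLatch registered = new CountDownLatch(1);
            CompletableFuture<String> lookup = CompletableFuture.supplyAsync(() -> {
                await(registered);
                return "message";
            }, workers);

            CompletableFuture<Void> done = handBack
                    ? lookup.thenAcceptAsync(collector::collect, taskThread) // runs on the owner
                    : lookup.thenAccept(collector::collect);                 // runs on a worker
            registered.countDown();
            done.join(); // rethrows a failed callback as CompletionException

            // Read the records on the owning thread as well.
            return CompletableFuture
                    .supplyAsync(() -> new ArrayList<String>(collector.records), taskThread)
                    .join();
        } finally {
            taskThread.shutdownNow();
            workers.shutdownNow();
        }
    }

    /** True if emitting straight from the worker callback is rejected by the thread check. */
    static boolean failsWithoutHandBack() {
        try {
            run(false);
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected without hand-back: " + failsWithoutHandBack());
        System.out.println("records with hand-back: " + run(true));
    }
}
```

In a Flink job this hand-back is not something user code should build with its own executor; as stated above, AsyncIO is the supported way to cross the thread boundary.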

On Tue, Nov 24, 2020 at 5:53 PM KristoffSC <[hidden email]> wrote:


--

Arvid Heise | Senior Java Developer


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng   

Re: Exception: This method must be called from inside the mailbox thread

KristoffSC
Hi Arvid,
Thank you for your answer.

And what if a) blocked the task's thread?
Let's say I'm OK with making the entire task thread wait on this third-party
lib.

In that case, would I be safe from this exception even though I would
not use AsyncIO?




Re: Exception: This method must be called from inside the mailbox thread

Arvid Heise-3
Hi KristoffSC,

I'd strongly suggest not blocking the task thread if it involves external services. RPC notifications cannot be processed and checkpoints are delayed while the task thread is blocked. That's what AsyncIO is for.

If your third-party library just takes a few ms to finish its computation without any external service, that would be totally fine though. That is the normal case for compute-intensive tasks, where few events come in and still saturate a cluster quickly (e.g., text mining, data cleansing, or even evaluation of a complex model).
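
For the compute-only case, a blocking variant could look roughly like the plain-Java sketch below. It is not Flink code: BlockingFanOutSketch, score and the Consumer standing in for the collector are hypothetical names chosen for illustration. The point is only that the calling thread waits with join() and then emits the result itself, so no callback ever touches the collector from a pool thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class BlockingFanOutSketch {

    /** Hypothetical CPU-only stand-in for the third-party library call. */
    static int score(String token) {
        return token.length();
    }

    /** Fans work out to the pool, waits on the calling thread, then emits from that same thread. */
    static void processElement(String message, ExecutorService pool, Consumer<String> collector) {
        List<CompletableFuture<Integer>> parts = new ArrayList<>();
        for (String token : message.split(" ")) {
            parts.add(CompletableFuture.supplyAsync(() -> score(token), pool));
        }
        // join() blocks the caller until every part has finished; no callback is registered.
        CompletableFuture.allOf(parts.toArray(new CompletableFuture[0])).join();

        int total = 0;
        for (CompletableFuture<Integer> part : parts) {
            total += part.join(); // already complete, so this does not wait
        }
        // Emitted by the thread that called processElement, never by a pool thread.
        collector.accept(message + " -> " + total);
    }

    /** Runs one element through processElement and records which thread emitted it. */
    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> out = new ArrayList<>();
            Thread caller = Thread.currentThread();
            processElement("a bb ccc", pool, record ->
                    out.add(record + " (caller thread: " + (Thread.currentThread() == caller) + ")"));
            return out;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This trades throughput for simplicity: while the caller is parked in join() it does nothing else, which is why the advice above limits it to short computations that do not wait on external services.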

On Tue, Nov 24, 2020 at 10:39 PM KristoffSC <[hidden email]> wrote: