Dealing with an asynchronous source (and sink) in Flink 1.5.0. Await.result() does not complete.

Niels van Kaam
Hi All,

I have implemented a custom SourceFunction on a data source with an asynchronous API (the API calls return Scala futures). I need to perform calls to the asynchronous API during initialization of each individual (parallel) source instance and, when in exactly-once mode, also during snapshotState or inside the run loop. The polling loop itself is synchronous.

Since I am (at least for now) not worried about performance, I just used Await.result() to perform a blocking wait on each asynchronous call (https://docs.scala-lang.org/overviews/core/futures.html#blocking-outside-the-future). This worked fine in Flink 1.4.2, but after upgrading to Flink 1.5 the futures never complete (eventually causing timeout exceptions on the Await.result() call).
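
For reference, the blocking pattern described above looks roughly like the following minimal sketch. The `fetchOffset` method is a hypothetical stand-in for one call to the asynchronous API, and the use of the global ExecutionContext is an assumption; neither is taken from the actual source implementation.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingWaitSketch {
  // Hypothetical stand-in for one call to the asynchronous data source API.
  def fetchOffset(partition: Int): Future[Long] = Future { partition * 100L }

  // Blocks the calling thread until the future completes or the timeout elapses.
  // A timeout is reported as a Left instead of being rethrown.
  def awaitOffset(partition: Int, timeout: FiniteDuration): Either[String, Long] =
    try Right(Await.result(fetchOffset(partition), timeout))
    catch {
      case _: TimeoutException => Left(s"partition $partition timed out after $timeout")
    }

  def main(args: Array[String]): Unit =
    println(awaitOffset(3, 5.seconds))
}
```

In the failing runs, the equivalent of `awaitOffset` would end in the timeout branch because the future never completes.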

The issue occurs in integration tests where the Flink jobs run locally (in a minicluster). It does not occur on my local machines, but occurs consistently on Travis. I therefore suspect the issue is related to the number of cores/workers that are available. Await.result(), however, uses the blockingContext, which is backed by the ForkJoinPool, so I do not expect a few asynchronous calls to run into any limitations there. Compiling and running the same code with Flink 1.4.2 works fine. The issue occurs both when calling Await.result() inside the run loop and inside initializeState().
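
One way to probe the core-count hypothesis, sketched here under assumptions, is to print the number of processors the JVM sees and to run the futures on a dedicated, fixed-size pool instead of the global ExecutionContext (whose default parallelism is derived from the available processors). The pool size of 4 and the method name `sumOnDedicatedPool` are hypothetical, and this is not claimed to be the cause of the problem or a fix for it.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object DedicatedPoolSketch {
  // Runs n small simulated asynchronous calls on a dedicated pool and blocks for the results.
  def sumOnDedicatedPool(n: Int): Int = {
    // Hypothetical pool size, chosen independently of the machine's core count.
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
    try {
      val calls = (1 to n).map(i => Future { Thread.sleep(50); i })
      // The calling thread is not part of the pool, so blocking here does not occupy a worker.
      Await.result(Future.sequence(calls), 10.seconds).sum
    } finally ec.shutdown() // pool threads are non-daemon and must be stopped explicitly
  }

  def main(args: Array[String]): Unit = {
    println(s"available processors: ${Runtime.getRuntime.availableProcessors()}")
    println(sumOnDedicatedPool(8))
  }
}
```

If the futures complete with a dedicated pool on Travis but not with the global one, that would point towards thread starvation rather than a change in Flink's process model.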

Am I breaking the process model when using Await.result() on asynchronous API calls within initializeState or snapshotState in a SourceFunction (or a sink, for that matter)? With Await.result() I do make sure the calls are created and awaited within a single checkpoint.

Any other suggestions on where to look for the problem, or an explanation of why this issue could occur when upgrading from 1.4.2 to 1.5.0?


Thank you for your help!

Cheers,
Niels