Hi All,
I have implemented a custom SourceFunction for a datasource with an asynchronous API (the API calls return Scala futures). I need to call the asynchronous API during the initialization of each individual (parallel) source instance and, in exactly-once mode, also during snapshotState() or inside the run loop. The polling loop itself is synchronous.
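Roughly, the pattern looks like the simplified sketch below. The Flink SourceFunction/CheckpointedFunction wiring is left out so it runs on its own. AsyncSourceApi, InMemoryApi and BlockingSourceLogic, along with their methods, are hypothetical stand-ins and not the actual datasource API or source code. initialize() and snapshot() only mark where initializeState() and snapshotState() would block the task thread on the futures using Await.result with a timeout.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for the asynchronous datasource API (returns Scala futures).
trait AsyncSourceApi {
  def openPartition(partition: Int): Future[Long] // returns a starting offset
  def commitOffset(partition: Int, offset: Long): Future[Unit]
}

// Hypothetical in-memory implementation, used only to exercise the sketch.
class InMemoryApi(start: Long)(implicit ec: ExecutionContext) extends AsyncSourceApi {
  @volatile var committed: Map[Int, Long] = Map.empty

  def openPartition(partition: Int): Future[Long] = Future(start)

  def commitOffset(partition: Int, offset: Long): Future[Unit] =
    Future { synchronized { committed = committed + (partition -> offset) } }
}

// Hypothetical per-instance source logic with the Flink callbacks stripped away.
class BlockingSourceLogic(api: AsyncSourceApi, partition: Int, timeout: FiniteDuration) {
  private var offset: Long = -1L

  // Where initializeState() would block until the async call has completed.
  def initialize(): Unit =
    offset = Await.result(api.openPartition(partition), timeout)

  // One iteration of the synchronous polling loop.
  def poll(): Long = { offset += 1; offset }

  // Where snapshotState() would create and await the commit within the same call.
  def snapshot(): Long = {
    Await.result(api.commitOffset(partition, offset), timeout)
    offset
  }
}

import scala.concurrent.ExecutionContext.Implicits.global

val api = new InMemoryApi(41L)
val logic = new BlockingSourceLogic(api, partition = 0, timeout = 5.seconds)
logic.initialize()
logic.poll()
val snap = logic.snapshot()
```

Because each Await.result call blocks the calling thread until the future completes or the timeout expires, the commit is finished before snapshot() returns.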
The issue occurs in integration tests where the Flink jobs run locally (in a MiniCluster). It does not occur on my local machines, but it does occur consistently on Travis, so I suspect it is related to the number of available cores/workers. Await.result, however, goes through the BlockContext, which is backed by the ForkJoinPool, and I do not expect a few asynchronous calls to run into any limitations there. Compiling and running the same code with Flink 1.4.2 works fine. The issue occurs both when calling Await.result() inside the run loop and when calling it inside initializeState().
Am I breaking the process model by using Await.result on asynchronous API calls within initializeState() or snapshotState() in a SourceFunction (or a sink, for that matter)? With Await.result I do make sure the calls are created and awaited within a single checkpoint.
Any other suggestions on where to look for the problem, or an explanation of why this issue could occur when upgrading from 1.4.2 to 1.5.0?
Thank you for your help!
Cheers,
Niels