Hi,

I have a very simple program using the local execution environment that throws NPEs and other exceptions related to concurrent access when launching a count for a DataSet from different threads. The program is https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e, which is basically this: def doubleCollectConcurrent = {

It looks like the issue is in OperatorTranslation.java at https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51, where a sink is added to the sinks list while that list is being traversed. I have the impression that this is by design, so I'd like to confirm that this is the expected behaviour, and whether it happens only for the local execution environment or affects all execution environment implementations.

Other related questions I have are:
Thanks a lot for your help.

Greetings,
Juan
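The failure mode described above (a list being modified while it is traversed) can be reproduced in plain Java without Flink. The sketch below is only an analogue: the class name `SinkListDemo`, the method `mutateWhileTraversing`, and the string "sinks" are hypothetical and are not taken from `OperatorTranslation.java`. It does the add from inside the loop on a single thread so that the outcome is deterministic; with real threads the same conflict would show up only intermittently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

public class SinkListDemo {

    // Hypothetical stand-in for a "sinks" list; this is NOT Flink code.
    // Returns true if adding to the list during traversal was detected.
    static boolean mutateWhileTraversing() {
        List<String> sinks = new ArrayList<>(Arrays.asList("sink-1", "sink-2"));
        try {
            for (String sink : sinks) {
                // Simulates a second action registering a sink while the
                // first one is still iterating over the same list.
                sinks.add("sink-from-another-action");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // ArrayList's iterator is fail-fast: it notices the structural
            // change on the next call to next() and throws.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("modification detected: " + mutateWhileTraversing());
    }
}
```

`ArrayList` detects this only on a best-effort basis, so unsynchronized access from several threads can also surface as other exceptions or as silently corrupted state, which would be consistent with the mix of errors reported here.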
Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <[hidden email]> wrote:
|
Hi Juan,

As far as I know, we do not provide any concurrency guarantees for count() or collect(). Those methods need to be used with caution anyway, as the result size must not exceed a certain threshold. I will loop in Fabian, who might know more about the internals of the execution.

Regards,
Timo
On 26.04.19 at 03:13, Juan Rodríguez Hortalá wrote:
|
Hi Timo,

Thanks for your answer. I was surprised to have problems calling those methods concurrently, because I thought data sets were immutable. Now I understand that calling count or collect mutates the data set: not its contents, but some kind of execution plan included in the data set. I suggest adding a remark about this lack of thread safety to the documentation. Maybe it's already there, but I haven't seen it. I also understand that repeated calls to collect and count on the same data set are OK as long as they are made sequentially, not concurrently.

Thanks,
Juan

On Fri, Apr 26, 2019 at 02:00 Timo Walther <[hidden email]> wrote:
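If sequential calls are fine and concurrent ones are not, one client-side option is to funnel every action through a single shared lock. The sketch below illustrates that idea in plain Java only. `FakeDataSet`, `safeCount`, and `countFromThreads` are hypothetical names, and `FakeDataSet` merely imitates an object whose `count()` touches shared, non-thread-safe state. Nothing in this thread confirms that this is sufficient or recommended for a real Flink `DataSet`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializedActions {

    // Hypothetical stand-in for a data set whose actions mutate shared plan
    // state that is not thread-safe. This is NOT the Flink API.
    static class FakeDataSet {
        private final List<String> sinks = new ArrayList<>();
        private final long size;

        FakeDataSet(long size) {
            this.size = size;
        }

        long count() {
            sinks.add("count-sink");      // mutation of shared state
            int visited = 0;
            for (String sink : sinks) {   // traversal of the same state
                visited++;
            }
            return size;
        }
    }

    // One lock shared by every caller, so actions never overlap.
    private static final Object ACTION_LOCK = new Object();

    static long safeCount(FakeDataSet dataSet) {
        synchronized (ACTION_LOCK) {
            return dataSet.count();
        }
    }

    // Calls safeCount from several threads and gathers the results.
    static List<Long> countFromThreads(FakeDataSet dataSet, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Callable<Long> task = () -> safeCount(dataSet);
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(task));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> future : futures) {
                results.add(future.get()); // rethrows any failure from the task
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countFromThreads(new FakeDataSet(5), 8));
    }
}
```

The trade-off is that the actions no longer run in parallel. The lock only makes the calls sequential, which is the usage pattern described above as safe.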
|
Hi Juan,

count() and collect() trigger the execution of a job. Since Flink does not cache intermediate results (yet), all operations from the sink (count()/collect()) back to the sources are executed. So in a sense a DataSet is immutable (given that the input of the sources does not change), but it is completely recomputed for every execution. There are currently some efforts [1] under way to improve Flink's behavior for interactive sessions.

Best,
Fabian

On Fri, Apr 26, 2019 at 17:03, Juan Rodríguez Hortalá <[hidden email]> wrote:
|
Thanks for your answer, Fabian.

In my opinion this is not just a possible new feature or optimization but a bigger problem: the client program crashes with an exception when concurrent counts or collects are triggered on the same data set, and this happens nondeterministically, depending on how the threads are scheduled. So that should be documented somewhere.

Just my two cents.

Thanks,
Juan

On Mon, Apr 29, 2019 at 02:02 Fabian Hueske <[hidden email]> wrote:
|