Hello,

I am using Flink 1.5.3 and executing jobs through Apache Beam 2.6.0. One of my jobs involves reading from Google Cloud Storage, which uses the file scheme "gs://". Everything was fine, but once in a while I would get an exception that the scheme is not recognised. Now I've started seeing them more often. It seems to be arbitrary: the exact same job with the exact same parameters may finish successfully, or throw this exception and fail immediately. I can't figure out why it's not deterministic. Here is the full exception logged upon the job failing:

java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at Match files from GCS/Via MatchAll/Reshuffle.ViaRandomKey/Reshuffle/GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:479)
	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:650)
	at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1108)
	at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:473)
	... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: No filesystem found for scheme gs
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:831)
Caused by: java.lang.IllegalArgumentException: No filesystem found for scheme gs
	at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:459)
	at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:529)
	at org.apache.beam.sdk.io.fs.ResourceIdCoder.decode(ResourceIdCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:49)
	at org.apache.beam.sdk.io.fs.MetadataCoder.decode(MetadataCoder.java:30)
	at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:116)
	at org.apache.beam.sdk.values.TimestampedValue$TimestampedValueCoder.decode(TimestampedValue.java:88)
	at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:124)
	at org.apache.beam.sdk.coders.IterableLikeCoder.decode(IterableLikeCoder.java:60)
	at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
	at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:90)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:103)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
	at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:105)
	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1066)
	at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:827)

Any ideas why the behaviour is not deterministic regarding recognising file system schemes?

Thanks,
Encho
Hi Encho,
the SpillingAdaptiveSpanningRecordDeserializer that you see in your stack trace is executed while reading input records from another task. If the (serialized) records are too large (> 5 MiB), it will write and assemble them in a spilling channel, i.e. on disk, instead of in memory. This uses the temporary directories specified via "io.tmp.dirs" (or "taskmanager.tmp.dirs"), which default to System.getProperty("java.io.tmpdir"). These paths must actually be on an ordinary local file system, not on gs:// or the like.

The reason you only see this sporadically may be that not all of your records are that big. It should, however, be deterministic in that it should always occur for the same record. Maybe something is wrong here and the record length gets corrupted, e.g. due to a bug in the de/serializer or the network stack.

Do you actually have a minimal working example that you can share (either privately with me, or here) and that shows this error?

Nico
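For reference, a minimal sketch of the override Nico describes: pointing Flink's temporary/spill directories at an explicit local path in flink-conf.yaml. The path below is a placeholder and must exist on each taskmanager's local disk:

    # flink-conf.yaml (placeholder path - adjust per deployment)
    # Temp/spill directories used by the network stack and sorters; these must
    # be ordinary local paths, never a remote scheme such as gs://.
    # Multiple paths may be listed, comma-separated.
    io.tmp.dirs: /data/flink-tmp
    # Older setups use the legacy key instead:
    # taskmanager.tmp.dirs: /data/flink-tmp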
Hi Nico,

Unfortunately I can't share any of the data, but it is not even data being processed at the point of failure - the job is still in the matching-files-from-GCS phase. I am using Apache Beam's FileIO to match files, and during one of those match-files steps I get the failure above.

Currently I run the job, and when a taskmanager shows this error I reset it and restart the job. That works fine, since the failure only occurs at the beginning of the job. It seems to be a problem within particular taskmanagers, which is very odd considering that they are all generated by a single Kubernetes deployment, i.e. they should be completely identical. Sometimes I have to restart 3-4 of them until I have a running cluster.

I will try setting the temporary directory to something other than the default - it can't hurt.

Thanks for the help,
Encho
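For concreteness, the matching step described above boils down to something like the following minimal sketch against the Beam 2.6 Java API (the bucket name and file pattern are placeholders, not the actual job's values):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class MatchGcsFiles {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // Expand the pattern into per-file metadata; FileIO.match() internally
        // produces the "Via MatchAll/Reshuffle.ViaRandomKey" structure that
        // appears in the failing stage name of the stack trace.
        PCollection<MatchResult.Metadata> files =
            p.apply("Match files from GCS",
                FileIO.match().filepattern("gs://some-bucket/input/*"));

        p.run().waitUntilFinish();
      }
    }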
Sorry, I had a second look and your stack trace does not even point to
the spilling channel - it reads from the memory segment directly. Setting the temp dirs will thus not make a difference.

I'm wondering why your deserializer eventually reads from a file on gs:// directly, instead of this happening in, for example, a follow-up map operation.

Nico
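One detail the trace does show: the coder is not opening any file at all. MetadataCoder.decode() calls FileSystems.matchNewResource(), which only resolves the "gs" scheme against a per-JVM registry of file system implementations, so the failure suggests that registry was never populated in that particular taskmanager JVM. The following is a hedged sketch of a possible workaround, not a confirmed fix: it forces Beam's file system registration in the current JVM, assuming the GCS file system module is on the classpath:

    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ForceFileSystemRegistration {
      public static void main(String[] args) {
        // FileSystems keeps a per-JVM map from scheme ("file", "gs", ...) to a
        // FileSystem implementation, filled by scanning the classpath for
        // FileSystemRegistrar services. MetadataCoder.decode() resolves "gs"
        // against this map without touching GCS, so an unpopulated map in one
        // taskmanager JVM reproduces exactly the error above.
        FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
      }
    }

If this registration normally happens lazily during job start-up, a race in which some task threads begin decoding records before it has run could also explain why otherwise identical Kubernetes replicas behave differently.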