Hey folks,
I've got a Beam/Python pipeline that works on the DirectRunner, and I'm now trying to run it on a local dev Flink cluster. Running it fails right out of the gate with an error about not being able to deserialize UnboundedSource (my PubSub source). I'm not sure how to debug this and would love some feedback on how to solve it.

Beam SDK: 2.19
Flink: 1.9.3
Python: 3.7
Beam args: ['--runner=FlinkRunner', '--flink_version=1.9', '--flink_submit_uber_jar', '--streaming']

(Stacktrace below)

-Pradip

[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - ArtifactStagingService started on localhost:55371
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - Java ExpansionService started on localhost:55372
[main] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver - JobService started on localhost:55364
[grpc-default-executor-0] INFO org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f with pipeline runner org.apache.beam.runners.flink.FlinkPipelineRunner@292a28a1
[grpc-default-executor-0] INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to Flink program.
[flink-runner-job-invoker] INFO org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming Environment.
[flink-runner-job-invoker] ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error during job invocation BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f.
java.lang.IllegalArgumentException: unable to deserialize UnboundedSource
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
    at org.apache.beam.runners.core.construction.ReadTranslation.unboundedSourceFromProto(ReadTranslation.java:126)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedSource(FlinkStreamingPortablePipelineTranslator.java:507)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translateUnboundedRead(FlinkStreamingPortablePipelineTranslator.java:472)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:250)
    at org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator.translate(FlinkStreamingPortablePipelineTranslator.java:120)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:113)
    at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:84)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:84)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
    at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
    at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
    at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
    at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
    at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
    at org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
    at org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
    at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
    ... 14 more
ERROR:root:java.io.IOException: FAILED_TO_UNCOMPRESS(5)
[flink-runner-job-invoker] WARN org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService - Failed to remove job staging directory for token {"sessionId":"job_90d46a1e-0f9e-4d06-add5-7312c94043da","basePath":"/var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr"}: {}
java.io.FileNotFoundException: /var/folders/vj/d1wqfcyn015chj650nw3m_1r0000gn/T/beam-temp6b11batn/artifacts5mt12bhr/job_90d46a1e-0f9e-4d06-add5-7312c94043da/MANIFEST (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:118)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:82)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:252)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService.loadManifest(BeamFileSystemArtifactRetrievalService.java:88)
    at org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService.removeArtifacts(BeamFileSystemArtifactStagingService.java:92)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver.lambda$createJobService$0(JobServerDriver.java:63)
    at org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService.lambda$run$0(InMemoryJobService.java:201)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.setState(JobInvocation.java:247)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.access$200(JobInvocation.java:48)
    at org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation$1.onFailure(JobInvocation.java:151)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1052)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "bin/run-pipeline.py", line 70, in <module>
    main()
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "<redacted venv location>/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "bin/run-pipeline.py", line 64, in main
    job = runner.run(pipeline=pipeline)
  File "/Users/crossbow/git/brogrammers-tech/grp_data-pipelines/ohlc-candles/lib/data-pipeline/data_pipeline/beam_pipeline/runners.py", line 42, in run
    result = dag.run()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/pipeline.py", line 474, in run
    return self.runner.run_pipeline(self, self._options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/flink_runner.py", line 47, in run_pipeline
    return super(FlinkRunner, self).run_pipeline(pipeline, options)
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 334, in run_pipeline
    result.wait_until_finish()
  File "<redacted venv location>/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py", line 455, in wait_until_finish
    self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline BeamApp-crossbow-0607202942-6ab5b807_f59999df-b95e-4b60-ae87-d069f19d029f failed in state FAILED: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
I've attached below some minimal sample code that reproduces this issue. It works perfectly with the DirectRunner.

-Pradip

#!/usr/bin/env python3

import apache_beam as beam
import logging
import os


class DummyPipeline(beam.PTransform):
    def expand(self, p):
        (
            p
            | "Read from PS" >> beam.io.gcp.pubsub.ReadFromPubSub(
                topic="<valid topic>")
            | beam.Map(print)
        )
        return p


def main():
    beam_options = [
        # "--runner=DirectRunner",
        "--runner=FlinkRunner",
        "--flink_version=1.9",
        "--flink_submit_uber_jar",
        "--streaming",
        '--save_main_session',
    ]
    popts = beam.options.pipeline_options.PipelineOptions(flags=beam_options)
    print(popts)
    p = beam.Pipeline(options=popts)
    (
        p
        | "Do It" >> DummyPipeline()
    )
    job = p.run()
    job.wait_until_finish()


if __name__ == "__main__":
    main()

On Sun, Jun 7, 2020 at 14:57, Pradip Thachile <[hidden email]> wrote:
Another data point, folks: I've been able to run my simple test case on Dataflow without issue, but Flink is still problematic.

On Sun, Jun 7, 2020 at 18:48, Pradip Thachile <[hidden email]> wrote:
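
For anyone comparing runners the same way, here is a minimal sketch of how the flag list from the sample code could be parameterized so that only the runner changes between attempts. The helper name `build_beam_args` and the `FLINK_FLAGS` constant are hypothetical; the flags themselves are the ones already used in this thread. Runners such as Dataflow need further options (project, staging locations, and so on) that are not shown here.

```python
from typing import List

# Flink-specific flags copied from the thread; assumed to be needed
# only when the FlinkRunner is selected.
FLINK_FLAGS = ["--flink_version=1.9", "--flink_submit_uber_jar"]


def build_beam_args(runner: str, streaming: bool = True) -> List[str]:
    """Hypothetical helper: build the pipeline flag list for one runner."""
    args = [f"--runner={runner}"]
    if runner == "FlinkRunner":
        args.extend(FLINK_FLAGS)
    if streaming:
        args.append("--streaming")
    args.append("--save_main_session")
    return args


if __name__ == "__main__":
    # Print the flags each attempt would use, so the runs differ only
    # in the runner-specific options.
    for runner in ("DirectRunner", "FlinkRunner"):
        print(runner, build_beam_args(runner))
```

The resulting list can be passed to `PipelineOptions(flags=...)` exactly as `beam_options` is in the sample code earlier in the thread.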