Hello ,I have to create a custom Parquet writer with rolling sink , I'm seeing error like this , I'm expecting every partition should write in a new file ?? Any tips ?
---------------
18:12:12.551 [flink-akka.actor.default-dispatcher-5] DEBUG akka.event.EventStream - shutting down: StandardOutLogger started
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$ anonfun$handleMessage$1$$ anonfun$applyOrElse$8.apply$ mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$ anonfun$handleMessage$1$$ anonfun$applyOrElse$8.apply( JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$ anonfun$handleMessage$1$$ anonfun$applyOrElse$8.apply( JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable. liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run( Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala: 41)
at akka.dispatch.ForkJoinExecutorConfigurator$ AkkaForkJoinTask.exec( AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec( ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. pollAndExecAll(ForkJoinPool. java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. runTask(ForkJoinPool.java: 1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker( ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput.collect( OperatorChain.java:376)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ CopyingChainingOutput.collect( OperatorChain.java:358)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ BroadcastingOutputCollector. collect(OperatorChain.java: 399)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$ BroadcastingOutputCollector. collect(OperatorChain.java: 381)
at org.apache.flink.streaming.api.operators. AbstractStreamOperator$ CountingOutput.collect( AbstractStreamOperator.java: 346)
at org.apache.flink.streaming.api.operators. AbstractStreamOperator$ CountingOutput.collect( AbstractStreamOperator.java: 329)
at org.apache.flink.streaming.api.operators.StreamSource$ NonTimestampContext.collect( StreamSource.java:161)
at org.apache.flink.streaming.connectors.kafka.internals. AbstractFetcher.emitRecord( AbstractFetcher.java:225)
at org.apache.flink.streaming.connectors.kafka.internals. SimpleConsumerThread.run( SimpleConsumerThread.java:379)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache. hadoop.hdfs.protocol. AlreadyBeingCreatedException): Failed to CREATE_FILE /y=2017/m=01/d=10/H=18/M=12/_ part-0-0.in-progress for DFSClient_NONMAPREDUCE_ 1062142735_3
Free forum by Nabble | Edit this page |