StreamingFileSink with ParquetAvroWriters


Jan Oelschlegel

Hi,

 

I'm using Flink 1.11.2 and would like to use the StreamingFileSink to write to HDFS in Parquet format.

 

As the documentation says, I have added this dependency:

 

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-parquet_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>

 

And this is my file sink definition:

 

val sink: StreamingFileSink[Event] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://namenode.local:8020/user/datastream/"),
    ParquetAvroWriters.forReflectRecord(classOf[Event])
  )
  .build()

 

 

If I execute this on the cluster, I get the following error:

 

java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:84)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

 

 

It looks like some dependencies are missing. How can I fix this?

 

 

Jan O.


Re: StreamingFileSink with ParquetAvroWriters

Yun Gao
Hi Jan,

Could you try adding this dependency?
   
<dependency>
   <groupId>org.apache.parquet</groupId>
   <artifactId>parquet-avro</artifactId>
   <version>1.11.1</version>
</dependency>



Best,
 Yun


Re: StreamingFileSink with ParquetAvroWriters

Dawid Wysakowicz-2
In reply to this post by Jan Oelschlegel

Hi Jan

Could you make sure you are packaging that dependency in your job jar? There are instructions on how to configure your build setup[1]. In particular, the part on how to build a jar with dependencies might come in handy[2].

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/project-configuration.html#appendix-template-for-building-a-jar-with-dependencies
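
For reference, a rough sketch of the shade-plugin section from that template (the plugin version, excludes, and main class below are illustrative; see [2] for the full version):

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <artifactSet>
              <excludes>
                <!-- keep artifacts the Flink distribution already provides out of the fat jar -->
                <exclude>com.google.code.findbugs:jsr305</exclude>
                <exclude>org.slf4j:*</exclude>
                <exclude>log4j:*</exclude>
              </excludes>
            </artifactSet>
            <filters>
              <filter>
                <!-- strip signature files so the shaded jar is not rejected -->
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <!-- hypothetical entry point, replace with the job's main class -->
                <mainClass>com.example.StreamingJob</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>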


Re: StreamingFileSink with ParquetAvroWriters

Jan Oelschlegel

Hi Dawid,

 

I used the official Maven archetype for a Scala-based Flink project from [1]:

 

mvn archetype:generate                              \
  -DarchetypeGroupId=org.apache.flink               \
  -DarchetypeArtifactId=flink-quickstart-scala      \
  -DarchetypeVersion=1.11.2

 

 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/project-configuration.html#create-project

 

Best,

Jan

 

 


Re: StreamingFileSink with ParquetAvroWriters

Dawid Wysakowicz-2

Have you checked whether the class (org/apache/parquet/avro/AvroParquetWriter) is in the jar that you are submitting?
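
For example, something along these lines should list the class if it ended up in the fat jar (the jar path is illustrative):

jar tf target/my-flink-job-1.0-SNAPSHOT.jar | grep org/apache/parquet/avro/AvroParquetWriter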


Re: StreamingFileSink with ParquetAvroWriters

Jan Oelschlegel
In reply to this post by Yun Gao

Hi Yun,

 

Thanks for your answer. I'm not very familiar with Parquet and expected it to be as easy as with Flink's Table API. But obviously not 😊

 

Now the error is gone, but why did you choose this specific version? Could it be a newer or an older one?
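
(One way to see which Parquet version flink-parquet pulls in, assuming a standard Maven setup, is the dependency tree, e.g.:

mvn dependency:tree -Dincludes=org.apache.parquet

The parquet-avro version added should generally line up with what is reported there.)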

 

And now I get the following error:

 

org.apache.parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: required group meldedatum {
}
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:27)
    at org.apache.parquet.schema.GroupType.accept(GroupType.java:255)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:31)
    at org.apache.parquet.schema.TypeUtil$1.visit(TypeUtil.java:37)
    at org.apache.parquet.schema.MessageType.accept(MessageType.java:55)
    at org.apache.parquet.schema.TypeUtil.checkValidWriteSchema(TypeUtil.java:23)
    at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:233)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
    at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:530)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.createAvroParquetWriter(ParquetAvroWriters.java:87)
    at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.lambda$forReflectRecord$3c375096$1(ParquetAvroWriters.java:73)
    at org.apache.flink.formats.parquet.ParquetWriterFactory.create(ParquetWriterFactory.java:57)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter.openNew(BulkBucketWriter.java:69)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedBucketWriter.openNewInProgressFile(OutputStreamBasedPartFileWriter.java:83)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:209)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:200)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:282)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(StreamingFileSinkHelper.java:104)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:420)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)

 

 

Do I have to write an Avro schema? I thought I could use normal POJOs from my datastream to specify the Parquet format.
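
In case the reflection route keeps producing empty groups, a minimal sketch of the explicit-schema alternative (ParquetAvroWriters.forGenericRecord) could look like this; the schema, the field names other than meldedatum, and the mapping step are purely illustrative:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// Hypothetical Avro schema: map the date to a long (epoch millis) or a string
// instead of a type that Avro reflection turns into an empty group.
val schema: Schema = new Schema.Parser().parse(
  """{
    |  "type": "record",
    |  "name": "Event",
    |  "fields": [
    |    {"name": "id", "type": "string"},
    |    {"name": "meldedatum", "type": "long"}
    |  ]
    |}""".stripMargin)

val sink: StreamingFileSink[GenericRecord] = StreamingFileSink
  .forBulkFormat(
    new Path("hdfs://namenode.local:8020/user/datastream/"),
    ParquetAvroWriters.forGenericRecord(schema)
  )
  .build()

// The Event stream would then be mapped to GenericRecord before addSink(sink),
// e.g. with org.apache.avro.generic.GenericRecordBuilder.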

 

 

Best,

Jan

 


Re: StreamingFileSink with ParquetAvroWriters

Jan Oelschlegel
In reply to this post by Dawid Wysakowicz-2

Yes, after unzipping it is in the jar.

 

 


Re: StreamingFileSink with ParquetAvroWriters

Jan Oelschlegel

Sorry, wrong build. It is not in the jar.

 
