Flink Stream Sink ParquetWriter Failing with ClassCastException

Flink Stream Sink ParquetWriter Failing with ClassCastException

anuj.aj07

Hi All,

I have written a consumer that reads from a Kafka topic and writes the data in Parquet format using StreamingFileSink, but I am getting the following error. The job runs for some hours and then starts failing with this exception. I tried to restart it, but it fails with the same exception. After restarting from the latest offset it works fine for some hours and then fails again. I am not able to find the root cause of this issue.

java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.CharSequence
    at org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
    at org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
    at org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


Code:

DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

sourceStream.addSink(sink).setParallelism(parallelism);
I need to understand why it ran for a few hours and then started failing. Please help me understand this.



--
Thanks & Regards,
Anuj Jain



Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

Till Rohrmann
Hi Anuj,

It looks to me that your input GenericRecords don't conform to your output schema schemaSubject. At least, the stack trace says that your output schema expects some String field but the field was actually some ArrayList. Consequently, I would suggest verifying that your input data has the right format and, if not, filtering out the records that are non-conformant.
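As a rough sketch of such a filter (only an illustration, assuming the output schema is available as a string, e.g. via SchemaUtils.getSchema(schemaSubject).toString(); the class name is made up), each record could be validated with Avro's GenericData before it reaches the sink:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

public class ConformsToSchemaFilter extends RichFilterFunction<GenericRecord> {

    // Pass the schema as a string; it is parsed in open() on the task managers.
    private final String schemaString;
    private transient Schema schema;

    public ConformsToSchemaFilter(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaString);
    }

    @Override
    public boolean filter(GenericRecord record) {
        // validate() returns false if a field's runtime type does not match the
        // declared type, e.g. a List where the schema expects a string.
        return GenericData.get().validate(schema, record);
    }
}

Used as sourceStream.filter(new ConformsToSchemaFilter(schemaString)).addSink(sink), this would simply drop non-conforming records; with a ProcessFunction instead of a filter you could also route them to a side output for inspection.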

Cheers,
Till




Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

anuj.aj07
Hi Till,

Thanks for the reply.
I doubt that the input has a problem, because:

1. If the input had a problem, it should not have made it into the topic at all, since schema validation fails on the producer side.
2. I am using the same schema that was used to write the records to the topic, and I am able to parse the records with that schema: when I print the stream there is no error; the problem only occurs when writing as Parquet.

This is the code that I am using to get the schema that I pass to the Parquet writer.

public static Schema getSchema(String subjectName) {
    try {
        // Look up the latest registered version of the subject in the schema registry
        // and parse it into an Avro Schema.
        List<Integer> versions = registryClient.getAllVersions(subjectName);
        SchemaMetadata schemaMeta = registryClient.getSchemaMetadata(subjectName, versions.get(versions.size() - 1));
        Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
        return schema;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}

How can the input pass through and get inserted into the topic if it has an issue? And even if this is happening, how do I find those records and skip them, so that my whole processing does not fail because of one record?
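For illustration, one way to locate such records (purely a sketch; the helper name and the logging are hypothetical) would be to compare each field's runtime type with its declared schema type and log the mismatches:

import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

public class SchemaMismatchInspector {

    // Logs fields whose runtime value is a List although the schema declares a string,
    // which is exactly the mismatch behind "ArrayList cannot be cast to CharSequence".
    public static void logStringFieldsHoldingLists(GenericRecord record, Schema schema) {
        for (Schema.Field field : schema.getFields()) {
            Object value = record.get(field.name());
            Schema fieldSchema = field.schema();
            // Unwrap nullable unions like ["null", "string"].
            if (fieldSchema.getType() == Schema.Type.UNION) {
                for (Schema member : fieldSchema.getTypes()) {
                    if (member.getType() != Schema.Type.NULL) {
                        fieldSchema = member;
                        break;
                    }
                }
            }
            if (fieldSchema.getType() == Schema.Type.STRING && value instanceof List) {
                System.err.println("Field '" + field.name()
                        + "' is declared as string but holds a List: " + value);
            }
        }
    }
}

This only checks top-level fields; nested records would need recursion, but it may already be enough to identify the offending field and record.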

Thanks,
Anuj









--
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07




Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

Till Rohrmann
Hi Anuj,

If you use for reading the exact same schema with which the data was written, and if there is no bug in the Parquet Avro support, then it should indeed not fail. Hence, I suspect that the producer of your data might be producing slightly different Avro records than what Parquet is expecting. But this is just guessing.

The reason you don't see the program fail when only printing the records is that printing does not transform the GenericRecords into the Parquet format, which expects a certain structure given the schema.
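To exercise that conversion outside of Flink (a standalone sketch; the class name and file path are placeholders), a single suspicious record could be pushed through parquet-avro directly, which hits the same AvroWriteSupport path as in the stack trace:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetConversionCheck {

    // Writing one record through AvroParquetWriter goes through the same
    // AvroWriteSupport.writeValue code path that fails in the StreamingFileSink.
    public static void writeOne(GenericRecord record, Schema schema) throws Exception {
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new Path("/tmp/conversion-check.parquet")) // placeholder path
                .withSchema(schema)
                .build()) {
            writer.write(record); // would throw the ClassCastException for a non-conforming record
        }
    }
}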

Maybe it could help to figure out which Avro version your writer uses and then compare it to Parquet's AvroWriteSupport. Sharing the schema you use could be helpful as well.

It could also be that you are running into this Parquet issue [1]. In this case, you could try to solve the problem by bumping the Parquet version.


Cheers,
Till

