Problems with RollingSink

6 messages
Problems with RollingSink

Diego Fustes Villadóniga

Hi colleagues,

I am experiencing problems when trying to write events from a stream to HDFS. I get the following exception:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): failed to create file /user/biguardian/events/2016-11-28--15/flinkpart-0-0.text for DFSClient_NONMAPREDUCE_1634980080_43 for client 172.21.40.75 because current leaseholder is trying to recreate file.

My Flink version is 1.1.3 and I am running it directly from a JAR (not on YARN) with java -jar.

Do you know the reason for this error?

Kind regards,

Diego

Re: Problems with RollingSink

Kostas Kloudas
Hi Diego,

The message shows that two tasks are concurrently trying to write to the same file.

Is this message thrown upon recovery after a failure, or at the initialization of the job?
Could you please check the logs for other exceptions before this one?

Can this be related to this issue?

Thanks,
Kostas

RE: Problems with RollingSink

Diego Fustes Villadóniga

Hi Kostas,

Thanks for your reply.

The problem occurs at the initialization of the job. The reason was that I was using the same HDFS path as the sink for 3 different streams, which is something I would like to keep. I can fix it by using a different path for each stream.

Maybe there is a different way to achieve this by joining the streams somehow before they reach the sink… maybe through Kafka?

Kind regards,

Diego

Re: Problems with RollingSink

Kostas Kloudas
Hi Diego,

Could you not prefix each stream with a different string so that the paths do not collide?

If I understand your use case correctly, this might work.
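
For illustration, a minimal sketch of that idea (streamA, streamB, basePath and the Event type are hypothetical placeholders, not names from your job):

import org.apache.flink.streaming.connectors.fs.RollingSink

// Two hypothetical DataStream[Event] values sharing one HDFS base directory.
// Each stream writes under its own sub-path, so no two sink tasks ever
// compete for the lease on the same file.
streamA.addSink(new RollingSink[Event](basePath + "/streamA"))
streamB.addSink(new RollingSink[Event](basePath + "/streamB"))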

Cheers,
Kostas

Re: Problems with RollingSink

Fabian Hueske-2
Hi Diego,

If you want the data of all streams to be written to the same files, you can also union the streams before sending them to the sink.
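
As a minimal sketch (streamA, streamB, streamC and the Event type are hypothetical; the streams must share the same element type):

import org.apache.flink.streaming.connectors.fs.RollingSink

// union merges streams of the same element type into a single DataStream,
// so one RollingSink (and one output path) serves all of them.
val merged = streamA.union(streamB, streamC)
merged.addSink(new RollingSink[Event]("/user/biguardian/events"))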

Best, Fabian

RE: Problems with RollingSink

Diego Fustes Villadóniga

Hi Fabian and Kostas,

Thanks! Both approaches look good, although I prefer using union, since I can then apply the next steps to a single stream.

Here is my code now:

// For each data source
val normalizedStreams = dataSources.map(source => {
  val sourceName = source._1
  val dataSource = source._2

  // Create a stream from the Kafka topic
  val eventStream = env.addSource(KafkaSource(conf, dataSource))

  // Normalize the event stream
  val normalizer = dataSource.normalizer
  val normalizedStream = eventStream.map(event => normalizer.normalizedEvent(Event(event)))

  normalizedStream
})

var unitedNormalizedStream = normalizedStreams.head
normalizedStreams.tail.foreach(stream => unitedNormalizedStream = unitedNormalizedStream.union(stream))

// Enrich the normalized event stream with each registered enrichment
val enrichedEventStream = unitedNormalizedStream.map(event => {
  var enrichedEvent = EnrichedEvent(event, mutable.Map())
  for ((enrichmentName, enrichment) <- enrichments) {
    enrichedEvent = enrichment.enriched(enrichedEvent)
  }
  enrichedEvent
})

enrichedEventStream.addSink(
  new RollingSink[EnrichedEvent](conf.getString(Configuration.HDFS_LANDING_AREA, ""))
    .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm")))

Does it look good? I don't like the way I have to build the union by means of a var… Is there a helper class to union a list of streams?
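
For reference, one minimal way to drop the var, assuming all the streams carry the same element type (a sketch; the Scala DataStream API exposes union with varargs):

// Fold the whole list into a single stream; reduce applies union pairwise.
val unitedNormalizedStream = normalizedStreams.reduce(_ union _)

// Equivalent, using the varargs form of union:
// val unitedNormalizedStream =
//   normalizedStreams.head.union(normalizedStreams.tail: _*)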

 

Kind regards,

Diego