Flink to ingest from Kafka to HDFS?


Flink to ingest from Kafka to HDFS?

Hans-Peter Zorn
Hi,

Did anybody think of (mis-)using Flink streaming as an alternative to Apache Flume just for ingesting data from Kafka (or other streaming sources) into HDFS? Knowing that Flink can read from Kafka and write to HDFS, I assume it should be possible, but is this a good idea?

Flume is basically about consuming data from somewhere, peeking into each record, and then reliably directing it to a specific directory/file in HDFS. I've seen there is a FlumeSink, but would it be possible to get the same functionality with Flink alone?

I've skimmed through the documentation and found the option to split the output by key and the possibility to add multiple sinks. As I understand it, Flink programs are generally static, so it would not be possible to add/remove sinks at runtime?
So you would need to implement a custom sink directing the records to different files based on a key (e.g. the date)? Would it be difficult to implement things like rolling outputs, etc.? Or is it better to just use Flume?
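
To make the idea a bit more concrete, I picture a job skeleton roughly like the one below. It is only a sketch: the Kafka source is stubbed out with a fixed collection so the snippet stays self-contained (in a real job it would be the consumer from the flink-connector-kafka module), and writeAsText just stands in for the smarter sink I am asking about.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToHdfsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka consumer from flink-connector-kafka, just to keep
        // this skeleton self-contained.
        DataStream<String> records = env.fromElements("2015-08-16|event-a", "2015-08-16|event-b");

        // The interesting part would be a custom sink that picks a directory/file per
        // record (e.g. by date) and rolls files; writeAsText is only the trivial case.
        records.writeAsText("hdfs:///data/ingest/raw");

        env.execute("Kafka to HDFS ingestion");
    }
}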

Best, 
Hans-Peter



Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
Hi!

This should definitely be possible in Flink. Pretty much exactly like you describe it.

You need a custom version of the HDFS sink with some logic for when to roll over to a new file.

You can also make the sink "exactly once" by integrating it with the checkpointing. For that, you would probably need to keep the current path and output stream offsets as of the last checkpoint, so you can resume from that offset and overwrite records to avoid duplicates. If that is not possible, you would probably buffer records between checkpoints and only write on checkpoints.
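
To make that concrete, here is a rough skeleton of such a sink. All names are made up and the checkpoint integration is only indicative; it assumes the Checkpointed interface as the state hook (a real version would also store the current file path, not just the offset).

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Made-up skeleton of a rolling HDFS sink; only the structure matters here.
public class RollingHdfsSink extends RichSinkFunction<String> implements Checkpointed<Long> {

    private static final long serialVersionUID = 1L;

    private final String basePath;
    private final long rollSize; // roll over to a new file after this many bytes

    private transient FSDataOutputStream out;
    private transient long bytesWritten;

    public RollingHdfsSink(String basePath, long rollSize) {
        this.basePath = basePath;
        this.rollSize = rollSize;
    }

    @Override
    public void invoke(String value) throws Exception {
        byte[] bytes = (value + "\n").getBytes("UTF-8");
        if (out == null || bytesWritten + bytes.length > rollSize) {
            roll(); // close the current part file and open the next one
        }
        out.write(bytes);
        bytesWritten += bytes.length;
    }

    private void roll() throws Exception {
        if (out != null) {
            out.close();
        }
        Path path = new Path(basePath, "part-" + System.currentTimeMillis());
        out = FileSystem.get(path.toUri()).create(path, false);
        bytesWritten = 0;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (out != null) {
            out.flush();
        }
        // Remember how far the current file is valid; on restore everything past this
        // offset would be discarded or overwritten to avoid duplicates.
        return bytesWritten;
    }

    @Override
    public void restoreState(Long validOffset) {
        // Sketch only: re-open the last file and drop everything after validOffset.
        bytesWritten = validOffset;
    }
}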

Greetings,
Stephan







Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
If you are up for it, this would be a very nice addition to Flink, a great contribution :-)






Re: Flink to ingest from Kafka to HDFS?

Rico Bergmann
I'm thinking about implementing this. 

After looking into the Flink code, I would basically subclass FileOutputFormat into, let's say, a KeyedFileOutputFormat that gets an additional KeySelector object. The path in the file system is then appended with the string the KeySelector returns.
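
A rough sketch of the path part (KeyedFileOutputFormat itself is of course just a made-up name; this only shows how the KeySelector result would extend the base path):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.fs.Path;

// Invented helper, only to illustrate the key-to-path mapping.
public class KeyedPathBuilder<T> implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private final String basePath;
    private final KeySelector<T, String> keySelector;

    public KeyedPathBuilder(String basePath, KeySelector<T, String> keySelector) {
        this.basePath = basePath;
        this.keySelector = keySelector;
    }

    /** Derive the target path for one record, e.g. hdfs:///data/2015-08-20/part-0. */
    public Path pathFor(T record, int subtaskIndex) throws Exception {
        String key = keySelector.getKey(record); // e.g. a date string
        return new Path(basePath + "/" + key, "part-" + subtaskIndex);
    }
}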

Do you think this is a good approach?

Greets. Rico. 








Re: Flink to ingest from Kafka to HDFS?

Aljoscha Krettek
Yes, this seems like a good approach. We should probably not reuse the KeySelector for this, but maybe use a more use-case-specific type of function that can create a desired filename from an input object.

This is only the first part, though. The hard bit would be implementing rolling files and also integrating it with Flink's checkpointing mechanism. For the integration with checkpointing, you could maybe use "staging files": all elements are put into a staging file, and then, when the notification about a completed checkpoint is received, the contents of this file would be moved (or appended) to the actual destination.
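
As a sketch, the function type I have in mind is nothing more than this (invented name, not existing Flink API):

import java.io.Serializable;

// Invented function type: derives the target file / bucket for an element,
// e.g. a date string extracted from the record.
public interface PathSelector<T> extends Serializable {
    String selectPath(T element);
}

The staging part would then work on top of that: the sink appends to something like <final-path>.staging, remembers which staged files belong to which checkpoint, and when the notification for a completed checkpoint arrives, it moves or appends them to the final path.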

Do you have any ideas about the rolling files/checkpointing?






Re: Flink to ingest from Kafka to HDFS?

Rico Bergmann
My ideas for checkpointing:

I think writing to the destination should not depend on the checkpoint mechanism (otherwise the output would never be written to the destination if checkpointing is disabled). Instead, I would keep the offsets of written and checkpointed records. When recovering, you would then somehow delete or overwrite the records after that offset. (But I don't really know whether it is as simple as I wrote it ;-) ).

Regarding the rolling files, I would suggest making the values of the user-defined partitioning function part of the path or file name. Writing records is then basically:
Extract the partition to write to, then add the record to a queue for this partition. Each queue has an output format assigned to it. On flushing, the output file is opened, the contents of the queue are written to it, and then it is closed.
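
In (made-up) code, the buffering part would be roughly the following; the actual output format handling is hidden behind the invented Writer callback:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Rough sketch of the per-partition buffering described above; names are made up.
public class PartitionedBuffer<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    private final Map<String, List<T>> queues = new HashMap<String, List<T>>();

    /** Add a record to the queue of its partition (e.g. partition = date string). */
    public void add(String partition, T record) {
        List<T> queue = queues.get(partition);
        if (queue == null) {
            queue = new ArrayList<T>();
            queues.put(partition, queue);
        }
        queue.add(record);
    }

    /** On flush: for each partition, open its file, write the queued records, close it. */
    public void flush(Writer<T> writer) throws Exception {
        for (Map.Entry<String, List<T>> entry : queues.entrySet()) {
            writer.writePartition(entry.getKey(), entry.getValue()); // open, write all, close
            entry.getValue().clear();
        }
    }

    /** Invented callback that encapsulates the actual output format handling. */
    public interface Writer<T> extends Serializable {
        void writePartition(String partition, List<T> records) throws Exception;
    }
}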

Does this sound reasonable?








Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
Yes, one needs exactly a mechanism to seek the output stream back to the last checkpointed position, in order to overwrite duplicates.

I think HDFS is not going to make this easy; there is basically no seek-for-write. I am not sure how to solve this, other than writing to tmp files and copying them upon success.
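
The tmp-file variant would look something like this, using Flink's FileSystem abstraction (sketch only, error handling omitted, paths are placeholders):

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Sketch of the "write to tmp, publish on success" idea.
public class TmpFilePublisher {

    public static void writeAndPublish(String finalPathStr, byte[] data) throws Exception {
        Path finalPath = new Path(finalPathStr);
        Path tmpPath = new Path(finalPathStr + ".inprogress");
        FileSystem fs = FileSystem.get(finalPath.toUri());

        // Write everything to the temporary file first.
        FSDataOutputStream out = fs.create(tmpPath, true);
        out.write(data);
        out.close();

        // Only once the write (or the checkpoint) has succeeded, make the data
        // visible under its final name.
        fs.rename(tmpPath, finalPath);
    }
}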

Apache Flume must have solved this issue in some way; it may be worth looking into how they solved it.







Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
BTW: This is becoming a dev discussion; maybe we should move it to that list...








Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
Hi Rico!

Can you give us an update on your status here? We actually need something like this as well (and pretty urgently), so we would jump in and implement it, unless you already have something.

Stephan










Re: Flink to ingest from Kafka to HDFS?

Rico Bergmann
Hi!

Sorry, I won't be able to implement this soon. I just shared my ideas on this. 

Greets. Rico. 










RE: Flink to ingest from Kafka to HDFS?

LINZ, Arnaud

Hi Stephan,

 

I do not have a Kafka->HDFS solution, but I do have a streaming sink that writes to HDFS (an external text Hive table) with auto-partitioning and rolling files. However, it does not take care of checkpointing and may have flushing issues if some partitions are seldom seen.

 

I'm not sure it will save you much time, especially since it has not really been used yet.

 

Code is provided with no copyright and no warranty!

 

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.joda.time.DateTime;

/**
 * This sink streams data to a HDFS directory (hive external table) with a size limit (rolling files) and automatic
 * partitioning. To be able to read the file content while it's still being written, an idea is to add a char(1) field in the last
 * position of the hive line and to check if it has the proper value when read (if not, the line is not complete)
 *
 * @author alinz
 */
public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {

    /**
     * The Class StreamingFile, encapsulates an open output hdfs file
     */
    public static class StreamingFile {

        /** base directory */
        private final String rootPath;

        /** prefix */
        private final String prefix;

        /** file path */
        private Path path;

        /** open output stream */
        private BufferedOutputStream stream;

        /** current size */
        private long size = 0;

        /** current file number */
        private long nbFile = 0;

        /** instant of the last writing on this stream. If the interval is too long, flushes content */
        private long lastInvoke;

        /**
         * Instantiates a new streaming file.
         * @param rootPath destination path
         * @param prefix file name prefix
         * @throws IOException cannot open file
         */
        public StreamingFile(String rootPath, String prefix) throws IOException {
            super();
            this.rootPath = rootPath;
            this.prefix = prefix;
            lastInvoke = 0; // always flushes first record
            open();
        }

        /**
         * Create destination file on FS
         * @throws IOException issue when opening file
         */
        private void open() throws IOException {
            this.path = new Path(rootPath, prefix + nbFile);
            final FileSystem filesys = path.getFileSystem();
            filesys.mkdirs(path.getParent());
            stream = new BufferedOutputStream(filesys.create(path, true));
        }

        /**
         * closes stream
         */
        public void closeStream() {
            IOUtils.closeQuietly(stream);
            stream = null; // NOPMD
        }

        /**
         * Write data into the stream
         * @param data data to write
         * @param maxSize max size of data ; split the file if we reach it
         * @throws IOException writing issue
         */
        public void writeStream(byte[] data, long maxSize) throws IOException {
            stream.write(data);
            // If the source is too slow, flushes the data. Using this method, we do not always have the "last flushes",
            // especially concerning old partitions.
            // TODO If it's an issue, implement a time-out thread.
            final long maxDelayFlush = 100;
            final long invokeTime = System.currentTimeMillis();
            if (invokeTime - lastInvoke > maxDelayFlush) {
                stream.flush();
            }
            lastInvoke = invokeTime;
            if (incTaille(data.length) >= maxSize) {
                split();
            }
        }

        /**
         * increment file size
         * @param amount what to add
         * @return the new size
         */
        private long incTaille(long amount) {
            size += amount;
            return size;
        }

        /**
         * Closes current file and open a new one
         * @throws IOException issue when opening file
         */
        private void split() throws IOException {
            closeStream();
            nbFile++;
            open();
            size = 0;
        }

        /**
         * flushes stream
         * @throws IOException I/O issue
         */
        public void flushStream() throws IOException {
            stream.flush();
        }
    }

    /** SUID. */
    private static final long serialVersionUID = 1L;

    // Shared fields

    /** Output hive table scheme */
    private final HCatSchema outputSchema;

    /** field delimiter */
    private final char delim;

    /** hdfs root path */
    private final String hdfsPath;

    /** Max file size */
    private final long maxSize;

    // Subtask fields

    /** filename prefix for a subtask, prevents conflicts with another subtask or a previous run */
    private transient String namePrefix;

    /** map of streams indexed per met partition */
    private transient Map<String, StreamingFile> streams;

    /** instant of the last periodic flush */
    private transient long lastFlushAll;

    /**
     * Builds a streamer.
     * @param outputSchema output record schema (without partition)
     * @param delim field delimiter
     * @param hdfsPath HDFS destination path
     * @param maxSize max size of a file (rolls the file if reached)
     */
    public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
        super();
        this.outputSchema = outputSchema;
        this.delim = delim;
        this.hdfsPath = hdfsPath;
        this.maxSize = maxSize;
    }

    /** {@inheritDoc} */
    @Override
    public void open(Configuration parameters) throws Exception { // NOPMD
        // Prefix is unique for a run and a subtask, to avoid conflicts
        namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
        streams = new HashMap<String, StreamingFile>();
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws Exception { // NOPMD
        for (final StreamingFile file : streams.values()) {
            file.closeStream();
        }
    }

    /** {@inheritDoc} */
    @Override
    public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
        final String partition = value.f0;
        final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
        // Do we have an open data stream for this partition?
        StreamingFile file = streams.get(partition);
        if (file == null) {
            file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
            streams.put(partition, file);
        }
        file.writeStream(record.getBytes(), maxSize);

        // Periodically flush all streams
        final long invoke = System.currentTimeMillis();
        final long flushPeriod = 10000;
        if (invoke - lastFlushAll > flushPeriod) {
            lastFlushAll = invoke;
            for (final StreamingFile stream : streams.values()) {
                stream.flushStream();
            }
        }
    }
}

 

And the missing HiveFileOutputFormat.getRecordLine :

 

    /**
     * TODO partitions should not be part of the line. But since they are in the last position, it's a minor issue.
     * Shared method to transform a hive record into a text line<br>
     * TODO use of deprecated types is more convenient, but I should get rid of it.<br>
     * @param record hive record
     * @param schema line scheme
     * @param separator field delimiter
     * @return corresponding line, ended with \n
     */
    @SuppressWarnings("deprecation")
    // because it's so convenient
    public static String getRecordLine(DefaultHCatRecord record, HCatSchema schema, char separator) {
        final int fldNumbr = Math.min(schema.size(), record.size());
        final StringBuffer line = new StringBuffer();

        for (int idx = 0; idx < fldNumbr; idx++) {
            final Object fieldVal = record.get(idx);
            final String strFieldVal;
            if (fieldVal == null) {
                strFieldVal = "";
            }
            else {
                switch (schema.get(idx).getType()) {
                    case DOUBLE:
                    case FLOAT:
                    case DECIMAL:
                    case BIGINT:
                    case INT:
                    case SMALLINT:
                    case TINYINT:
                    case CHAR:
                    case STRING:
                    case VARCHAR:
                    case BOOLEAN:
                    case DATE:
                    case TIMESTAMP:
                        strFieldVal = fieldVal.toString();
                        break;
                    case ARRAY:
                    case MAP:
                    case STRUCT:
                    case BINARY:
                    default:
                        throw new IllegalArgumentException("Complex Hive types (" + schema.get(idx).getTypeString()
                            + ") are not supported");
                }
            }
            line.append(strFieldVal);
            if (idx < fldNumbr - 1) {
                line.append(separator);
            }
        }
        line.append('\n');
        return line.toString();
    }
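
For completeness, wiring the sink into a job is then just something like this (a sketch; the upstream that builds the Tuple2 stream and the concrete path/size are up to you):

// Given a DataStream<Tuple2<String, DefaultHCatRecord>> named hiveRecords, where f0 is the partition value:
hiveRecords.addSink(new HiveStreamOutput(outputSchema, ';', "hdfs:///apps/hive/warehouse/mytable", 128L * 1024 * 1024));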

 

 

 

 

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: Flink to ingest from Kafka to HDFS?

Hans-Peter Zorn
Hi Stephan,

Even though I started the discussion, I was just trying to estimate the effort. In that project, they finally opted to use Flume with a Kafka channel.

Best, Hans-Peter

On Wed, Aug 26, 2015 at 9:52 AM, LINZ, Arnaud <[hidden email]> wrote:

Hi Stephen,

 

I do not have a Kafka->HDFS solution, but I do have a streaming sink that writes to HDFS (external, text hive table) with auto-partitioning and rolling files. However, it does not take care of checkpointing and may have flushing issues if some partitions are seldom seen.

 

I’m not sure it will save you much time, especially given the fact that it has not been really used yet.

 

Code is provided with no copyright and no warranty!

 

import java.io.BufferedOutputStream;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

 

import org.apache.commons.io.IOUtils;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.configuration.Configuration;

import org.apache.flink.core.fs.FileSystem;

import org.apache.flink.core.fs.Path;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import org.apache.hive.hcatalog.data.DefaultHCatRecord;

import org.apache.hive.hcatalog.data.schema.HCatSchema;

import org.joda.time.DateTime;

 

/**

* This sink streams data to a HDFS directory (hive external table) with a size limit (rolling files) and automatic

* partitioning. To be able to read the file content while it’s still being written, an idea is to add a char(1) field in the last

* position of the hive line and to check if it has the proper value when read (if not, the line is not complete)

*

 * @author alinz

*/

public class HiveStreamOutput extends RichSinkFunction<Tuple2<String, DefaultHCatRecord>> {

 

    /**

     * The Class StreamingFile, encapsulates an open output hdfs file

     */

    public static class StreamingFile {

 

        /** base directory*/

        private final String rootPath;

        /** prefix*/

        private final String prefix;

 

        /** file path*/

        private Path path;

 

        /** open output stream */

        private BufferedOutputStream stream;

 

        /** current size */

        private long size = 0;

 

        /** current file number*/

        private long nbFile = 0;

 

        /** instant of the last writing on this stream. If the interval is too long, flushes content*/

        private long lastInvoke;

 

        /**

         * Instantiates a new streaming file.

         * @param rootPath destination path

         * @param prefix file name prefix

         * @throws IOException cannot open file

         */

        public StreamingFile(String rootPath, String prefix) throws IOException {

            super();

            this.rootPath = rootPath;

            this.prefix = prefix;

            lastInvoke = 0; // always flushes first record

            open();

        }

 

        /**

         * Create destination file on FS

         * @throws IOException issue when opening file

         */

        private void open() throws IOException {

            this.path = new Path(rootPath, prefix + nbFile);

            final FileSystem filesys = path.getFileSystem();

            filesys.mkdirs(path.getParent());

            stream = new BufferedOutputStream(filesys.create(path, true));

        }

 

        /**

         * closes stream

         */

        public void closeStream() {
            IOUtils.closeQuietly(stream);
            stream = null; // NOPMD
        }

        /**
         * Writes data into the stream.
         * @param data data to write
         * @param maxSize maximum file size; the file is rolled when this size is reached
         * @throws IOException writing issue
         */
        public void writeStream(byte[] data, long maxSize) throws IOException {
            stream.write(data);
            // If the source is slow, flush the data. With this approach we do not always get the "last" flush,
            // especially for old partitions.
            // TODO: if this becomes an issue, implement a timeout thread.
            final long maxDelayFlush = 100;
            final long invokeTime = System.currentTimeMillis();
            if (invokeTime - lastInvoke > maxDelayFlush) {
                stream.flush();
            }
            lastInvoke = invokeTime;
            if (incTaille(data.length) >= maxSize) {
                split();
            }
        }

        /**
         * Increments the current file size.
         * @param amount number of bytes to add
         * @return the new size
         */
        private long incTaille(long amount) {
            size += amount;
            return size;
        }

        /**
         * Closes the current file and opens a new one.
         * @throws IOException issue when opening the file
         */
        private void split() throws IOException {
            closeStream();
            nbFile++;
            open();
            size = 0;
        }

        /**
         * Flushes the stream.
         * @throws IOException I/O issue
         */
        public void flushStream() throws IOException {
            stream.flush();
        }
    }

    /** SUID. */
    private static final long serialVersionUID = 1L;

    // Shared fields

    /** Output hive table schema. */
    private final HCatSchema outputSchema;

    /** Field delimiter. */
    private final char delim;

    /** HDFS root path. */
    private final String hdfsPath;

    /** Max file size. */
    private final long maxSize;

    // Subtask fields

    /** Filename prefix for a subtask; prevents conflicts with another subtask or a previous run. */
    private transient String namePrefix;

    /** Map of streams indexed by encountered partition. */
    private transient Map<String, StreamingFile> streams;

    /** Instant of the last periodic flush. */
    private transient long lastFlushAll;

    /**
     * Builds a streamer.
     * @param outputSchema output record schema (without partition)
     * @param delim field delimiter
     * @param hdfsPath HDFS destination path
     * @param maxSize max size of a file (rolls the file if reached)
     */
    public HiveStreamOutput(HCatSchema outputSchema, char delim, String hdfsPath, long maxSize) {
        super();
        this.outputSchema = outputSchema;
        this.delim = delim;
        this.hdfsPath = hdfsPath;
        this.maxSize = maxSize;
    }

    /** {@inheritDoc} */
    @Override
    public void open(Configuration parameters) throws Exception { // NOPMD
        // The prefix is unique per run and per subtask, to avoid conflicts.
        namePrefix = "S" + getRuntimeContext().getIndexOfThisSubtask() + "_" + (new DateTime().getMillis()) + "_";
        streams = new HashMap<String, StreamingFile>();
    }

    /** {@inheritDoc} */
    @Override
    public void close() throws Exception { // NOPMD
        for (final StreamingFile file : streams.values()) {
            file.closeStream();
        }
    }

    /** {@inheritDoc} */
    @Override
    public void invoke(Tuple2<String, DefaultHCatRecord> value) throws Exception { // NOPMD
        final String partition = value.f0;
        final String record = HiveFileOutputFormat.getRecordLine(value.f1, outputSchema, delim);
        // Do we already have an open data stream for this partition?
        StreamingFile file = streams.get(partition);
        if (file == null) {
            file = new StreamingFile(hdfsPath + "/" + partition, namePrefix);
            streams.put(partition, file);
        }
        file.writeStream(record.getBytes(), maxSize); // note: uses the platform default charset

        // Periodically flush all streams.
        final long invoke = System.currentTimeMillis();
        final long flushPeriod = 10000;
        if (invoke - lastFlushAll > flushPeriod) {
            lastFlushAll = invoke;
            for (final StreamingFile stream : streams.values()) {
                stream.flushStream();
            }
        }
    }
}

 

And the missing HiveFileOutputFormat.getRecordLine:

    /**
     * Shared method to transform a hive record into a text line.<br>
     * TODO: partitions should not be part of the line, but since they are in the last position it is a minor issue.<br>
     * TODO: using deprecated types is more convenient, but I should get rid of them.<br>
     * @param record hive record
     * @param schema line schema
     * @param separator field delimiter
     * @return corresponding line, terminated with \n
     */
    @SuppressWarnings("deprecation") // because it's so convenient
    public static String getRecordLine(DefaultHCatRecord record, HCatSchema schema, char separator) {
        final int fldNumbr = Math.min(schema.size(), record.size());
        final StringBuffer line = new StringBuffer();

        for (int idx = 0; idx < fldNumbr; idx++) {
            final Object fieldVal = record.get(idx);
            final String strFieldVal;
            if (fieldVal == null) {
                strFieldVal = "";
            }
            else {
                switch (schema.get(idx).getType()) {
                    case DOUBLE:
                    case FLOAT:
                    case DECIMAL:
                    case BIGINT:
                    case INT:
                    case SMALLINT:
                    case TINYINT:
                    case CHAR:
                    case STRING:
                    case VARCHAR:
                    case BOOLEAN:
                    case DATE:
                    case TIMESTAMP:
                        // Scalar types: the default string representation is good enough here.
                        strFieldVal = fieldVal.toString();
                        break;
                    case ARRAY:
                    case MAP:
                    case STRUCT:
                    case BINARY:
                    default:
                        throw new IllegalArgumentException("Complex Hive types (" + schema.get(idx).getTypeString()
                            + ") are not supported");
                }
            }
            line.append(strFieldVal);
            if (idx < fldNumbr - 1) {
                line.append(separator);
            }
        }
        line.append('\n');
        return line.toString();
    }
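
For orientation, here is a minimal sketch of how a sink like the one above could be wired into a Kafka-to-HDFS job. It is not part of the original post: the topic name, Kafka properties, parsing logic and output path are placeholders, and the connector class names (FlinkKafkaConsumer082, SimpleStringSchema) are assumed from the Flink Kafka connector of that era and may differ between versions.

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

public final class KafkaToHiveIngestJob {

    /** Wires a Kafka source through a parsing map into the HiveStreamOutput sink; the schema comes from the Hive table metadata. */
    public static void run(HCatSchema outputSchema) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "broker:9092");    // placeholder
        kafkaProps.setProperty("zookeeper.connect", "zookeeper:2181"); // placeholder
        kafkaProps.setProperty("group.id", "hdfs-ingest");

        final DataStream<String> lines = env.addSource(
            new FlinkKafkaConsumer082<String>("events", new SimpleStringSchema(), kafkaProps));

        // Turn each raw line into (partition value, record); the real parsing is application-specific.
        final DataStream<Tuple2<String, DefaultHCatRecord>> records = lines
            .map(new MapFunction<String, Tuple2<String, DefaultHCatRecord>>() {
                @Override
                public Tuple2<String, DefaultHCatRecord> map(String line) throws Exception {
                    final DefaultHCatRecord rec = new DefaultHCatRecord(1); // placeholder parsing
                    rec.set(0, line);
                    final String partition = "day=2015-08-26"; // placeholder: derive from the record
                    return new Tuple2<String, DefaultHCatRecord>(partition, rec);
                }
            });

        // Roll files at roughly 128 MB per partition directory.
        records.addSink(new HiveStreamOutput(outputSchema, ';', "hdfs:///data/events", 128L * 1024 * 1024));

        env.execute("kafka-to-hive-ingest");
    }
}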


From: Rico Bergmann [mailto:[hidden email]]
Sent: Wednesday, August 26, 2015 07:49
To: [hidden email]
Subject: Re: Flink to ingest from Kafka to HDFS?

 

Hi!

 

Sorry, I won't be able to implement this soon. I just shared my ideas on this. 

 

Greets. Rico. 


On 25.08.2015 at 17:52, Stephan Ewen <[hidden email]> wrote:

Hi Rico!

 

Can you give us an update on your status here? We actually need something like this as well (and pretty urgently), so we would jump in

and implement this, unless you have something already.

 

Stephan

 

 

On Thu, Aug 20, 2015 at 12:13 PM, Stephan Ewen <[hidden email]> wrote:

BTW: This is becoming a dev discussion; maybe we should move it to that list...

 

On Thu, Aug 20, 2015 at 12:12 PM, Stephan Ewen <[hidden email]> wrote:

Yes, one needs exactly a mechanism to seek the output stream back to the last checkpointed position, in order to overwrite duplicates.

 

I think HDFS is not going to make this easy; there is basically no seek for write. Not sure how to solve this, other than writing to tmp files and copying upon success.

Apache Flume must have solved this issue in some way; it may be worth looking into how they solved it.

 

On Thu, Aug 20, 2015 at 11:58 AM, Rico Bergmann <[hidden email]> wrote:

My ideas for checkpointing:

 

I think writing to the destination should not depend on the checkpoint mechanism (otherwise the output would never be written to the destination if checkpointing is disabled). Instead, I would keep the offsets of written and checkpointed records. When recovering, you would then somehow delete or overwrite the records after that offset. (But I don't really know whether this is as simple as I wrote it ;-) ). 

 

Regarding the rolling files, I would suggest making the values of the user-defined partitioning function part of the path or file name. Writing records is then basically:

Extract the partition to write to, then add the record to a queue for this partition. Each queue has an output format assigned to it. On flushing, the output file is opened, the content of the queue is written to it, and then it is closed.

 

Does this sound reasonable?
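
As a purely illustrative sketch of the queue-per-partition idea above (not code from this thread; the file naming and the flush trigger are assumptions), the bookkeeping could look roughly like this:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/** Sketch: buffer records per partition and flush each queue into its own file. */
public class PartitionedQueueWriter {

    /** Pending lines, keyed by the partition value (e.g. "day=2015-08-26"). */
    private final Map<String, List<String>> queues = new HashMap<String, List<String>>();

    /** Base output directory; one sub-directory per partition. */
    private final Path basePath;

    /** Counter so that every flush creates a new file (naming is illustrative). */
    private long flushCount;

    public PartitionedQueueWriter(String basePath) {
        this.basePath = new Path(basePath);
    }

    /** Queues one record under the partition returned by the user-defined partition function. */
    public void add(String partition, String line) {
        List<String> queue = queues.get(partition);
        if (queue == null) {
            queue = new ArrayList<String>();
            queues.put(partition, queue);
        }
        queue.add(line);
    }

    /** Opens one file per partition, writes the queued lines, closes the file and clears the queue. */
    public void flush() throws IOException {
        flushCount++;
        for (final Map.Entry<String, List<String>> entry : queues.entrySet()) {
            final Path dir = new Path(basePath, entry.getKey());
            final Path file = new Path(dir, "part-" + flushCount);
            final FileSystem filesys = file.getFileSystem();
            filesys.mkdirs(dir);
            final FSDataOutputStream out = filesys.create(file, true);
            try {
                for (final String line : entry.getValue()) {
                    out.write((line + "\n").getBytes("UTF-8"));
                }
            } finally {
                out.close();
            }
            entry.getValue().clear();
        }
    }
}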


On 20.08.2015 at 10:40, Aljoscha Krettek <[hidden email]> wrote:

Yes, this seems like a good approach. We should probably not reuse the KeySelector for this, but rather a more use-case-specific type of function that can create a desired filename from an input object.

 

This is only the first part, though. The hard bit would be implementing rolling files and also integrating it with Flink's checkpointing mechanism. For integration with checkpointing you could maybe use "staging files": all elements are put into a staging file. And then, when the notification about a completed checkpoint is received, the contents of this file would be moved (or appended) to the actual destination.

 

Do you have any ideas about the rolling files/checkpointing?
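
A minimal sketch of that staging-file idea, assuming the publish step is driven by Flink's checkpoint-complete notification (the class and method names here are invented for illustration):

import java.io.IOException;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/**
 * Sketch: write into a staging file and only publish (rename) it once the
 * checkpoint that covers its contents is known to be complete.
 */
public class StagingFileCommitter {

    private final Path stagingDir;
    private final Path finalDir;
    private FSDataOutputStream stagingStream;
    private Path currentStagingFile;
    private long fileCounter;

    public StagingFileCommitter(String stagingDir, String finalDir) {
        this.stagingDir = new Path(stagingDir);
        this.finalDir = new Path(finalDir);
    }

    /** Appends one record to the current staging file, opening it lazily. */
    public void write(byte[] record) throws IOException {
        if (stagingStream == null) {
            currentStagingFile = new Path(stagingDir, "staging-" + (fileCounter++));
            final FileSystem fs = currentStagingFile.getFileSystem();
            fs.mkdirs(stagingDir);
            stagingStream = fs.create(currentStagingFile, true);
        }
        stagingStream.write(record);
    }

    /**
     * To be called from the checkpoint-complete notification (e.g. Flink's
     * notifyCheckpointComplete callback): close the staging file and move it to
     * the final directory, then let the next write start a fresh staging file.
     */
    public void publish(long checkpointId) throws IOException {
        if (stagingStream == null) {
            return; // nothing written since the last checkpoint
        }
        stagingStream.close();
        final FileSystem fs = currentStagingFile.getFileSystem();
        fs.mkdirs(finalDir);
        // Rename is atomic on HDFS, so readers never see half-published data.
        fs.rename(currentStagingFile, new Path(finalDir, "part-" + checkpointId));
        stagingStream = null;
        currentStagingFile = null;
    }
}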

 

On Thu, 20 Aug 2015 at 09:44 Rico Bergmann <[hidden email]> wrote:

I'm thinking about implementing this. 

 

After looking into the Flink code, I would basically subclass FileOutputFormat into, let's say, a KeyedFileOutputFormat that gets an additional KeySelector object. The string the KeySelector returns is then appended to the path in the file system. 

 

Do you think this is a good approach?

 

Greets. Rico. 
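
For illustration only, the kind of key-to-filename function being discussed might look like this; the KeyedFileOutputFormat it would plug into is hypothetical, as is the Event type:

import org.apache.flink.api.java.functions.KeySelector;

/** Hypothetical record type carrying an event date (illustration only). */
class Event {
    public String date; // e.g. "2015-08-26"
}

/** Illustrative key-to-path function in the spirit of the proposal above. */
public class DatePathSelector implements KeySelector<Event, String> {

    /** SUID. */
    private static final long serialVersionUID = 1L;

    @Override
    public String getKey(Event event) {
        // A KeyedFileOutputFormat as proposed would append this string to its base
        // output path, producing e.g. hdfs:///data/events/day=2015-08-26/part-0
        return "day=" + event.date;
    }
}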







Reply | Threaded
Open this post in threaded view
|

Re: Flink to ingest from Kafka to HDFS?

Stephan Ewen
BTW: We are currently working on adding a rolling-file HDFS sink to Flink that will initially work very similarly to what Flume gives you. If I understand it correctly, Flume may have duplicates in the output from incomplete flushes on failures.

We actually have a design to extend this later into a proper "exactly once" sink, integrated with Flink's checkpointing, which discards duplicates properly by offset tracking and truncating/compacting.
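
To make the offset-tracking part concrete, here is an illustrative sketch only (not the design referred to above): the sink would store the number of valid bytes of the current file in each checkpoint, and on recovery, since HDFS offered no truncate at the time, keep only that prefix by copying it into a fresh file.

import java.io.IOException;

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

/** Sketch: discard data written after the last completed checkpoint by keeping only a valid prefix. */
public class ValidLengthRecovery {

    /**
     * Copies the first validLength bytes of the given file into a new file and swaps it in
     * place of the original; validLength is the stream offset stored in the last successful checkpoint.
     */
    public static void truncateToValidLength(Path file, long validLength) throws IOException {
        final FileSystem fs = file.getFileSystem();
        final Path truncated = new Path(file.getParent(), file.getName() + ".valid");

        final FSDataInputStream in = fs.open(file);
        final FSDataOutputStream out = fs.create(truncated, true);
        try {
            final byte[] buffer = new byte[64 * 1024];
            long remaining = validLength;
            while (remaining > 0) {
                final int read = in.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    break; // file is shorter than expected; keep what is there
                }
                out.write(buffer, 0, read);
                remaining -= read;
            }
        } finally {
            in.close();
            out.close();
        }
        // Replace the original file with its valid prefix.
        fs.delete(file, false);
        fs.rename(truncated, file);
    }
}

On Hadoop versions that later added a truncate operation, the copy step could be replaced by an actual truncate call.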


On Wed, Aug 26, 2015 at 10:04 AM, Hans-Peter Zorn <[hidden email]> wrote:
Hi Stephan,

even though I started the discussion, I was just trying to estimate the effort. In that project they finally opted to use Flume with a Kafka channel. 

Best, Hans-Peter



Reply | Threaded
Open this post in threaded view
|

Re: Flink to ingest from Kafka to HDFS?

Aljoscha Krettek
Hi,
I have an open Pull Request for a RollingFile sink. It is integrated with checkpointing, so it can provide exactly-once behavior. If you're interested, please check it out: https://github.com/apache/flink/pull/1084

Cheers,
Aljoscha
