Reading Binary Data (Matrix) with Flink

19 messages
Reading Binary Data (Matrix) with Flink

Saliya Ekanayake
Hi,

I am trying to use Flink to perform a parallel batch operation on an NxN matrix represented as a binary file. Each (i,j) element is stored as a Java Short value. In typical MapReduce programming with Hadoop, each map task reads a block of rows of this matrix, performs computation on that block, and emits the result to the reducer.

How is this done in Flink? I am new to Flink and couldn't find a binary reader so far. Any help is greatly appreciated.

Thank you,
Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org

Re: Reading Binary Data (Matrix) with Flink

Suneel Marthi
Guess u r looking for Flink's BinaryInputFormat to be able to read blocks of data from HDFS

https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <[hidden email]> wrote:
[...]


Re: Reading Binary Data (Matrix) with Flink

Chiwan Park-2
Hi Saliya,

You can use Hadoop input formats in Flink via the readHadoopFile method. The method returns a DataSet of type Tuple2<Key, Value>. Note that the MapReduce equivalent transformation in Flink is composed of map, groupBy, and reduceGroup.
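For concreteness, a readHadoopFile call might look roughly like this (a sketch, not a tested program; the SequenceFileInputFormat, the Writable types, and the path are placeholder assumptions, since the thread does not specify the on-disk format):

```java
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class ReadHadoopFileSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Reuse a Hadoop FileInputFormat; the result is a DataSet<Tuple2<Key, Value>>.
        DataSet<Tuple2<LongWritable, BytesWritable>> input = env.readHadoopFile(
                new SequenceFileInputFormat<LongWritable, BytesWritable>(),
                LongWritable.class, BytesWritable.class, "hdfs:///path/to/matrix.seq");

        // MapReduce equivalent: map -> groupBy -> reduceGroup (the "reducer").
        input.groupBy(0)
             .reduceGroup(new GroupReduceFunction<Tuple2<LongWritable, BytesWritable>, Long>() {
                 @Override
                 public void reduce(Iterable<Tuple2<LongWritable, BytesWritable>> block,
                                    Collector<Long> out) {
                     long count = 0;                 // placeholder computation
                     for (Tuple2<LongWritable, BytesWritable> ignored : block) count++;
                     out.collect(count);
                 }
             })
             .print();
    }
}
```

This requires the flink-java and hadoop-compatibility dependencies on the classpath.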

> On Jan 20, 2016, at 3:04 PM, Suneel Marthi <[hidden email]> wrote:
>
> Guess u r looking for Flink's BinaryInputFormat to be able to read blocks of data from HDFS
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>
> [...]

Regards,
Chiwan Park


Re: Reading Binary Data (Matrix) with Flink

Saliya Ekanayake
Thank you. I saw readHadoopFile, but I was not sure how it could be used for the following, which is what I need. The logic of the code requires an entire row to operate on, so in our current implementation with P tasks, each task reads a rectangular block of (N/P) x N from the matrix. Is this possible with readHadoopFile? Also, the file may not be in HDFS, so is it possible to refer to the local disk when doing this?

Thank you

On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <[hidden email]> wrote:
> [...]





Re: Reading Binary Data (Matrix) with Flink

Till Rohrmann

With readHadoopFile you can use all of Hadoop's FileInputFormats, and thus you can do everything with Flink that you can do with Hadoop. Simply take the same Hadoop FileInputFormat that you would use for your MapReduce job.

Cheers,
Till


On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]


Re: Reading Binary Data (Matrix) with Flink

Saliya Ekanayake
Thank you for the response, but I still have some doubt. Simply put, the file is not in HDFS; it's in local storage. In Flink, if I run the program with, say, 5 parallel tasks, what I would like to do is read a block of rows in each task as shown below. I looked at the simple CSV reader and was thinking of creating a custom one like that, but I would need to know the task number to read the relevant block. Is this possible?

[inline image: the NxN matrix partitioned into row blocks, one block per task]

Thank you,
Saliya

On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <[hidden email]> wrote:
> [...]





Re: Reading Binary Data (Matrix) with Flink

Suneel Marthi-2
There should be an env.readBinaryFile() IIRC, check that

Sent from my iPhone

On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <[hidden email]> wrote:
> [...]





Re: Reading Binary Data (Matrix) with Flink

Fabian Hueske-2
Hi Saliya,

yes, that is possible. However, the requirements for reading a binary file from the local FS are basically the same as for reading it from HDFS.
In order to start reading different sections of a file in parallel, you need to know the starting positions. This can be done either by using fixed offsets for blocks or by adding some meta information about the block start positions. InputFormats divide the work of reading a file by generating multiple input splits. Each input split defines the file, the start offset, and the length to read.
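Because each Short element has a fixed width of 2 bytes, those block start positions can be computed directly from the row size. A minimal plain-Java sketch (class and method names are illustrative, not Flink API):

```java
// Compute the (startOffset, length) byte ranges for P row-block splits of an
// N x N matrix of 2-byte Short elements. Row i starts at byte i * N * 2.
// Class and method names are illustrative only.
public class RowBlockSplits {

    static long[][] rowBlockSplits(int n, int p) {
        long rowBytes = (long) n * Short.BYTES;   // bytes per matrix row
        long[][] splits = new long[p][2];         // {startOffset, lengthInBytes}
        int baseRows = n / p, extra = n % p;      // spread remainder rows
        long offset = 0;
        for (int i = 0; i < p; i++) {
            int rows = baseRows + (i < extra ? 1 : 0);
            splits[i][0] = offset;
            splits[i][1] = rows * rowBytes;
            offset += splits[i][1];
        }
        return splits;
    }

    public static void main(String[] args) {
        // 10 x 10 matrix of shorts (200 bytes) read by 4 parallel tasks.
        for (long[] s : rowBlockSplits(10, 4)) {
            System.out.println(s[0] + " " + s[1]);
        }
        // prints: 0 60 / 60 60 / 120 40 / 160 40
    }
}
```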

However, are you sure that reading a file in parallel will be faster than reading it sequentially?
At least for HDDs, IO-bound workloads with "random" reading patterns are usually much slower than sequential reads.

Cheers, Fabian

2016-01-24 19:10 GMT+01:00 Suneel Marthi <[hidden email]>:
> [...]


Re: Reading Binary Data (Matrix) with Flink

Saliya Ekanayake
Hi Fabian,

Thank you for the information.

So, is there a way I can get the task number within the InputFormat? That way I can use it to offset the block of rows. 

The file is too large to fit in a single process's memory, so the current setups in MPI and Hadoop use the rank (task number) to memory-map the corresponding block of rows. In our experiments, we found this approach to be the fastest because of the memory mapping rather than buffered reads. Also, the file is replicated across nodes, and the reading (mapping) happens only once.
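The memory-mapped row-block read described here can be sketched in plain Java NIO as follows (an illustration, not the poster's actual code; big-endian byte order, matching Java's DataOutput, is an assumption):

```java
import java.io.DataOutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapRowBlock {

    // Memory-map rows [startRow, startRow + numRows) of an n x n short matrix file.
    static short[] readRowBlock(Path file, int n, int startRow, int numRows) throws Exception {
        long offset = (long) startRow * n * Short.BYTES;
        long length = (long) numRows * n * Short.BYTES;
        try (FileChannel ch = new RandomAccessFile(file.toFile(), "r").getChannel()) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, offset, length);
            buf.order(ByteOrder.BIG_ENDIAN);          // Java DataOutput byte order (assumed)
            short[] block = new short[numRows * n];
            buf.asShortBuffer().get(block);
            return block;
        }
    }

    public static void main(String[] args) throws Exception {
        // Write a tiny 4 x 4 matrix (shorts 0..15, row-major), then map rows 2-3.
        int n = 4;
        Path tmp = Files.createTempFile("matrix", ".bin");
        try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(tmp))) {
            for (int v = 0; v < n * n; v++) out.writeShort(v);
        }
        short[] block = readRowBlock(tmp, n, 2, 2);
        System.out.println(block[0] + " " + block[block.length - 1]); // prints: 8 15
        Files.delete(tmp);
    }
}
```

Each task would compute startRow from its rank, as in the MPI setup.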

Thank you,
Saliya

On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <[hidden email]> wrote:
> [...]





Re: Reading Binary Data (Matrix) with Flink

Fabian Hueske-2
Hi Saliya,

the number of parallel splits is controlled by the number of input splits returned by the InputFormat.createInputSplits() method. This method receives a parameter minNumSplits, which is equal to the number of DataSource tasks.

Flink handles input splits a bit differently than Hadoop. In Hadoop, each input split corresponds to one map task. In Flink, you have a fixed number of DataSource tasks, and input splits are lazily distributed to source tasks. If you have more splits than tasks, a source task requests a new split when it is done with its last one, until all splits are assigned. If your createInputSplits method returns fewer splits than minNumSplits, some source tasks won't receive a split.
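To make the split mechanics concrete, a custom FileInputFormat for the row-block case might look roughly like this (a hedged sketch, not tested against a specific Flink version; the class and its details are assumptions built only on the createInputSplits/open/nextRecord hooks described above):

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Sketch: one input split per row block of an n x n short matrix,
// emitting one matrix row (short[n]) per record.
public class ShortMatrixRowBlockFormat extends FileInputFormat<short[]> {
    private final int n;               // matrix dimension N
    private transient long bytesLeft;  // bytes remaining in the current split

    public ShortMatrixRowBlockFormat(Path file, int n) {
        super(file);
        this.n = n;
        this.unsplittable = false;
    }

    @Override
    public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
        long rowBytes = (long) n * Short.BYTES;
        int rowsPerSplit = (n + minNumSplits - 1) / minNumSplits; // ceil(N / tasks)
        List<FileInputSplit> splits = new ArrayList<>();
        for (int i = 0, row = 0; row < n; i++, row += rowsPerSplit) {
            long rows = Math.min(rowsPerSplit, n - row);
            splits.add(new FileInputSplit(i, filePath, row * rowBytes, rows * rowBytes, null));
        }
        return splits.toArray(new FileInputSplit[splits.size()]);
    }

    @Override
    public void open(FileInputSplit split) throws IOException {
        super.open(split);             // seeks `stream` to split.getStart()
        bytesLeft = split.getLength();
    }

    @Override
    public boolean reachedEnd() {
        return bytesLeft <= 0;
    }

    @Override
    public short[] nextRecord(short[] reuse) throws IOException {
        byte[] raw = new byte[n * Short.BYTES];
        new DataInputStream(stream).readFully(raw);
        bytesLeft -= raw.length;
        short[] row = new short[n];
        ByteBuffer.wrap(raw).asShortBuffer().get(row);
        return row;
    }
}
```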

If you read files from a local FS in a distributed (multi-node) setup, you have to be careful. Each node must have an exact copy of the data at exactly the same location. Otherwise, it won't work.

Best, Fabian

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <[hidden email]>:
> [...]


Re: Reading Binary Data (Matrix) with Flink

Saliya Ekanayake
Hi Fabian,

Thank you, I think I have a better picture of this now. I think if I set the number of DataSource tasks (a config option, I guess?) equal to the number of input splits, that would do what I expected.
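For what it's worth, the source parallelism can be set on the operator itself rather than only through a global config (a fragment-level sketch; `format` is a placeholder for whatever custom InputFormat ends up being used):

```java
// Sketch: give the data source exactly as many tasks as there are input
// splits, so each task processes one row block. `format` is hypothetical.
int splits = 5;
DataSet<short[]> rows = env.createInput(format).setParallelism(splits);
```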

Yes, I will keep it in the same place across nodes.

Thank you,
Saliya

On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske <[hidden email]> wrote:
Hi Saliya,

the number of parallel splits is controlled by the number of input splits returned by the InputFormat.createInputSplits() method. This method receives a parameter minNumSplits with is equal to the number of DataSource tasks.

Flink handles input splits a bit different from Hadoop. In Hadoop, each input split corresponds to one map task. In Flink you have a fixed number of DataSource tasks and input splits are lazily distributed to source tasks. If you have more splits than tasks, a data source requests a new split when it is done with its last split until all splits are assigned. If your createInputSplits method returns less splits than minNumSplits, some source tasks won't receive a split.

If you read files from a local FS in a distributed (multi-node) setup, you have to be careful. Each node must have an exact copy of the data at exactly the same location. Otherwise, it won't work.

Best, Fabian
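The fixed-offset scheme Fabian describes can be sketched in plain Java for an N x N matrix of shorts, one block of rows per split. The class and method names below are illustrative, not Flink API; a real InputFormat would wrap each offset/length pair in a FileInputSplit inside createInputSplits():

```java
// Sketch: fixed-offset "input splits" for an N x N matrix of Java shorts,
// one contiguous block of rows per split. Illustrative names, not Flink API.
public class RowBlockSplits {

    /** Byte offset and byte length of split i out of p, for an n x n short matrix. */
    public static long[] split(int i, int p, long n) {
        long rowsPerSplit = n / p;
        long extra = n % p;                  // distribute the remainder over the first splits
        long startRow = i * rowsPerSplit + Math.min(i, extra);
        long rows = rowsPerSplit + (i < extra ? 1 : 0);
        long bytesPerRow = n * Short.BYTES;  // 2 bytes per element
        return new long[] { startRow * bytesPerRow, rows * bytesPerRow };
    }

    public static void main(String[] args) {
        long n = 10;
        for (int i = 0; i < 4; i++) {
            long[] s = split(i, 4, n);
            System.out.println("split " + i + ": offset=" + s[0] + " length=" + s[1]);
        }
    }
}
```

Each DataSource task would then seek to its split's offset and read exactly that many bytes, which gives every task whole rows.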

2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <[hidden email]>:
Hi Fabian,

Thank you for the information.

So, is there a way I can get the task number within the InputFormat? That way I can use it to compute the offset of the block of rows.

The file is too large to fit in a single process's memory, so the current setups in MPI and Hadoop use the rank (task number) info to memory-map the corresponding block of rows. In our experiments, we found this approach to be the fastest because of the memory mapping rather than buffered reads. Also, the file is replicated across nodes and the reading (mapping) happens only once.

Thank you,
Saliya
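The memory-mapping approach described above can be sketched in plain Java (no Flink involved): map only the byte range for this task's block of rows and view it as shorts. The 4 x 4 demo matrix and big-endian layout are assumptions for illustration:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.ShortBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch: memory-map one task's block of rows from an n x n short matrix file.
public class MatrixBlockReader {

    public static short[] readBlock(Path file, long n, long startRow, int rows) throws IOException {
        long bytesPerRow = n * Short.BYTES;
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            // map only this block's byte range, not the whole file
            ByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY,
                                    startRow * bytesPerRow, rows * bytesPerRow);
            buf.order(ByteOrder.BIG_ENDIAN);   // Java's DataOutputStream default
            ShortBuffer shorts = buf.asShortBuffer();
            short[] block = new short[(int) (rows * n)];
            shorts.get(block);
            return block;
        }
    }

    public static void main(String[] args) throws IOException {
        // write a 4 x 4 demo matrix where element (i,j) = i * 4 + j
        Path tmp = Files.createTempFile("matrix", ".bin");
        ByteBuffer out = ByteBuffer.allocate(4 * 4 * Short.BYTES);
        for (int v = 0; v < 16; v++) out.putShort((short) v);
        Files.write(tmp, out.array());

        short[] block = readBlock(tmp, 4, 2, 2); // rows 2..3
        System.out.println(block[0]);            // prints 8, i.e. element (2,0)
        Files.delete(tmp);
    }
}
```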

On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <[hidden email]> wrote:
Hi Saliya,

yes, that is possible; however, the requirements for reading a binary file from the local FS are basically the same as for reading it from HDFS.
In order to be able to start reading different sections of a file in parallel, you need to know the different starting positions. This can be done by either having fixed offsets for blocks or adding some meta information for the block start positions. InputFormats can divide the work of reading a file by generating multiple input splits. Each input split defines the file, the start offset and the length to read.

However, are you sure that reading a file in parallel will be faster than reading it sequentially?
At least for HDDs, IO-bound workloads with "random" reading patterns are usually much slower than sequential reads.

Cheers, Fabian

2016-01-24 19:10 GMT+01:00 Suneel Marthi <[hidden email]>:
There should be a env.readbinaryfile() IIRC, check that

Sent from my iPhone

On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <[hidden email]> wrote:

Thank you for the response on this, but I still have some doubt. Simply, the file is not in HDFS, it's in local storage. In Flink, if I run the program with, say, 5 parallel tasks, what I would like to do is to read a block of rows in each task as shown below. I looked at the simple CSV reader and was thinking of creating a custom one like that, but I would need to know the task number to read the relevant block. Is this possible?

<image.png>

Thank you,
Saliya

On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <[hidden email]> wrote:

With readHadoopFile you can use all of Hadoop’s FileInputFormats, and thus you can do everything with Flink that you can do with Hadoop. Simply take the same Hadoop FileInputFormat which you would use for your MapReduce job.

Cheers,
Till


On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <[hidden email]> wrote:
Thank you, I saw readHadoopFile, but I was not sure how it can be used for the following, which is what I need. The logic of the code requires an entire row to operate on, so in our current implementation with P tasks, each of them will read a rectangular block of (N/P) x N from the matrix. Is this possible with readHadoopFile? Also, the file may not be in HDFS, so is it possible to refer to the local disk in doing this?

Thank you

On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <[hidden email]> wrote:
Hi Saliya,

You can use input formats from Hadoop in Flink via the readHadoopFile method. The method returns a DataSet of type Tuple2&lt;Key, Value&gt;. Note that the MapReduce-equivalent transformation in Flink is composed of map, groupBy, and reduceGroup.

> On Jan 20, 2016, at 3:04 PM, Suneel Marthi <[hidden email]> wrote:
>
> Guess u r looking for Flink's BinaryInputFormat to be able to read blocks of data from HDFS
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>
> On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <[hidden email]> wrote:
> Hi,
>
> I am trying to use Flink perform a parallel batch operation on a NxN matrix represented as a binary file. Each (i,j) element is stored as a Java Short value. In a typical MapReduce programming with Hadoop, each map task will read a block of rows of this matrix and perform computation on that block and emit result to the reducer.
>
> How is this done in Flink? I am new to Flink and couldn't find a binary reader so far. Any help is greatly appreciated.
>
> Thank you,
> Saliya
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>

Regards,
Chiwan Park








--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org

maxtime / watermark for GlobalWindow

Radu Tudoran

Hi,

 

I am using a global window to collect some events. I use a trigger to fire the processing.

Is there any way to get the time of the event that has triggered the processing?

I am asking this because the getMaxTime() field of the GlobalWindow returns Long.MAX_VALUE.

The code skeleton is:

stream
    .windowAll(GlobalWindows.create())
    .trigger(new MyTrigger())
    .apply(new AllWindowFunction<Tuple1<Long>, Tuple1<Long>, GlobalWindow>() {
        @Override
        public void apply(GlobalWindow arg0,
                          Iterable<Tuple1<Long>> arg1,
                          Collector<Tuple1<Long>> arg2) throws Exception {
            // - get the event timestamp
        }
    })


Dr. Radu Tudoran

Research Engineer - Big Data Expert

IT R&D Division

 


HUAWEI TECHNOLOGIES Duesseldorf GmbH

European Research Center

Riesstrasse 25, 80992 München

 

E-mail: [hidden email]

Mobile: +49 15209084330

Telephone: +49 891588344173

 

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany,
www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN

This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

 


continuous time trigger

Radu Tudoran
In reply to this post by Saliya Ekanayake

Re-Hi,

 

I have another question regarding triggering the processing of a window. Can this be done in some way at specific time intervals, independent of whether an event has been received or not, via a trigger?

 

The reason why I am considering a trigger rather than timeWindow(All) is that timeWindow will end up generating multiple windows and duplicating data, while the option of having the trigger actually fire the processing at certain times (independent of when the events arrived) would enable operating with a single window.

 

Regards,

 

Dr. Radu Tudoran


 


Re: continuous time trigger

Till Rohrmann

Hi Radu,

you can register processing time and event time triggers using the TriggerContext which is given to the onElement, onProcessingTime and onEventTime methods of Trigger. In case you register a processing time timer, the onProcessingTime method will be called once the system clock has passed the timer's time. In case of an event time timer, the onEventTime method is called once a watermark has been received which is larger than the timer's time.

I hope this helps you to solve your problem.

Cheers,
Till


On Mon, Jan 25, 2016 at 9:25 PM, Radu Tudoran <[hidden email]> wrote:


 



Re: maxtime / watermark for GlobalWindow

Till Rohrmann
In reply to this post by Radu Tudoran

Hi Radu,

If I’m not mistaken, it’s not possible with the current GlobalWindow implementation. However, you could extend GlobalWindow with a new field in which the timestamp of the triggering element is stored. This field can then be read from within the WindowFunction to retrieve the latest timestamp.

Cheers,
Till
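As a plain-Java illustration of the idea Till describes (these are not actual Flink types; a real implementation would extend GlobalWindow and adapt the Trigger and window serializer accordingly), the window object carries the timestamp of the last element that triggered it, so the window function can read it back instead of getMaxTime():

```java
// Illustrative sketch: a window object that remembers the largest
// timestamp seen, updated by the (hypothetical) trigger on each element.
public class TimestampedGlobalWindow {
    private long triggeringTimestamp = Long.MIN_VALUE;

    /** Called by the trigger with each element's timestamp. */
    public void onElement(long elementTimestamp) {
        triggeringTimestamp = Math.max(triggeringTimestamp, elementTimestamp);
    }

    /** Read from within the window function to get the latest timestamp. */
    public long getTriggeringTimestamp() {
        return triggeringTimestamp;
    }

    public static void main(String[] args) {
        TimestampedGlobalWindow w = new TimestampedGlobalWindow();
        w.onElement(1000L);
        w.onElement(2500L);
        System.out.println(w.getTriggeringTimestamp()); // prints 2500
    }
}
```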


On Mon, Jan 25, 2016 at 8:36 PM, Radu Tudoran <[hidden email]> wrote:


 



Re: continuous time trigger

Brian Chhun
In reply to this post by Till Rohrmann
For what it's worth, we have a trigger that fires once a day for a recurring calculation. When an element comes in, we set the trigger context's processing time timer to the exact millisecond of the desired time. The predefined triggers were useful to look at to achieve this: https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers

Some things I discovered along the way, particularly using processing time, which may be useful:
- registering a time that's already passed will cause the timer callback to be called
- when the system shuts down, the window is fired even though the trigger has not gone off (this sounds subject to change though)
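Computing the exact millisecond for such a once-a-day timer can be sketched in plain Java. The fire time and zone below are illustrative assumptions; inside a Trigger, the resulting value would be the one passed to the TriggerContext's processing-time timer registration:

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

// Sketch: next occurrence of a daily fire time, as epoch millis.
public class DailyFireTime {

    /** Next occurrence of fireAt strictly after 'now', as epoch milliseconds. */
    public static long nextFireMillis(Instant now, LocalTime fireAt, ZoneId zone) {
        ZonedDateTime current = now.atZone(zone);
        ZonedDateTime candidate = current.toLocalDate().atTime(fireAt).atZone(zone);
        if (!candidate.isAfter(current)) {
            candidate = candidate.plusDays(1); // today's fire time already passed
        }
        return candidate.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2016-01-26T10:00:00Z");
        long next = nextFireMillis(now, LocalTime.of(2, 0), ZoneOffset.UTC);
        System.out.println(Instant.ofEpochMilli(next)); // prints 2016-01-27T02:00:00Z
    }
}
```

Since (as noted above) registering a time that has already passed fires the callback immediately, the "strictly after now" check matters.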

On Tue, Jan 26, 2016 at 3:47 AM, Till Rohrmann <[hidden email]> wrote:


 




RE: continuous time trigger

Radu Tudoran

Hi,

 

Thank you for sharing your experience, and thanks to Till for the advice.

What I would like to do is to be able to fire the window potentially multiple times, even if an event did not arrive. I will look more into how dealing with processing time could help with this.

 

Dr. Radu Tudoran


 

From: Brian Chhun [mailto:[hidden email]]
Sent: Tuesday, January 26, 2016 5:28 PM
To: [hidden email]
Subject: Re: continous time triger

 


 

 

 


Re: continuous time trigger

Aljoscha Krettek
Hi Brian,
you are right about changing the behavior of windows when closing. Would this be a problem for you?

Cheers,
Aljoscha

> On 26 Jan 2016, at 17:53, Radu Tudoran <[hidden email]> wrote:
>


Re: continuous time trigger

Brian Chhun
Hi Aljoscha,

No problem with the change. I think it's more what a user would expect as well.

On Wed, Jan 27, 2016 at 3:27 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi Brian,
you are right about changing the behavior of windows when closing. Would this be a problem for you?

Cheers,
Aljoscha