Read once input data?

Read once input data?

Saliya Ekanayake
Hi,

I see that an InputFormat's open() and nextRecord() methods get called for each terminal operation on a given dataset using that particular InputFormat. Is it possible to avoid this - possibly using some caching technique in Flink?

For example, I've some code like below and I see for both the last two statements (reduce() and count()) the above methods in the input format get called. Btw. this is a custom input format I wrote to represent a binary matrix stored as Short values.

ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
MapOperator<Short[], DoubleStatistics> op = ds.map(...);
op.reduce(...);
op.count();

Thank you,
Saliya
--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org

Re: Read once input data?

Fabian Hueske-2
Hi,

it looks like you are executing two distinct Flink jobs.
DataSet.count() triggers the execution of a new job. If you have an execute() call in your program, this will lead to two Flink jobs being executed.
It is not possible to share state among these jobs.

Maybe you should add a custom count implementation (using a ReduceFunction) which is executed in the same program as the other ReduceFunction.

Best, Fabian
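
The suggestion above can be sketched without Flink. What follows is a minimal plain-Java model, not Flink code: each record is mapped to a (sum, count) pair and one merge function combines both fields, so the count comes out of the same pass as the reduction instead of a second job. The names SumAndCount, Acc, toAcc and MERGE are made up for illustration; in a Flink program the merge step would presumably sit in a ReduceFunction over a tuple type.

```java
import java.util.function.BinaryOperator;

public class SumAndCount {
    // Hypothetical accumulator: a running sum plus the number of records seen.
    static final class Acc {
        final double sum;
        final long count;

        Acc(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // Plays the role of the map step: every record becomes (value, 1).
    static Acc toAcc(short value) {
        return new Acc(value, 1L);
    }

    // Plays the role of the reduce step: sums and counts are merged together.
    static final BinaryOperator<Acc> MERGE =
            (a, b) -> new Acc(a.sum + b.sum, a.count + b.count);

    // Single pass over the records that yields both the sum and the count.
    static Acc sumAndCount(short[] records) {
        Acc result = new Acc(0.0, 0L);
        for (short r : records) {
            result = MERGE.apply(result, toAcc(r));
        }
        return result;
    }

    public static void main(String[] args) {
        Acc acc = sumAndCount(new short[] {1, 2, 3, 4});
        System.out.println("sum=" + acc.sum + ", count=" + acc.count);
    }
}
```

Because the count travels with the reduced value, no separate count() call is needed and the input is scanned only once.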



Re: Read once input data?

Saliya Ekanayake
Fabian,

count() was just an example. What I would like to do is, say, run two map operations on the dataset (ds). Each map will have its own reduction, so is there a way to avoid creating two jobs for such a scenario?

The reason is that reading these binary matrices is expensive. In our current MPI implementation, I am using memory maps for faster loading and reuse.

Thank you,
Saliya

Re: Read once input data?

Fabian Hueske-2
It is not possible to "pin" data sets in memory, yet.
However, you can stream the same data set through two different mappers at the same time.

For instance you can have a job like:

                 /---> Map 1 --> Sink 1
Source --<
                 \---> Map 2 --> Sink 2

and execute it as a single job.
For that, you define your data flow and call execute() once, after all sinks have been created.

Best, Fabian
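
The branching plan above can be modelled in a few lines. This is a hedged plain-Java sketch, not Flink code: CountingSource, Branch and execute are hypothetical names, and the scan counter is only there to show that two map branches with their own sinks can be fed from a single read of the source when the whole plan is run once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BranchingPlan {
    // Hypothetical stand-in for an expensive input: counts how often it is scanned.
    static final class CountingSource {
        private final short[] data;
        int scans = 0;

        CountingSource(short[] data) {
            this.data = data;
        }

        short[] read() {
            scans++;
            return data;
        }
    }

    // One branch of the plan: a map function plus the sink collecting its output.
    static final class Branch<R> {
        final Function<Short, R> map;
        final List<R> sink = new ArrayList<>();

        Branch(Function<Short, R> map) {
            this.map = map;
        }

        void accept(short v) {
            sink.add(map.apply(v));
        }
    }

    // "Execute once": scan the source a single time and feed every branch.
    static void execute(CountingSource source, Branch<?>... branches) {
        for (short v : source.read()) {
            for (Branch<?> b : branches) {
                b.accept(v);
            }
        }
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource(new short[] {1, 2, 3});
        Branch<Integer> doubled = new Branch<Integer>(v -> v * 2);
        Branch<Boolean> large = new Branch<Boolean>(v -> v > 1);
        execute(source, doubled, large);
        System.out.println(doubled.sink + " " + large.sink + " scans=" + source.scans);
    }
}
```

The point of the model is the ordering: both branches are declared first and the plan is run afterwards, which mirrors creating all sinks before the single execute() call.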

Re: Read once input data?

Saliya Ekanayake
Thank you, Fabian.

Any chance you might have an example on how to define a data flow with Flink?



Re: Read once input data?

Fabian Hueske-2

Re: Read once input data?

Saliya Ekanayake
Thanks, I'll check this.

Saliya

Re: Read once input data?

Flavio Pompermaier
I also have a couple of use cases where a feature for pinning data sets in memory would help a lot ;)

Re: Read once input data?

Saliya Ekanayake
In reply to this post by Fabian Hueske-2
Fabian,

I've a quick follow-up question on what you suggested. When streaming the same data through different maps, were you implying that everything runs as a single job in Flink, so the data is read only once?

Thanks,
Saliya

Re: Read once input data?

Fabian Hueske-2
Yes, if you implement both maps in a single job, data is read once.

Re: Read once input data?

Saliya Ekanayake
I looked at the samples and I think what you meant is clear, but I didn't find a solution for my need. In my case, I need the result of the first map operation before I can apply the second map to the same data set. For simplicity, let's say my data set is a bunch of short values. First I need to find their average, so I use a map and a reduce. Then I want to map these short values with another function, but that function needs the average computed in the first step to work correctly.

Is this possible without doing multiple reads of the input data to create the same dataset?

Thank you,
saliya

Re: Read once input data?

Fabian Hueske-2
You can use so-called BroadcastSets to send any sufficiently small DataSet (such as a computed average) to any other function and use it there.
However, in your case you'll end up with a data flow that branches (at the source) and merges again (when the average is sent to the second map).
Such patterns can cause deadlocks and therefore cannot be pipelined, which means that the data before the branch is written to disk and read again.
In your case it might even be better to read the data twice instead of reading, writing, and reading it again.

Fabian
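
A minimal sketch of the broadcast-set approach described above, assuming the Flink 1.x Java DataSet API. The class name BroadcastAverageSketch, the helper centerOnAverage, the broadcast name "average", and the use of fromCollection() in place of the custom ShortMatrixInputFormat are illustrative assumptions, not code from this thread. Because the average is computed and consumed in the same job, the data flow branches and merges as described, so the source data may be materialized before the second map runs.

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class BroadcastAverageSketch {

    // Hypothetical helper: subtracts the average of all values from each value.
    public static List<Double> centerOnAverage(List<Short> values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keeps the output order stable for this tiny example

        // Stand-in for env.createInput(...) with the custom input format.
        DataSet<Short> ds = env.fromCollection(values);

        // Branch 1: reduce to a (sum, count) pair, then to a one-element average.
        DataSet<Double> average = ds
            .map(new MapFunction<Short, Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> map(Short v) {
                    return new Tuple2<>(v.doubleValue(), 1L);
                }
            })
            .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a,
                                                   Tuple2<Double, Long> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            })
            .map(new MapFunction<Tuple2<Double, Long>, Double>() {
                @Override
                public Double map(Tuple2<Double, Long> t) {
                    return t.f0 / t.f1;
                }
            });

        // Branch 2: map the same data set, reading the average as a broadcast set.
        DataSet<Double> centered = ds
            .map(new RichMapFunction<Short, Double>() {
                private double avg;

                @Override
                public void open(Configuration parameters) {
                    // A broadcast set arrives as a List; here it has one element.
                    List<Double> bc = getRuntimeContext().getBroadcastVariable("average");
                    avg = bc.get(0);
                }

                @Override
                public Double map(Short value) {
                    return value - avg;
                }
            })
            .withBroadcastSet(average, "average");

        // collect() triggers the single job that contains both branches.
        return centered.collect();
    }
}
```

Both branches hang off the same ds, so the input format is declared only once in the program; whether that beats simply reading the input twice depends on the cost of the intermediate materialization.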


Re: Read once input data?

Saliya Ekanayake
Thank you, yes, this makes sense. The broadcast data in my case would be a large array of 3D coordinates.

On a side note, how can I retrieve the output of a reduce function? I can see methods to write it to a given output, but is it possible to get the reduced result back into the program, like a double value representing the average in the previous example?



Re: Read once input data?

Fabian Hueske-2
Broadcast DataSets are stored on the JVM heap of each TaskManager (but shared among multiple slots on the same TaskManager), hence the size restriction.

There are two ways to retrieve a DataSet (such as the result of a reduce):
1) If you want to fetch the result into your client program, use DataSet.collect(). This immediately triggers an execution and fetches the result from the cluster.
2) If you want to use the result for a computation in the cluster, use broadcast sets as described above.
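
The first option can be sketched as follows, again assuming the Flink 1.x Java DataSet API. The class name CollectAverageSketch and the helper average are hypothetical, and fromCollection() stands in for the custom input format. Note that collect() runs its own job, so a later job on the same source would read the input again.

```java
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CollectAverageSketch {

    // Hypothetical helper: returns the average of the values to the client program.
    public static double average(List<Short> values) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for env.createInput(...) with the custom input format.
        DataSet<Short> ds = env.fromCollection(values);

        // Reduce everything to a single (sum, count) pair on the cluster.
        DataSet<Tuple2<Double, Long>> sumAndCount = ds
            .map(new MapFunction<Short, Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> map(Short v) {
                    return new Tuple2<>(v.doubleValue(), 1L);
                }
            })
            .reduce(new ReduceFunction<Tuple2<Double, Long>>() {
                @Override
                public Tuple2<Double, Long> reduce(Tuple2<Double, Long> a,
                                                   Tuple2<Double, Long> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            });

        // collect() triggers execution and ships the one-element result to the client.
        Tuple2<Double, Long> result = sumAndCount.collect().get(0);
        return result.f0 / result.f1;
    }
}
```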


Re: Read once input data?

Saliya Ekanayake
Thank you. I'll check this.


Re: Read once input data?

Flavio Pompermaier
In my use case I thought of persisting the dataset I want to reuse on Tachyon in order to speed up reading it. Do you think that could help?


Re: Read once input data?

Saliya Ekanayake
I'll be interested to hear more about this when you implement it.

Thank you

On Wed, Feb 17, 2016 at 4:44 AM, Flavio Pompermaier <[hidden email]> wrote:
In my use case I though to persist the dataset to reuse on Tachyon in order to speed up its reading..do you think it could help?


On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake <[hidden email]> wrote:
Thank you. I'll check this

On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <[hidden email]> wrote:
Broadcasted DataSets are stored on the JVM heap of each task manager (but shared among multiple slots on the same TM), hence the size restriction.

There are two ways to retrieve a DataSet (such as the result of a reduce).
1) if you want to fetch the result into your client program use DataSet.collect(). This immediately triggers an execution and fetches the result from the cluster.
2) if you want to use the result for a computation in the cluster use broadcast sets as described above.

2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <[hidden email]>:
Thank you, yes, this makes sense. The broadcasted data in my case would a large array of 3D coordinates, 

On a side note, how can I take the output from a reduce function? I can see methods to write it to a given output, but is it possible to retrieve the reduced result back to the program - like a double value representing the average in the previous example.


On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <[hidden email]> wrote:
You can use so-called BroadcastSets to send any sufficiently small DataSet (such as a computed average) to any other function and use it there.
However, in your case you'll end up with a data flow that branches (at the source) and merges again (when the average is sent to the second map).
Such patterns can cause deadlocks and therefore cannot be pipelined, which means that the data before the branch is written to disk and read again.
In your case it might be even better to read the data twice instead of reading, writing, and reading it.
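
To make the branch-and-merge dependency concrete, here is a minimal plain-Java sketch of the "average first, then map" use case. It is not Flink code: the class and method names are hypothetical, and the array simply stands in for the input data. It only illustrates why the second map cannot start until the reduce has seen every record, so the input is either traversed twice or buffered in between.

```java
/** Plain-Java model of "compute the average, then map with it". Not Flink code. */
final class CenterByAverageSketch {

    /** Returns each value minus the average of all values. */
    static double[] subtractAverage(short[] values) {
        if (values.length == 0) {
            return new double[0];
        }

        // Stage 1 (map + reduce): needs a full traversal before the average is known.
        double sum = 0.0;
        for (short v : values) {
            sum += v;
        }
        double average = sum / values.length;

        // Stage 2 (second map): depends on the result of stage 1, so the same
        // records have to be visited again (re-read or buffered).
        double[] centered = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            centered[i] = values[i] - average;
        }
        return centered;
    }

    public static void main(String[] args) {
        double[] out = subtractAverage(new short[] {1, 2, 3});
        System.out.println(java.util.Arrays.toString(out));
    }
}
```

In this model the small `average` value plays the role of the broadcast set, and the second loop plays the role of the second map.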

Fabian

2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <[hidden email]>:
I looked at the samples and I think what you meant is clear, but I didn't find a solution for my need. In my case, I want to use the result from the first map operation before I can apply the second map on the same data set. For simplicity, let's say I've a bunch of short values represented as my data set. Then I need to find their average, so I use a map and reduce. Then I want to map these short values with another function, but it needs the average computed at the beginning to work correctly.

Is this possible without doing multiple reads of the input data to create the same dataset?

Thank you,
saliya

On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <[hidden email]> wrote:
Yes, if you implement both maps in a single job, data is read once.

2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <[hidden email]>:
Fabian,

I've a quick follow-up question on what you suggested. When streaming the same data through different maps, were you implying that everything goes as a single job in Flink, so the data read happens only once?

Thanks,
Saliya

On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <[hidden email]> wrote:
It is not possible to "pin" data sets in memory, yet.
However, you can stream the same data set through two different mappers at the same time.

For instance you can have a job like:

          /---> Map 1 --> Sink1
Source --<
          \---> Map 2 --> Sink2

and execute it at once.
For that you define your data flow and call execute once after all sinks have been created.
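
The effect of fanning one source out to two mappers can be modeled in plain Java. This is only a sketch of the idea, not Flink code: `CountingSource`, `twoPasses`, and `onePass` are hypothetical names, and the counter stands in for an expensive call to the input format.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the "one source, two mappers" data flow. Not Flink code. */
final class FanOutSketch {

    /** Hypothetical stand-in for an expensive input; counts how often it is read. */
    static final class CountingSource {
        private final short[] data;
        int opens = 0;

        CountingSource(short[] data) {
            this.data = data;
        }

        short[] open() {
            opens++; // each call models one full read of the input
            return data;
        }
    }

    /** Two separate runs: the source is read once per run. */
    static <A, B> void twoPasses(CountingSource src,
                                 Function<Short, A> map1, List<A> sink1,
                                 Function<Short, B> map2, List<B> sink2) {
        for (short v : src.open()) {
            sink1.add(map1.apply(v));
        }
        for (short v : src.open()) {
            sink2.add(map2.apply(v));
        }
    }

    /** One run: every record is handed to both mappers, so the source is read once. */
    static <A, B> void onePass(CountingSource src,
                               Function<Short, A> map1, List<A> sink1,
                               Function<Short, B> map2, List<B> sink2) {
        for (short v : src.open()) {
            sink1.add(map1.apply(v));
            sink2.add(map2.apply(v));
        }
    }

    public static void main(String[] args) {
        CountingSource src = new CountingSource(new short[] {1, 2, 3});
        List<Integer> doubled = new ArrayList<>();
        List<Integer> shifted = new ArrayList<>();
        onePass(src, v -> v * 2, doubled, v -> v + 1, shifted);
        System.out.println(doubled + " " + shifted + " reads=" + src.opens);
    }
}
```

Both methods fill the two sinks with the same results; the only difference is how often the source is read, which is the cost being discussed here.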

Best, Fabian

2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <[hidden email]>:
Fabian,

count() was just an example. What I would like to do is, say, run two map operations on the dataset (ds). Each map will have its own reduction, so is there a way to avoid creating two jobs for such a scenario?

The reason is that reading these binary matrices is expensive. In our current MPI implementation, I am using memory maps for faster loading and reuse.

Thank you,
Saliya

On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <[hidden email]> wrote:
Hi,

it looks like you are executing two distinct Flink jobs.
DataSet.count() triggers the execution of a new job. If you have an execute() call in your program, this will lead to two Flink jobs being executed.
It is not possible to share state among these jobs.

Maybe you should add a custom count implementation (using a ReduceFunction) which is executed in the same program as the other ReduceFunction.
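
The idea of expressing a count as a reduction can be sketched in plain Java. This is a model of the technique only, not Flink's ReduceFunction interface, and the class and method names are hypothetical: each record is mapped to 1 and the ones are summed, or a (sum, count) pair is reduced so that both results come out of a single traversal.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of "count as a reduce". Not Flink code. */
final class CountByReduceSketch {

    /** Counts records by mapping each one to 1 and summing the ones. */
    static long countByReduce(List<Short> records) {
        return records.stream()
                .map(r -> 1L)
                .reduce(0L, Long::sum);
    }

    /** Computes {sum, count} in one traversal by reducing (value, 1) pairs. */
    static long[] sumAndCount(List<Short> records) {
        return records.stream()
                .map(r -> new long[] {r.longValue(), 1L})
                .reduce(new long[] {0L, 0L},
                        (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
    }

    public static void main(String[] args) {
        List<Short> records = Arrays.asList((short) 3, (short) 5, (short) 7);
        System.out.println(countByReduce(records));
        System.out.println(Arrays.toString(sumAndCount(records)));
    }
}
```

Because the count is just another reduction here, it can run in the same pass as any other reduction over the same records instead of needing its own run.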

Best, Fabian



2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <[hidden email]>:
Hi,

I see that an InputFormat's open() and nextRecord() methods get called for each terminal operation on a given dataset using that particular InputFormat. Is it possible to avoid this - possibly using some caching technique in Flink?

For example, I've some code like below and I see for both the last two statements (reduce() and count()) the above methods in the input format get called. Btw. this is a custom input format I wrote to represent a binary matrix stored as Short values.

ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
MapOperator<Short[], DoubleStatistics> op = ds.map(...);
op.reduce(...);
op.count(...);

Thank you,
Saliya

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org