Hi,
I am trying to use Flink to perform a parallel batch operation on an NxN matrix represented as a binary file. Each (i,j) element is stored as a Java Short value. In a typical MapReduce program with Hadoop, each map task reads a block of rows of this matrix, performs a computation on that block, and emits the result to the reducer. How is this done in Flink? I am new to Flink and couldn't find a binary reader so far. Any help is greatly appreciated. Thank you, Saliya Saliya Ekanayake Ph.D. Candidate | Research Assistant School of Informatics and Computing | Digital Science Center
I guess you're looking for Flink's BinaryInputFormat to be able to read blocks of data from HDFS: https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <[hidden email]> wrote:
Hi Saliya,
You can use a Hadoop input format in Flink with the readHadoopFile method. The method returns a DataSet of type Tuple2<Key, Value>. Note that the Flink equivalent of a MapReduce job is composed of the map, groupBy, and reduceGroup transformations. Regards, Chiwan Park On Jan 20, 2016, at 3:04 PM, Suneel Marthi <[hidden email]> wrote:
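For readers coming from MapReduce, the map, groupBy, reduceGroup shape can be illustrated without Flink at all. The sketch below is a plain-Java (java.util.stream) analogue, not the Flink DataSet API: a map step emits (row index, element) pairs, the pairs are grouped by key, and each group is reduced to a single sum. The row-sum computation and the names `MapGroupReduceSketch` and `Pair` are illustrative assumptions.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class MapGroupReduceSketch {

    // A (key, value) pair, standing in for Flink's Tuple2<Key, Value>.
    record Pair(int key, long value) {}

    // "map": emit one (rowIndex, element) pair per matrix element;
    // "groupBy": group the pairs by row index;
    // per-group reduction: sum the values of each group (similar in shape to reduceGroup).
    static Map<Integer, Long> rowSums(short[][] matrix) {
        return IntStream.range(0, matrix.length)
                .boxed()
                .flatMap(i -> IntStream.range(0, matrix[i].length)
                        .mapToObj(j -> new Pair(i, matrix[i][j])))
                .collect(Collectors.groupingBy(
                        Pair::key,
                        TreeMap::new,
                        Collectors.summingLong(Pair::value)));
    }

    public static void main(String[] args) {
        short[][] m = { {1, 2, 3}, {4, 5, 6}, {7, 8, 9} };
        System.out.println(rowSums(m)); // {0=6, 1=15, 2=24}
    }
}
```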
Thank you, I saw readHadoopFile, but I was not sure how it can be used for the following, which is what I need. The logic of the code requires an entire row to operate on, so in our current implementation with P tasks, each of them reads a rectangular block of (N/P) x N from the matrix. Is this possible with readHadoopFile? Also, the file may not be in HDFS, so is it possible to read it from local disk instead? Thank you On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <[hidden email]> wrote:
With Cheers, On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <[hidden email]> wrote:
Thank you for the response on this, but I still have some doubts. Simply put, the file is not in HDFS; it's in local storage. If I run the program in Flink with, say, 5 parallel tasks, what I would like to do is to read a block of rows in each task as shown below. I looked at the simple CSV reader and was thinking of creating a custom one like it, but I would need to know the task number to read the relevant block. Is this possible? Thank you, Saliya On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <[hidden email]> wrote:
There should be an env.readbinaryfile() IIRC, check that.
Hi Saliya, yes, that is possible; the requirements for reading a binary file from the local file system are basically the same as for reading it from HDFS. However, are you sure that reading a file in parallel will be faster than reading it sequentially? At least for HDDs, IO-bound workloads with "random" read patterns are usually much slower than sequential reads. 2016-01-24 19:10 GMT+01:00 Suneel Marthi <[hidden email]>:
Hi Fabian, Thank you for the information. So, is there a way I can get the task number within the InputFormat? That way I can use it to compute the offset of the block of rows. The file is too large to fit in a single process's memory, so our current MPI and Hadoop setups use the rank (task number) to memory-map the corresponding block of rows. In our experiments, we found this approach to be the fastest because it uses memory mapping rather than buffered reads. Also, the file is replicated across nodes and the reading (mapping) happens only once. Thank you, Saliya On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <[hidden email]> wrote:
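A minimal sketch of the memory-mapping approach described above, using only java.nio: map the byte range of one block of rows read-only and read the shorts from it. It assumes the matrix is stored row-major with 2 bytes per element in big-endian order (the order DataOutputStream.writeShort uses); `RowBlockMapper` and `readRowBlock` are hypothetical names, and the start row would come from whatever assigns blocks to tasks.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

class RowBlockMapper {

    // Memory-maps rows [startRow, startRow + numRows) of an n x n row-major matrix of
    // 2-byte shorts and copies them into a short[numRows][n] array.
    // Note: a single FileChannel.map call is limited to Integer.MAX_VALUE bytes,
    // so a larger block would have to be mapped in several pieces.
    static short[][] readRowBlock(Path file, int n, int startRow, int numRows) throws IOException {
        long offset = (long) startRow * n * Short.BYTES;
        long length = (long) numRows * n * Short.BYTES;
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buf = channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
            buf.order(ByteOrder.BIG_ENDIAN); // assumption: big-endian, as written by DataOutputStream
            short[][] block = new short[numRows][n];
            for (int r = 0; r < numRows; r++) {
                for (int c = 0; c < n; c++) {
                    block[r][c] = buf.getShort(); // relative read, advances 2 bytes
                }
            }
            return block;
        }
    }

    public static void main(String[] args) throws IOException {
        int n = 4;
        Path tmp = Files.createTempFile("matrix", ".bin");
        tmp.toFile().deleteOnExit();
        // Write a small test matrix where element (i, j) = i * n + j.
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(tmp)))) {
            for (int i = 0; i < n; i++) {
                for (int j = 0; j < n; j++) {
                    out.writeShort(i * n + j);
                }
            }
        }
        // The block a second task would own if 2 tasks split the 4 rows: rows 2 and 3.
        System.out.println(Arrays.deepToString(readRowBlock(tmp, n, 2, 2)));
        // prints [[8, 9, 10, 11], [12, 13, 14, 15]]
    }
}
```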
Hi Saliya, how the input is read in parallel is controlled by the number of input splits returned by the InputFormat.createInputSplits() method. This method receives a parameter minNumSplits, which is equal to the number of DataSource tasks. 2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <[hidden email]>:
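A plain-Java sketch of the split arithmetic this implies, assuming the matrix is stored row-major with 2 bytes per element: given n and minNumSplits (the number of DataSource tasks), it cuts the rows into contiguous blocks of nearly equal size and gives each block a split number, a start row and a byte range. `RowBlockSplit` and `createSplits` are hypothetical stand-ins, not Flink classes. In a custom InputFormat, logic like this would live in createInputSplits(), and each task would then read (or memory-map) the byte range of the split it is handed, rather than relying on its own task number.

```java
import java.util.ArrayList;
import java.util.List;

class RowBlockSplitter {

    // Hypothetical stand-in for a custom InputSplit: the contiguous rows one task should read.
    record RowBlockSplit(int splitNumber, int startRow, int numRows) {
        // Byte range of this block in a row-major n x n file of 2-byte shorts.
        long byteOffset(int n) { return (long) startRow * n * Short.BYTES; }
        long byteLength(int n) { return (long) numRows * n * Short.BYTES; }
    }

    // Splits rows 0..n-1 (assumes n >= 1) into k = min(minNumSplits, n) contiguous,
    // non-empty blocks. The first n % k blocks get one extra row, so sizes differ by at most one.
    // If there are more tasks than rows, some tasks get no split in this sketch.
    static List<RowBlockSplit> createSplits(int n, int minNumSplits) {
        int k = Math.max(1, Math.min(minNumSplits, n));
        int base = n / k;
        int extra = n % k;
        List<RowBlockSplit> splits = new ArrayList<>(k);
        int row = 0;
        for (int s = 0; s < k; s++) {
            int rows = base + (s < extra ? 1 : 0);
            splits.add(new RowBlockSplit(s, row, rows));
            row += rows;
        }
        return splits;
    }

    public static void main(String[] args) {
        int n = 10;
        // e.g. 4 DataSource tasks reading a 10 x 10 matrix
        for (RowBlockSplit s : createSplits(n, 4)) {
            System.out.println(s + " offset=" + s.byteOffset(n) + " length=" + s.byteLength(n));
        }
    }
}
```

Giving the remainder rows to the first blocks is one possible choice; any assignment works as long as every row lands in exactly one split.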
Hi Fabian, Thank you, I think I have a better picture of this now. I think if I set the number of DataSource tasks (a config option, I guess?) equal to the number of input splits, it would do what I expected. Yes, I will keep the file at the same location across nodes. Thank you, Saliya On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske <[hidden email]> wrote:
Hi, I am using a global window to collect some events. I use a trigger to fire the processing.
Is there any way to get the time of the event that triggered the processing? I am asking because getMaxTime() of the GlobalWindow returns Long.MAX_VALUE. The code skeleton is:

    stream
        .windowAll(GlobalWindows.create())
        .trigger(new MyTrigger())
        .apply(new AllWindowFunction<Tuple1<Long>, Tuple1<Long>, GlobalWindow>() {
            @Override
            public void apply(GlobalWindow window, Iterable<Tuple1<Long>> values, Collector<Tuple1<Long>> out)
                    throws Exception {
                // get the event timestamp here
            }
        });

Dr. Radu Tudoran Research Engineer - Big Data Expert IT R&D Division HUAWEI TECHNOLOGIES Duesseldorf GmbH European Research Center Riesstrasse 25, 80992 München E-mail:
[hidden email] Mobile: +49 15209084330 Telephone: +49 891588344173
HUAWEI TECHNOLOGIES Duesseldorf GmbH This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use
of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the
sender by phone or email immediately and delete it!
Re-Hi, I have another question regarding the triggering of the processing of a window. Can a trigger fire the processing at specific time intervals, independent of whether an event has been received or not? The reason I am considering a trigger rather than timeWindow(All) is that timeWindow will end up generating multiple windows and duplicating data, whereas being able to fire the processing from the trigger at certain times, independent of when the events arrived, would let me operate with a single window. Regards, Dr. Radu Tudoran
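Hi Radu, you can register processing time and event time timers using the TriggerContext, which is given to the onElement, onProcessingTime and onEventTime methods of Trigger. If you register a processing time timer, the onProcessingTime method will be called once the system clock has passed the timer time. In case of an event time timer, the onEventTime method is called once a watermark has been received that is higher than the timer's time. I hope this helps you to solve your problem. Cheers, Till On Mon, Jan 25, 2016 at 9:25 PM, Radu Tudoran <[hidden email]> wrote: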
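Applied to Radu's goal: if the timer callback itself registers the next timer, the window keeps firing at a fixed interval even when no elements arrive. The following is a toy, single-threaded model of that re-arming pattern in plain Java, not Flink's Trigger or TriggerContext API; `ToyTimerService`, `advanceTo` and `fireTimes` are hypothetical names. In a real trigger, the first timer still has to be registered from one of the callbacks that receives the TriggerContext, for example onElement when the first element arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

class PeriodicTimerSketch {

    // Toy stand-in for a processing-time timer service (NOT Flink's TriggerContext):
    // timers fire in timestamp order once the simulated clock reaches them.
    static final class ToyTimerService {
        private final PriorityQueue<Long> timers = new PriorityQueue<>();
        private LongConsumer onTimer = t -> { };

        void setCallback(LongConsumer callback) { this.onTimer = callback; }

        void registerTimer(long time) { timers.add(time); }

        // Advances the simulated clock to `time`, firing every timer with a timestamp <= time.
        // Timers registered by the callback are picked up by the same loop.
        void advanceTo(long time) {
            while (!timers.isEmpty() && timers.peek() <= time) {
                long t = timers.poll();
                onTimer.accept(t);
            }
        }
    }

    // Returns the times at which the "window" fired up to endMs. No elements are involved:
    // every firing re-registers a timer one interval later.
    static List<Long> fireTimes(long intervalMs, long endMs) {
        ToyTimerService service = new ToyTimerService();
        List<Long> fired = new ArrayList<>();
        service.setCallback(t -> {
            fired.add(t);                          // stands in for firing the window
            service.registerTimer(t + intervalMs); // re-arm for the next interval
        });
        service.registerTimer(intervalMs); // first timer; in Flink this would come from e.g. onElement
        service.advanceTo(endMs);
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(fireTimes(1000, 3500)); // [1000, 2000, 3000]
    }
}
```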
Hi Radu, If I’m not mistaken, then it’s not possible with the current Cheers, On Mon, Jan 25, 2016 at 8:36 PM, Radu Tudoran <[hidden email]> wrote:
For what it's worth, we have a trigger that fires once a day for a recurring calculation. When an element comes in, we set the trigger context's processing time timer to the exact millisecond of the desired time. The predefined triggers were useful to look at to achieve this: https://github.com/apache/flink/tree/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers
Some things I discovered along the way, particularly using processing time, which may be useful:
- registering a time that's already passed will cause the timer callback to be called
- when the system shuts down, the window is fired even though the trigger has not gone off (this sounds subject to change though)
On Tue, Jan 26, 2016 at 3:47 AM, Till Rohrmann <[hidden email]> wrote:
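A minimal java.time sketch of computing "the exact millisecond of the desired time" for a once-a-day timer: given the current processing time in epoch milliseconds, a time of day and a zone, it returns the next occurrence strictly after now. Choosing a strictly future instant matters because, as noted above, registering a time that has already passed causes the timer callback to be called. The method name and the UTC zone in the usage are assumptions; the result is the kind of value one would hand to the trigger context's processing-time timer registration (registerProcessingTimeTimer).

```java
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class DailyTimerSketch {

    // Returns the epoch millisecond of the next occurrence of `timeOfDay` in `zone`
    // that lies strictly after `nowMillis` (today's occurrence if still ahead, else tomorrow's).
    static long nextDailyFiring(long nowMillis, LocalTime timeOfDay, ZoneId zone) {
        ZonedDateTime now = Instant.ofEpochMilli(nowMillis).atZone(zone);
        ZonedDateTime candidate = now.toLocalDate().atTime(timeOfDay).atZone(zone);
        if (!candidate.isAfter(now)) {
            candidate = now.toLocalDate().plusDays(1).atTime(timeOfDay).atZone(zone);
        }
        return candidate.toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        long now = Instant.parse("2016-01-26T17:28:00Z").toEpochMilli();
        long next = nextDailyFiring(now, LocalTime.MIDNIGHT, ZoneId.of("UTC"));
        System.out.println(Instant.ofEpochMilli(next)); // 2016-01-27T00:00:00Z
    }
}
```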
Hi, Thank you for sharing your experience, and thanks to Till for the advice. What I would like to do is to be able to fire the window potentially multiple times, even if no new event has arrived. I will look further into how processing time could help with this. Dr. Radu Tudoran
Hi Brian,
you are right that the behavior of windows on closing is going to change. Would this be a problem for you? Cheers, Aljoscha On 26 Jan 2016, at 17:53, Radu Tudoran <[hidden email]> wrote:
Hi Aljoscha, No problem with the change. I think it's closer to what a user would expect anyway. On Wed, Jan 27, 2016 at 3:27 AM, Aljoscha Krettek <[hidden email]> wrote: