Hi,
I have been running some experiments on large graph data; the smallest graph I have been using has around 70 billion edges. I have a graph generator that generates the graph in parallel and feeds it to the running system. However, reading the edges takes a lot of time, because even though the graph generation process is parallel, in Flink I can only listen from the master node (correct me if I am wrong). Another option is dumping the generated data to a file and reading it with readCsvFile, but that is not feasible in terms of storage management.

What I want to do is invoke my graph generator, use IPC/TCP protocols, and read the generated data from sockets. Since the graph data is also generated in parallel on each node, I want to make use of IPC and read the data in parallel on each node. I did some digging online but couldn't find anything similar using the DataSet API. I would be glad if you have some similar use cases or examples. Is it possible to use the streaming environment to create the data in parallel and then switch to the DataSet API?

Thanks in advance!
Best
Kaan
Hi Kaan,

afaik there is no (easy) way to switch from the streaming back to the batch API while retaining all data in memory (correct me if I misunderstood). However, I also have trouble understanding your setup. Why can't you dump the data to some file? Do you really have more main memory than disk space? Or is there no shared storage between your generating cluster and the Flink cluster?

It almost sounds as if the issue at heart is rather finding a good serialization format for storing the edges. The 70 billion edges could be stored as an array of id pairs; with long ids that amounts to roughly 1.1 TB uncompressed (70 billion edges × 2 longs × 8 bytes) in Avro or any other binary serialization format. That's not much by today's standards and could also easily be offloaded to S3.

Alternatively, if graph generation is rather cheap, you could also try to incorporate it directly into the analysis job.
Thanks for the answer! Also thanks for raising some concerns about my question.
Some of the graphs I have been using are larger than 1.5 TB. I am currently at the experimentation stage of a project, making modifications to my code and re-running the experiments. On some of the largest graphs, I/O has become an issue and keeps me waiting for a couple of hours per run. Moreover, I have a parallel/distributed graph generator that I can run on the same set of nodes in my cluster. So what I wanted to do was run my Flink program and the graph generator at the same time and feed the graph through the generator, which should be faster than doing I/O from disk. As you said, it is not essential for me to do that, but I am trying to see what I can do with Flink and how I can solve such problems. I was using another framework before and faced a similar problem there; I was able to reduce the graph read time from hours to minutes using this method.
My issue is actually not storage related; I am trying to see how I can reduce the I/O time. One trick that came to mind is creating a dummy dataset and, inside a map function on that dataset, opening up a bunch of sockets, listening to the generator, and collecting the generated data. I am trying to see how it will turn out; roughly, I have something like the sketch below in mind.
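This is only a sketch of the idea, under some assumptions: the generator writes one edge per line as "src dst" over plain TCP and closes the connection when it is done, and the hostnames and port are made up.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.Socket;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class SocketEdgeJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // "dummy" dataset: one element per generator endpoint (hypothetical hosts/port)
            DataSet<String> endpoints = env.fromElements("gen-node-1:5555", "gen-node-2:5555");

            DataSet<Tuple2<Long, Long>> edges = endpoints
                .rebalance() // spread the endpoints across task managers
                .flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
                    @Override
                    public void flatMap(String endpoint, Collector<Tuple2<Long, Long>> out) throws Exception {
                        String[] hp = endpoint.split(":");
                        try (Socket socket = new Socket(hp[0], Integer.parseInt(hp[1]));
                             BufferedReader in = new BufferedReader(
                                 new InputStreamReader(socket.getInputStream()))) {
                            String line;
                            // the generator closing the connection ends this loop
                            while ((line = in.readLine()) != null) {
                                String[] ids = line.split("\\s+");
                                out.collect(Tuple2.of(Long.parseLong(ids[0]), Long.parseLong(ids[1])));
                            }
                        }
                    }
                });

            System.out.println(edges.count()); // triggers execution
        }
    }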
I am not familiar with incorporating the generation directly into the analysis job; I will look into it. Again, this is not really a blocking problem; I am just experimenting with the framework to see what I can do. I am very new to Flink, so my methods might be wrong.

Thanks for the help!
Best
Kaan
Hi Kaan,

sorry, I hadn't considered I/O to be the bottleneck. I thought a bit more about your issue and came to a rather simple solution: how about you open a socket on each of your generator nodes? Then you configure your analysis job to read from each of these sockets with a separate source and union them before feeding them to the actual job. You don't need to modify much in the analysis job, and each source can be read independently. WDYT?
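A minimal sketch of what I mean, with made-up hostnames and port. Note that socketTextStream is a DataStream source, so this shows the streaming variant; for a DataSet job you would swap each source for a custom input format (more on that below):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnionSocketSources {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // one source per generator node (hypothetical hosts), unioned into a single stream
            String[] hosts = {"gen-node-1", "gen-node-2", "gen-node-3"};
            DataStream<String> edges = env.socketTextStream(hosts[0], 5555);
            for (int i = 1; i < hosts.length; i++) {
                edges = edges.union(env.socketTextStream(hosts[i], 5555));
            }

            edges.print();
            env.execute("read-edges-from-generator-sockets");
        }
    }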
Yes, that sounds like a great idea, and it is actually what I am trying to do.

Before trying to open the sockets on the slave nodes, I first opened just one socket on the master node and ran the generator on one node as well. I was able to read the graph and run my algorithm without any problems; this was a test run to see whether I could do it at all. After that, I opened a bunch of sockets on my generators, and now I am trying to configure Flink to read from those sockets. However, I am having problems assigning each task manager to a separate socket. I assume my problems are related to network binding: in my configuration file, jobmanager.rpc.address is set, but I have not done a similar configuration for the slave nodes. Am I on the right track, or is there an easier way to handle this? I think my point is how to do the `read from each of these sockets with a separate source` part.

Thanks again
Best
Kaan
Hm, I had confused sockets to work the other way around (pulling, like URLInputStream, instead of listening). I'd go with providing the data on a port on each generator node and then reading from those ports in multiple sources. I think the best solution is to implement a custom InputFormat and use it with env.createInput. You could implement a subclass of GenericInputFormat. You might even use an IteratorInputFormat built around a serializable iterator.
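Something like this sketch, completing the iterator I had in mind; it assumes newline-delimited "src dst" pairs over plain TCP, and that the generator closes the connection when it is done (host and port are placeholders):

    // imports needed in the enclosing file:
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.Serializable;
    import java.io.UncheckedIOException;
    import java.net.Socket;
    import java.util.Iterator;
    import org.apache.flink.api.java.tuple.Tuple2;

    private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {

        private final String host; // generator endpoint; placeholder values
        private final int port;
        private transient BufferedReader reader;
        private transient String nextLine;

        URLInputIterator(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // connect lazily, so the socket is opened on the worker, not on the client
        private void ensureOpen() {
            if (reader == null) {
                try {
                    Socket socket = new Socket(host, port);
                    reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }

        @Override
        public boolean hasNext() {
            ensureOpen();
            return nextLine != null; // the generator closing the socket ends the stream
        }

        @Override
        public Tuple2<Long, Long> next() {
            ensureOpen();
            String[] ids = nextLine.split("\\s+");
            Tuple2<Long, Long> edge = Tuple2.of(Long.parseLong(ids[0]), Long.parseLong(ids[1]));
            try {
                nextLine = reader.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return edge;
        }
    }

You would then create one such source per generator, e.g. env.fromCollection(new URLInputIterator("gen-node-1", 5555), TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})), and union them. A source created from an iterator reads with parallelism 1, which is exactly why you'd want one per generator node.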
Hi Kaan,

it seems like ZMQ uses plain TCP rather than HTTP, so I guess the easiest way would be to use a ZMQ Java binding to access it [1]. Of course, it's more complicated to write the iterator logic for that. I'm also not sure how ZMQ signals the end of such a graph; maybe it closes the socket, and you can just read as much as possible.
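A minimal sketch of the receiving side, assuming the JeroMQ binding (0.5.x) and a PUSH/PULL socket pair; I also assume the generator sends an empty frame as an end-of-stream marker, which depends entirely on how your generator actually signals completion:

    import org.zeromq.SocketType;
    import org.zeromq.ZContext;
    import org.zeromq.ZMQ;

    public class ZmqEdgeReader {
        public static void main(String[] args) {
            try (ZContext context = new ZContext()) {
                ZMQ.Socket socket = context.createSocket(SocketType.PULL);
                socket.connect("tcp://gen-node-1:5555"); // hypothetical generator endpoint

                while (true) {
                    byte[] frame = socket.recv(); // blocks until a message arrives
                    if (frame.length == 0) {
                        break; // assumed end-of-stream marker: an empty frame
                    }
                    // parse the frame into an edge (e.g. two 8-byte longs) here
                }
            }
        }
    }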
Hi Kaan,

I'm not entirely sure I understand your solution. I gather that you create a dataset of TCP addresses and then use flatMap to fetch and output the data? If so, then I think it's a good solution for batch processing (DataSet). It wouldn't work in DataStream, because it doesn't play well with checkpointing, but you seem to be interested only in batch. It's also not the first time I have seen this pattern used in batch. In general, if it works and is fast enough, it's a good solution ;). No need to make it more complicated if you can solve it with simpler means and maintain it more easily.

Best,
Arvid
Hi Kaan,

explicitly mapping sources to physical nodes is currently not supported and would need some workarounds. I have re-added the user mailing list (please always include it in your responses); maybe someone there can help you with that.

Best,
Arvid