Reading from sockets using the DataSet API

Reading from sockets using the DataSet API

kaansancak
Hi,

I have been running some experiments on large graph data; the smallest graph I have been using has around 70 billion edges. I have a graph generator which generates the graph in parallel and feeds it to the running system. However, it takes a lot of time to read the edges because, even though the graph generation process is parallel, in Flink I can only listen on the master node (correct me if I am wrong). Another option is dumping the generated data to a file and reading it with readCsvFile; however, this is not feasible in terms of storage management.

What I want to do is invoke my graph generator using ipc/tcp protocols and read the generated data from the sockets. Since the graph data is also generated in parallel on each node, I want to make use of ipc and read the data in parallel at each node. I did some online digging but couldn't find anything similar using the DataSet API. I would be glad if you have some similar use cases or examples.

Is it possible to use the streaming environment to create the data in parallel and then switch to the DataSet API?

Thanks in advance!

Best
Kaan
Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hi Kaan,

afaik there is no (easy) way to switch from the streaming API back to the batch API while retaining all data in memory (correct me if I misunderstood).

However, from your description, I also have some severe trouble understanding the setup. Why can't you dump the data to some file? Do you really have more main memory than disk space? Or is there no storage shared between your generating cluster and the Flink cluster?

It almost sounds as if the issue at heart is rather finding a good serialization format in which to store the edges. The 70 billion edges could be stored as an array of id pairs, which amounts to roughly 1.1 TB of uncompressed data (two 8-byte long ids per edge) if stored in Avro (or any other binary serialization format). That's not much by today's standards and could also easily be offloaded to S3.
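
A quick back-of-the-envelope check of that estimate, with the numbers from above:

long edges = 70_000_000_000L;           // ~70 billion edges
long bytesPerEdge = 2 * Long.BYTES;     // two 8-byte long ids per edge
long totalBytes = edges * bytesPerEdge; // 1_120_000_000_000 bytes, i.e. roughly 1.1 TB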

Alternatively, if graph generation is rather cheap, you could also try to incorporate it directly into the analysis job.
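
As a sketch of that alternative (assuming the generator is deterministic per seed; numSeeds and generateEdgesForSeed are placeholder names, not real APIs):

DataSet<Tuple2<Long, Long>> edges = env
        .generateSequence(0, numSeeds - 1) // one seed per parallel chunk of the graph
        .flatMap((Long seed, Collector<Tuple2<Long, Long>> out) -> {
            // each parallel instance generates its own slice of the graph
            for (Tuple2<Long, Long> edge : generateEdgesForSeed(seed)) {
                out.collect(edge);
            }
        })
        .returns(Types.TUPLE(Types.LONG, Types.LONG)); // needed because of lambda type erasure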



--
Arvid Heise | Senior Java Developer

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
Re: Reading from sockets using the DataSet API

kaansancak
Thanks for the answer! Also thanks for raising some concerns about my question.

Some of the graphs I have been using are larger than 1.5 TB, and I am currently at the experimentation stage of a project, making modifications to my code and re-running the experiments. On some of the largest graphs I have been using, I/O has become an issue and keeps me waiting for a couple of hours.

Moreover, I have a parallel/distributed graph generator, which I can run on the same set of nodes in my cluster. So what I wanted to do was run my Flink program and the graph generator at the same time and feed the graph through the generator, which should be faster than doing I/O from disk. As you said, it is not essential for me to do that, but I am trying to see what I am able to do with Flink and how I can solve such problems. I was using another framework before and faced a similar problem; I was able to reduce the graph read time from hours to minutes using this method.

> Do you really have more main memory than disk space?

My issue is actually not storage related; I am trying to see how I can reduce the I/O time.

One trick that came to my mind: by creating a dummy dataset and using a map function on it, I can open up a bunch of sockets, listen to the generator, and collect the generated data. I am trying to see how it turns out.
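
Roughly what I have in mind (just a sketch; the endpoint names and the "src;dst" line format are placeholders):

DataSet<Tuple2<Long, Long>> edges = env
        .fromElements("gen_node1:9999", "gen_node2:9999") // one entry per generator socket
        .rebalance() // spread the endpoints across task slots
        .flatMap(new FlatMapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public void flatMap(String endpoint, Collector<Tuple2<Long, Long>> out) throws Exception {
                String[] hp = endpoint.split(":");
                try (Socket socket = new Socket(hp[0], Integer.parseInt(hp[1]));
                     BufferedReader reader = new BufferedReader(new InputStreamReader(
                             socket.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) { // generator closes the socket when done
                        String[] parts = line.split(";");
                        out.collect(new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
                    }
                }
            }
        });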

> Alternatively, if graph generation is rather cheap, you could also try to incorporate it directly into the analysis job.

I am not familiar with analysis jobs. I will look into it.

Again, this is actually not a problem; I am just trying to experiment with the framework and see what I can do. I am very new to Flink, so my methods might be wrong. Thanks for the help!

Best
Kaan


Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hi Kaan,

sorry, I hadn't considered I/O as the bottleneck. I thought a bit more about your issue and came to a rather simple solution.

How about you open a socket on each of your generator nodes? Then you configure your analysis job to read from each of these sockets with a separate source and union them before feeding them into the actual job.

You don't need to modify much in the analysis job, and each source can be read independently. WDYT?
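
Something like this, as a sketch (createSocketSource is a hypothetical helper you would still have to implement, e.g. as a custom input format or a flatMap that drains one socket):

DataSet<Tuple2<Long, Long>> edges =
        createSocketSource(env, "gen_node1", 9999)
                .union(createSocketSource(env, "gen_node2", 9999))
                .union(createSocketSource(env, "gen_node3", 9999));
// the analysis job then consumes `edges` exactly as if it had been read from a file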

Re: Reading from sockets using the DataSet API

kaansancak
Yes, that sounds like a great idea, and actually that's what I am trying to do.

> Then you configure your analysis job to read from each of these sockets with a separate source and union them before feeding them to the actual job?

Before trying to open sockets on the slave nodes, I first opened just one socket on the master node and ran the generator with one node as well. I was able to read the graph and run my algorithm without any problems. This was a test run to see whether I could do it.

After that, I opened a bunch of sockets on my generators, and now I am trying to configure Flink to read from those sockets. However, I am having problems while trying to assign each task manager to a separate socket. I am assuming my problems are related to network binding. In my configuration file, jobmanager.rpc.address is set, but I have not done a similar configuration for the slave nodes.

Am I on the right track, or is there an easier way to handle this?

I think my point is how to do the `read from each of these sockets with a separate source` part.

Thanks again

Best
Kaan
 


Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hm, I had confused sockets to work the other way around (pulling, like reading from a URL's input stream, instead of listening). I'd go with providing the data on a port on each generator node, and then reading from that in multiple sources.

I think the best solution is to implement a custom InputFormat and then use createInput. You could implement a subclass of GenericInputFormat. You might even be able to use IteratorInputFormat, like this:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

private static class URLInputIterator implements Iterator<Tuple2<Long, Long>>, Serializable {
    private final URL url;
    // rebuilt on deserialization, so it must not be serialized itself
    private transient Iterator<Tuple2<Long, Long>> inner;

    private URLInputIterator(URL url) {
        this.url = url;
    }

    // Runs when the iterator is deserialized on a task manager: it opens the
    // connection there and lazily parses "src;dst" lines into id pairs.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject(); // restore the url field first
        InputStream inputStream = url.openStream();
        inner = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
                .lines()
                .map(line -> {
                    String[] parts = line.split(";");
                    return new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
                })
                .iterator();
    }

    @Override
    public boolean hasNext() {
        return inner.hasNext();
    }

    @Override
    public Tuple2<Long, Long> next() {
        return inner.next();
    }
}

// java.net.URL has no (host, port) constructor, so protocol and path have to be spelled out:
env.fromCollection(
        new URLInputIterator(new URL("http", "gen_node1", 9999, "/")),
        Types.TUPLE(Types.LONG, Types.LONG));
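
Alternatively, here is a sketch of the GenericInputFormat route (the host list, port, and "src;dst" line format are assumptions on my side; it additionally needs org.apache.flink.api.common.io.GenericInputFormat, org.apache.flink.core.io.GenericInputSplit, and java.net.Socket):

public static class SocketEdgeInputFormat extends GenericInputFormat<Tuple2<Long, Long>> {
    private final String[] hosts; // one generator host per parallel split
    private final int port;
    private transient BufferedReader reader;
    private transient String nextLine;

    public SocketEdgeInputFormat(String[] hosts, int port) {
        this.hosts = hosts;
        this.port = port;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        // each parallel instance is handed a split number, which we map to a host
        Socket socket = new Socket(hosts[split.getSplitNumber() % hosts.length], port);
        reader = new BufferedReader(new InputStreamReader(
                socket.getInputStream(), StandardCharsets.UTF_8));
        nextLine = reader.readLine();
    }

    @Override
    public boolean reachedEnd() {
        return nextLine == null; // the generator closes the socket when it is done
    }

    @Override
    public Tuple2<Long, Long> nextRecord(Tuple2<Long, Long> reuse) throws IOException {
        String[] parts = nextLine.split(";");
        reuse.f0 = Long.parseLong(parts[0]);
        reuse.f1 = Long.parseLong(parts[1]);
        nextLine = reader.readLine();
        return reuse;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close(); // also closes the underlying socket stream
        }
    }
}

// parallelism should match the number of generator sockets:
// env.createInput(new SocketEdgeInputFormat(new String[]{"gen_node1", "gen_node2"}, 9999))
//    .setParallelism(2);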



Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hi Kaan,

it seems that ZMQ uses TCP and not HTTP, so I guess the easiest way would be to use a ZMQ Java binding to access it [1].

But of course, it's much more complicated to write iterator logic for that. I'm not sure how ZMQ signals the end of such a graph; maybe it closes the socket and you can just read as much as possible.
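
For illustration, the reading side with JeroMQ (a pure-Java ZMQ binding) could look roughly like this; the PULL socket type, the address, and the "END" sentinel are assumptions on my side, since I don't know how your generator signals completion:

import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

try (ZContext context = new ZContext()) {
    ZMQ.Socket socket = context.createSocket(SocketType.PULL);
    socket.connect("tcp://gen_node1:9999");
    String line;
    // assumes the generator sends an "END" sentinel once the graph is complete
    while (!(line = socket.recvStr()).equals("END")) {
        String[] parts = line.split(";");
        long src = Long.parseLong(parts[0]);
        long dst = Long.parseLong(parts[1]);
        // hand (src, dst) to the surrounding collector or iterator
    }
}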


On Tue, Apr 28, 2020 at 10:56 PM Kaan Sancak <[hidden email]> wrote:
Hi Arvid,

I am sorry for the late response. I had some deadlines, but I am back to work now.
I have been trying to implement what we discussed, but I am having problems with the implementation.
I have been using ZMQ to open the sockets, because that is natively supported in my graph generator. But I couldn't make the connection using input streams.
Do you have any specific examples I could look at to get a better idea of how to implement this?

Best
Kaan

Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hi Kaan,

not entirely sure I understand your solution. I gather that you create a dataset of TCP addresses and then use flatMap to fetch and output the data?

If so, then I think it's a good solution for batch processing (DataSet). It wouldn't work in DataStream, because it doesn't play well with checkpointing, but you seem to be interested only in batch. It's also not the first time I have seen this pattern used in batch.

In general, if it works and is fast enough, it's always a good solution ;). No need to make it more complicated if you can solve it with simpler means and maintain it more easily.

Best,

Arvid

On Wed, Apr 29, 2020 at 10:19 PM Kaan Sancak <[hidden email]> wrote:
Hi Arvid,

I have implemented a ZMQ listener class without extending any Flink class.
The listener has a constructor that takes the port number.

Then, in the execution, I created a dataset of strings holding the port numbers.
Then I used a flatMap function, which returns Tuple2<Long, Long>. I opened the TCP sockets using localhost, so matching was quite easy.

This seems to work for me. What do you think about this implementation? Do you see any drawbacks?

Best
Kaan

Re: Reading from sockets using the DataSet API

Arvid Heise-3
Hi Kaan,

explicitly mapping subtasks to physical nodes is currently not supported and would need some workarounds. I have re-added the user mailing list (please always include it in your responses); maybe someone there can help you with that.
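
One possible workaround, as an untested sketch: let each parallel instance pick "its" socket via the subtask index. This spreads the 24 ports over the 24 task slots of a task manager, but note that it does not pin a subtask to a physical node:

public static class IndexedSocketReader extends RichFlatMapFunction<Long, Tuple2<Long, Long>> {
    private final String[] endpoints; // "host:port", one entry per parallel subtask

    public IndexedSocketReader(String[] endpoints) {
        this.endpoints = endpoints;
    }

    @Override
    public void flatMap(Long ignored, Collector<Tuple2<Long, Long>> out) throws Exception {
        // each parallel instance picks its endpoint by subtask index
        String[] hp = endpoints[getRuntimeContext().getIndexOfThisSubtask()].split(":");
        try (Socket socket = new Socket(hp[0], Integer.parseInt(hp[1]));
             BufferedReader reader = new BufferedReader(new InputStreamReader(
                     socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(";");
                out.collect(new Tuple2<>(Long.parseLong(parts[0]), Long.parseLong(parts[1])));
            }
        }
    }
}

// feed it one dummy element per subtask and run it with parallelism = endpoints.length, e.g.:
// env.generateSequence(1, endpoints.length).rebalance()
//    .flatMap(new IndexedSocketReader(endpoints)).setParallelism(endpoints.length);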

Best,

Arvid

On Thu, Apr 30, 2020 at 10:12 AM Kaan Sancak <[hidden email]> wrote:
One quick question, though: on each generator node I am opening 24 sockets (the number of cores I have). Is there a way to guarantee that, while doing the map function, each of the slave nodes distributes these 24 socket ports among its task slots (each slave also has 24 task slots)?
Sorry, I have asked a lot of questions.

Stay safe!
Kaan 

On Thu, Apr 30, 2020 at 3:06 AM Kaan Sancak <[hidden email]> wrote:
Hi Arvid,
As you said, I am only interested in batch processing right now, and it seems to be working fine.

Thanks for your help!
Best
Kaan
