In the class org.apache.flink.runtime.io.network.api.writer.RecordWriter, there are as many serializers as there are channels (numChannels). Suppose the first operator has a parallelism of 500, the next operator has a parallelism of 1000, and every message in Flink is 2MB. Then the job takes 500 * 1000 * 2MB, i.e. about 1TB of memory in total!!! Can I do anything to reduce the memory usage?

-- 
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
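The estimate in the question can be reproduced as a back-of-the-envelope sketch. It assumes, as the question does, that each of the 500 sending subtasks keeps one full 2MB record buffered for each of its 1000 output channels at the same time, and it uses decimal units (1MB = 10^6 bytes). The class and method names are hypothetical and are not Flink code.

```java
// Hypothetical back-of-the-envelope calculator; not part of Flink.
public class SerializerMemoryEstimate {
    static final long MB = 1_000_000L;          // decimal megabyte
    static final long TB = 1_000_000_000_000L;  // decimal terabyte

    // Worst case assumed in the question: every sending subtask holds one
    // full record in the serializer of every output channel simultaneously.
    static long worstCaseBytes(long senders, long channelsPerSender, long recordBytes) {
        return senders * channelsPerSender * recordBytes;
    }

    public static void main(String[] args) {
        long total = worstCaseBytes(500, 1000, 2 * MB);
        System.out.println(total / TB + " TB"); // prints "1 TB"
    }
}
```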
I don't think there's anything you can do except reducing the parallelism or the size of your messages.

A separate serializer is used for each channel because the serializers are stateful: they are capable of writing records partially to a given MemorySegment to better utilize the allocated memory.

How many messages is each operator instance processing per second? I would imagine that at this scale your memory consumption goes through the roof anyway due to the message size. Even if every operator instance is only processing 10 records/s, you're already looking at 10TB memory usage for in-flight data.

On 14.11.2017 11:11, yunfan123 wrote:
> [...]
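To illustrate why this state forces one serializer per channel, here is a minimal sketch. It assumes a plain java.nio.ByteBuffer as a stand-in for Flink's MemorySegment and leaves out the length header a real serializer would write; the class PartialRecordWriter is hypothetical and is not Flink's RecordSerializer API. It remembers how much of the current record has already been copied, so a record larger than one buffer can continue in the next buffer for the same channel.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical sketch (not Flink's API): one instance per output channel.
 * It remembers how much of the current record has already been copied,
 * so a record larger than one target buffer can span several buffers.
 */
public class PartialRecordWriter {
    // Serialized record whose bytes have not all been copied out yet.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    /** Starts a new record; the previous one must be fully written. */
    public void setRecord(byte[] serialized) {
        if (pending.hasRemaining()) {
            throw new IllegalStateException("previous record not fully written");
        }
        pending = ByteBuffer.wrap(serialized);
    }

    /** Copies as many pending bytes as fit into target; true once the record is complete. */
    public boolean copyTo(ByteBuffer target) {
        int n = Math.min(pending.remaining(), target.remaining());
        ByteBuffer chunk = pending.duplicate();     // shares bytes, own position/limit
        chunk.limit(chunk.position() + n);
        target.put(chunk);
        pending.position(pending.position() + n);   // remember progress for the next buffer
        return !pending.hasRemaining();
    }

    public static void main(String[] args) {
        PartialRecordWriter writer = new PartialRecordWriter();
        writer.setRecord("0123456789".getBytes(StandardCharsets.US_ASCII));
        int buffers = 0;
        boolean done = false;
        while (!done) {
            ByteBuffer segment = ByteBuffer.allocate(4); // stand-in for a MemorySegment
            done = writer.copyTo(segment);
            buffers++;
        }
        System.out.println("record spanned " + buffers + " buffers"); // 3 (4 + 4 + 2 bytes)
    }
}
```

Because each channel's buffers fill up and get flushed independently, every channel needs its own copy of this "how far did I get" state, which is why the per-channel serializers and the records they hold add up with the number of channels.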
We're actually also trying to make the serializer stateless in the future, and we may be able to remove the intermediate serialization buffer, which currently grows on the heap before we copy the data into the actual target buffer. This intermediate buffer grows as needed and, after serialization, is pruned if it is bigger than 5MB (see DataOutputSerializer); anything below that threshold is kept and re-used. So you may actually have up to 5MB per output channel sitting there, waiting for data. Please refer to https://issues.apache.org/jira/browse/FLINK-4893 for updates on this.

These improvements will certainly reduce some of our memory footprint and help you. Throughput will then, of course, be limited by your network's speed and by the number of network buffers available to hold this amount of data and to saturate your network connections. The availability of these buffers will then limit your throughput accordingly.

Nico

On Tuesday, 14 November 2017 11:29:33 CET Chesnay Schepler wrote:
> [...]
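The grow-then-prune behavior described above can be sketched as follows. This is a hypothetical PrunableBuffer class modeled on the description in this mail, not the actual DataOutputSerializer code; the doubling growth policy and the reset-to-initial-size pruning are assumptions made for illustration.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch of a growable serialization buffer with a prune
 * threshold; modeled on the behavior described in the mail, not Flink code.
 */
public class PrunableBuffer {
    static final int PRUNE_THRESHOLD = 5 * 1024 * 1024; // 5MB, as described in the mail
    private final int initialSize;
    private byte[] buffer;
    private int position;

    public PrunableBuffer(int initialSize) {
        this.initialSize = initialSize;
        this.buffer = new byte[initialSize];
    }

    /** Appends bytes, growing the backing array (assumed doubling policy). */
    public void write(byte[] data) {
        int required = position + data.length;
        if (required > buffer.length) {
            buffer = Arrays.copyOf(buffer, Math.max(buffer.length * 2, required));
        }
        System.arraycopy(data, 0, buffer, position, data.length);
        position = required;
    }

    /** After the record was copied out: reset, and shrink only oversized arrays. */
    public void clearAndMaybePrune() {
        position = 0;
        if (buffer.length > PRUNE_THRESHOLD) {
            buffer = new byte[initialSize]; // pruned back to the initial size
        }
        // otherwise the (possibly large) array is kept for re-use
    }

    public int capacity() {
        return buffer.length;
    }

    public static void main(String[] args) {
        PrunableBuffer buf = new PrunableBuffer(128);
        buf.write(new byte[2 * 1024 * 1024]);   // one 2MB record
        buf.clearAndMaybePrune();
        System.out.println(buf.capacity());     // 2097152: below threshold, kept
        buf.write(new byte[6 * 1024 * 1024]);   // one 6MB record
        buf.clearAndMaybePrune();
        System.out.println(buf.capacity());     // 128: above threshold, pruned
    }
}
```

With 1000 output channels, each retaining a buffer just below the threshold, a single sending subtask could hold on the order of 1000 * 5MB, roughly 5GB, of such idle buffers in the worst case.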