Flink batch job memory/disk leak when invoking set method on a static Configuration object.


Vadim Vararu
Hi guys,

I have a simple batch job with a custom output format that writes to a local file.

import com.google.common.collect.Sets; // assuming Guava's Sets

import org.apache.flink.api.java.ExecutionEnvironment;

public class JobHadoop {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                .map(line -> line + "dd")
                .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

        env.execute(JobHadoop.class.getName());
    }
}

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.hadoop.conf.Configuration;

public class HadoopUsageOutputFormat extends FileOutputFormat<String> implements OutputFormat<String> {

    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    private Writer writer;

    static {
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        // 'stream' is the output stream opened by FileOutputFormat
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            this.writer.flush();
            this.writer.close();
        }
        super.close();
    }
}

The problem is that after the job finishes, a leak somewhere prevents the job's blob store files from being deleted. The number of such "deleted" files grows with each job run: even though they are marked as deleted, a lingering reference in the JobManager process keeps them from actually being removed.
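
For reference, this is how such deleted-but-still-open files can be listed on Linux (a minimal sketch; it assumes you can read the JobManager's /proc entries, and the pid is a placeholder to substitute):

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DeletedFdScan {
    public static void main(String[] args) throws IOException {
        String pid = args.length > 0 ? args[0] : "12345"; // placeholder: the JobManager pid
        // Each entry in /proc/<pid>/fd is a symlink; an unlinked file resolves
        // to a target ending in "(deleted)".
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(Paths.get("/proc", pid, "fd"))) {
            for (Path fd : fds) {
                Path target = Files.readSymbolicLink(fd);
                if (target.toString().endsWith("(deleted)")) {
                    System.out.println(fd + " -> " + target);
                }
            }
        }
    }
}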




Also, the problem reproduces only if I actually invoke the set method of Configuration:
static {
    DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}

From my observations, if I change
    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
to a non-static field, then the problem no longer reproduces.
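
For reference, the non-static variant would look roughly like this (a sketch of the change described above; note that Hadoop's Configuration is not java.io.Serializable, so here it is created per task in open() instead of being kept as a serialized field):

public class HadoopUsageOutputFormat extends FileOutputFormat<String> implements OutputFormat<String> {

    public static final String DEFAULT_LINE_DELIMITER = "\n";

    // Non-static: each task builds its own Configuration, so no reference
    // outlives the job.
    private transient Configuration hadoopConfiguration;
    private transient Writer writer;

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        hadoopConfiguration = new Configuration();
        hadoopConfiguration.set("just.a.prop", "/var/test1");
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    // writeRecord() and close() stay exactly as in the version above
}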


However, I'd like to know whether that's normal behaviour or a bug/leak somewhere in Flink itself.

Thanks, 
Vadim.

Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

Haibo Sun
Hi, Vadim

A similar issue occurred in earlier versions; see https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug. I'll see if I can reproduce it with the master branch code, and if so, I'll try to investigate it.

If someone already knows the cause of the problem, that would be best, so it doesn't need to be re-investigated.

Best,
Haibo



Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

Vadim Vararu
Hi,

I've run it on a standalone Flink cluster. No YARN involved.


Re: Flink batch job memory/disk leak when invoking set method on a static Configuration object.

Haibo Sun
Hi, Vadim

I tried many times with the master branch code and failed to reproduce this issue. Which version of Flink did you use?

For the Configuration class in your code, I used org.apache.hadoop.conf.Configuration.

The configuration I enabled in flink-conf.yaml is as follows (other than that, no changes were made):
high-availability: zookeeper
high-availability.storageDir: file:///data/tmp/flink/ha/
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink-test
blob.storage.directory: /data/tmp/flink/blob
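
To track whether blobs accumulate between runs, a small helper (a sketch; the path matches the blob.storage.directory above) can list what is left in the blob directory after each job:

import java.io.File;

public class BlobDirCheck {
    public static void main(String[] args) {
        // Path taken from blob.storage.directory in the config above.
        File blobDir = new File("/data/tmp/flink/blob");
        File[] entries = blobDir.listFiles();
        System.out.println(blobDir + ": " + (entries == null ? 0 : entries.length) + " entries");
        if (entries != null) {
            for (File e : entries) {
                System.out.println("  " + e.getName());
            }
        }
    }
}

Note that files which are already unlinked but still held open by the JobManager will only show up in /proc/<pid>/fd, not here.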

Best,
Haibo
