improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId


Chang Liu
Dear community,

I am having a problem releasing the job.

2019-01-22 10:42:50.098  WARN [Source: Custom Source -> Kafka -> ConstructTxSepa -> FilterOutFailures -> ObtainActualTxSepa -> TxSepaStream -> TxStream -> IndicatorsEvalStream -> TxEvalStream -> Sink: Print to Std. Out (2/4)] [FileCache] - improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId 9e1557723a065925e01c7749899547fb
 

However, these warnings keep popping up, the job cannot be released, and my data flow does not work.

If I remove my last operator, it works just fine, yet that operator is only doing a simple map operation. What could be the cause of this issue?

Many Thanks :)

Best regards/祝好,

Chang Liu 刘畅



Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

Chang Liu
OK, I think I found where the issue is, but I don’t understand why.

I have a method:

def evaluationStream(
    indicatorsStream: DataStream[Indicators],
    scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map { indicators =>
    Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
  }

And this is how I am using it:

lazy val indicatorsStream: DataStream[Indicators] = ...

lazy val scenarios: Set[Scenario] = loadScenarios(...)

lazy val evalStream: DataStream[Evaluation] = evaluationStream(indicatorsStream, scenarios)

evalStream.print()


The problem is caused by scenarios being passed as an argument to the method evaluationStream; with that, the job is not working.

It will work if I do it in the following way:

lazy val scenarios: Set[Scenario] = Set(S1, S2, ...)

def evaluationStream(indicatorsStream: DataStream[Indicators]): DataStream[Evaluation] =
  indicatorsStream.map { indicators =>
    Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
  }

where scenarios is not passed as a method argument but is a member of the enclosing object.

But this is not what I want: I would like the scenarios to be configurable, loaded from a config file, rather than a static object variable.

Any idea why this is happening? I also have other code where I pass arguments and use them in my data flow, and it works just fine.
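(One plausible explanation, a sketch rather than a confirmed diagnosis: Flink serializes the map closure together with everything it captures, including each Scenario in the set and, if the lambda touches members of an enclosing class, that whole enclosing instance. A hard-coded Set(S1, S2, ...) of simple objects serializes cleanly, while scenarios loaded from a config file may drag along a non-serializable reference such as the config object itself. A minimal sketch of a shape that avoids both pitfalls; the type fields and the object name are hypothetical stand-ins:)

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical data types standing in for the ones in the thread.
// Case classes carry only serializable data and no reference back to
// the config machinery that produced them.
final case class Scenario(name: String, threshold: BigDecimal)
final case class Indicators(id: String, values: Map[String, BigDecimal])
final case class Evaluation(id: String, triggered: Set[String])

// Top-level object: its methods are reached through a static module
// reference, so the lambda below captures only `scenarios`, never an
// enclosing class instance that would also have to be serializable.
object Evaluations {

  // Hypothetical evaluation logic, self-contained in the same object.
  def evaluateScenarios(inds: Indicators, scenarios: Set[Scenario]): Set[String] =
    scenarios.collect {
      case s if inds.values.values.exists(_ >= s.threshold) => s.name
    }

  def evaluationStream(
      indicatorsStream: DataStream[Indicators],
      scenarios: Set[Scenario]): DataStream[Evaluation] =
    indicatorsStream.map { indicators =>
      Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
    }
}
```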

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅





Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

Chang Liu
I have tried another way; it is not working either:

def evaluationStream(
    indicatorsStream: DataStream[Indicators],
    scenarios: Set[Scenario]): DataStream[Evaluation] =
  indicatorsStream.map(new IndicatorsToTxEval(scenarios))

class IndicatorsToTxEval(
    scenarios: Set[Scenario])
  extends MapFunction[Indicators, Evaluation] {

  override def map(indicators: Indicators): Evaluation =
    Evaluation(indicators.id, evaluateScenarios(indicators, scenarios))
}
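(A detail that is easy to miss with this approach: MapFunction itself already extends Serializable in Flink, but if IndicatorsToTxEval is declared inside another class, the compiler adds a hidden $outer reference to the enclosing instance, and calling evaluateScenarios on that enclosing class forces the reference to be kept and serialized. A sketch of a fully self-contained variant; the evaluation body and the Scenario method it calls are hypothetical stand-ins:)

```scala
import org.apache.flink.api.common.functions.MapFunction

// Top-level class: no hidden $outer field. Everything the instance
// stores -- here each Scenario in the set -- still has to be
// serializable, because Flink ships the whole instance to the task
// managers.
class IndicatorsToTxEval(scenarios: Set[Scenario])
  extends MapFunction[Indicators, Evaluation] {

  // Hypothetical: the evaluation logic lives inside the function
  // itself instead of calling back into an enclosing object.
  private def evaluate(indicators: Indicators): Set[String] =
    scenarios.collect {
      case s if s.applies(indicators) => s.name
    }

  override def map(indicators: Indicators): Evaluation =
    Evaluation(indicators.id, evaluate(indicators))
}
```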

Best regards/祝好,

Chang Liu 刘畅







Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

Stefan Richter-3
Hi,

Which version of Flink are you using? The issue https://issues.apache.org/jira/browse/FLINK-10283 shows that a similar problem was fixed in 1.6.1 and 1.7. If you use a newer version and still encounter the problem, you can reopen the issue and describe how it is still a problem for you.

Best,
Stefan 







Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

Chang Liu
Hi Stefan,

Thanks. I am using 1.6.0. I will upgrade to 1.6.1 and see whether the problem remains.

Best regards/祝好,

Chang Liu 刘畅









Re: improper use of releaseJob() without a matching number of createTmpFiles() calls for jobId

Chang Liu
Hi Stefan,

I have upgraded to 1.6.1. The warnings are gone, but my issue remains: scenarios: Set[Scenario] still cannot be passed as a method argument and used inside the map function.

It works if I directly use the object variable scenarios: Set[Scenario] instead of passing it as a method argument.

Does it have anything to do with the class Scenario?
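(It might: Flink ships the function and every captured Scenario to the task managers via Java serialization, so one quick way to narrow this down is to try serializing the loaded set directly, outside Flink. A sketch of such a check, plain JVM with no Flink dependency; the helper name is hypothetical:)

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Returns None if `value` serializes cleanly, otherwise the name of the
// first class that Java serialization rejects -- often the real culprit
// hiding inside a config-loaded Scenario.
def firstNonSerializable(value: AnyRef): Option[String] = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    out.writeObject(value)
    None
  } catch {
    case e: NotSerializableException => Some(e.getMessage) // message holds the class name
  } finally out.close()
}
```

If firstNonSerializable(scenarios) reports a class for the set returned by loadScenarios but not for a hard-coded Set(S1, S2, ...), that would explain the difference between the two variants.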

Many Thanks.

Best regards/祝好,

Chang Liu 刘畅

