Hello,
I have been struggling with this simple issue for hours now: I am unable to get the accumulator results from the result of a streaming job execution. The accumulator map
in the JobExecutionResult is always empty.
Simple test code (directly inspired by the documentation):
My source:

    public static class oneRandomNumberSource implements SourceFunction<Integer>, Serializable {
        @Override
        public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Integer> ctx)
                throws Exception {
            final Random rnd = new Random(29172);
            ctx.collect(rnd.nextInt());
        }

        @Override
        public void cancel() {
        }
    }

My exec program:

    public static final String COUNTER_NBLINE = "num-lines";

    void test() {
        final StreamExecutionEnvironment env = getCluster();
        final SourceFunction<Integer> source = new oneRandomNumberSource();
        env.addSource(source).addSink(new RichSinkFunction<Integer>() {
            private IntCounter numLines = new IntCounter();

            @Override
            public void open(Configuration parameters) throws Exception { // NOPMD
                getRuntimeContext().addAccumulator(COUNTER_NBLINE, this.numLines);
            }

            @Override
            public void invoke(Integer value) throws Exception {
                System.err.println(value);
                numLines.add(1);
            }
        });
        try {
            final JobExecutionResult result = env.execute();
            System.out.println(result.getAccumulatorResult(COUNTER_NBLINE)); // Problem: always null
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Console output:
07/16/2015 14:11:58 Job execution switched to status RUNNING.
07/16/2015 14:11:58 Custom Source(1/1) switched to SCHEDULED
07/16/2015 14:11:58 Custom Source(1/1) switched to DEPLOYING
07/16/2015 14:11:58 Stream Sink(1/4) switched to SCHEDULED
07/16/2015 14:11:58 Stream Sink(1/4) switched to DEPLOYING
07/16/2015 14:11:58 Stream Sink(2/4) switched to SCHEDULED
07/16/2015 14:11:58 Stream Sink(2/4) switched to DEPLOYING
07/16/2015 14:11:58 Stream Sink(3/4) switched to SCHEDULED
07/16/2015 14:11:58 Stream Sink(3/4) switched to DEPLOYING
07/16/2015 14:11:58 Stream Sink(4/4) switched to SCHEDULED
07/16/2015 14:11:58 Stream Sink(4/4) switched to DEPLOYING
07/16/2015 14:11:58 Custom Source(1/1) switched to RUNNING
07/16/2015 14:11:58 Stream Sink(1/4) switched to RUNNING
07/16/2015 14:11:58 Stream Sink(2/4) switched to RUNNING
07/16/2015 14:11:58 Stream Sink(4/4) switched to RUNNING
07/16/2015 14:11:58 Stream Sink(3/4) switched to RUNNING
07/16/2015 14:11:58 Custom Source(1/1) switched to FINISHED
07/16/2015 14:11:58 Stream Sink(4/4) switched to FINISHED
07/16/2015 14:11:58 Stream Sink(3/4) switched to FINISHED
-329782788
07/16/2015 14:11:58 Stream Sink(2/4) switched to FINISHED
07/16/2015 14:11:58 Stream Sink(1/4) switched to FINISHED
07/16/2015 14:11:58 Job execution switched to status FINISHED.
null
What am I doing wrong?
The Flink version is 0.9.0.
Best regards,
Arnaud