Counter metrics sometimes getting stuck and not increasing

Prasanna kumar
Hi,

We are publishing around 200 kinds of events for 15,000 customers.
The source is a set of Kafka topics and the sink is an Amazon SNS topic.
We are collecting metrics keyed by the combination [Event, Consumer, PublishResult], where PublishResult is either published or error.
So the metric count is on the order of 200 * 15,000 * 2 = 6 million series, and we expect it to grow around 20% per year.
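
For illustration, each combination registers one counter along these lines (the event type "OrderCreated" and consumer ID "cust-123" are made-up example values; in the real job both come from the Event object, and with the Prometheus reporter we expect the two-argument addGroup calls to show up as labels on the exported series):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Minimal sketch of the metric structure only, not the actual job code.
public class PublishCounterSketch extends RichMapFunction<String, String> {

    private transient Counter published;

    @Override
    public void open(Configuration parameters) throws Exception {
        // "OrderCreated" and "cust-123" are hypothetical example values.
        published = getRuntimeContext().getMetricGroup()
                .addGroup("Event", "OrderCreated")
                .addGroup("Consumer", "cust-123")
                .counter("published");          // the publish result becomes the counter name
    }

    @Override
    public String map(String value) {
        published.inc();
        return value;
    }
}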

We are using the Prometheus reporter for scraping metrics.
Yesterday I found that, for one customer, the metric got stuck at a particular value.
But when we print the value behind "counterEventPublishedMapKey" in the logs, we see correctly increasing values.
There are no warnings or errors in the logs.
Other similar metrics are being scraped without any issues.

The performance of the overall pipeline is good and we don't see any other issues.
JM memory is 4 GB (metaspace is 1 GB).
TM memory is 4 GB (heap is 3 GB).
Flink version is 1.12.2.
The job runs with a parallelism of 2.
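
Expressed as flink-conf.yaml entries, the sizing corresponds roughly to the following (the exact keys are an assumption based on the Flink 1.12 memory model; the values are approximations of the sizes above, not copied from the actual deployment):

# assumed flink-conf.yaml, for illustration only
jobmanager.memory.process.size: 4g
jobmanager.memory.jvm-metaspace.size: 1g
taskmanager.memory.process.size: 4g
taskmanager.memory.task.heap.size: 3g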

There are other jobs where we have used a similar structure to publish metrics through a RichMapFunction, and they are running successfully.

1) Can we rely on metrics exported in this fashion for reporting?
2) Has anyone faced this kind of scenario before?
3) What can we do if a particular counter metric alone gets stuck like this (while other counter metrics keep working)?
4) Could the code structure below be the reason for this behaviour?
 

public class SNSPublisher implements EventJobs {

    private static FlinkKafkaConsumer getFlinkKafkaConsumer(ParameterTool configParams) {
        .......
    }

    @Override
    public void execute(ParameterTool configParams) throws Exception {

        final StreamExecutionEnvironment env =
                KafkaUtil.createStreamExecutionEnvironment(configParams);

        // Enable checkpointing: at-least-once, filesystem backend, retained on cancellation.
        env.enableCheckpointing(1000);
        String checkpointingDirectory = configParams.get(AppConstant.CHECKPOINTING_DIRECTORY);
        env.setStateBackend(new FsStateBackend(checkpointingDirectory, true));
        Class<?> unmodifiableCollectionsSerializer =
                Class.forName("java.util.Collections$UnmodifiableCollection");
        env.getConfig().addDefaultKryoSerializer(unmodifiableCollectionsSerializer,
                UnmodifiableCollectionsSerializer.class);
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

        FlinkKafkaConsumer flinkKafkaConsumer = getFlinkKafkaConsumer(configParams);

        DataStream<Tuple2<String, Event>> inputStream = env.addSource(flinkKafkaConsumer)
                .uid(configParams.get(AppConstant.STATE_KAFKA_SOURCE_UUID))
                .name(AppConstant.SOURCE);
        DataStream<Event> eventStream = inputStream
                .map((MapFunction<Tuple2<String, Event>, Event>) value -> value.getField(1));
        SNSMessagePublisherFunction snsMessagePublisherFunction =
                new SNSMessagePublisherFunction(configParams);

        SNSMessagePublisher snsMessagePublisher =
                new SNSMessagePublisherImpl(snsMessagePublisherFunction);

        // Publish each event to SNS; the result tuple carries the publish outcome.
        DataStream<Tuple2<String, Event>> result = snsMessagePublisher.publish(eventStream);

        result
                .keyBy(resultTuple -> getMapEventCounterKey(resultTuple))
                .map(
                        new RichMapFunction<Tuple2<String, Event>, String>() {

                            // Counters are kept in keyed state so that each
                            // (event, consumer, result) key reuses its counter.
                            private transient MapState<String, Counter> counterEventPublishedMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                MapStateDescriptor<String, Counter> stateDescriptor =
                                        new MapStateDescriptor<>(
                                                "counterEventPublishedMapState",
                                                String.class,
                                                Counter.class);

                                counterEventPublishedMapState =
                                        getRuntimeContext().getMapState(stateDescriptor);
                            }

                            @Override
                            public String map(Tuple2<String, Event> tuple2) throws Exception {

                                Event event = tuple2.getField(1);

                                MetricGroup metricGroup = getRuntimeContext().getMetricGroup();

                                Counter counter;
                                String counterName = getResultofSNSPublish(tuple2);
                                String counterEventMapKey = getMapEventCounterKey(tuple2);

                                // Reuse the counter stored in state, or register a new one
                                // under the Event/Consumer metric group.
                                if (counterEventPublishedMapState.contains(counterEventMapKey)) {
                                    counter = counterEventPublishedMapState.get(counterEventMapKey);
                                } else {
                                    counter =
                                            metricGroup
                                                    .addGroup(AppConstant.EVENT, event.fetchEventType())
                                                    .addGroup(AppConstant.CONSUMER, event.fetchConsumerID())
                                                    .counter(counterName);
                                }
                                counter.inc();
                                Long counterValue = counter.getCount();
                                counterEventPublishedMapState.put(counterEventMapKey, counter);

                                return new StringBuilder("counterEventPublishedMapKey:")
                                        .append(counterEventMapKey)
                                        .append(AppConstant.COLON)
                                        .append(counterValue)
                                        .toString();
                            }
                        }).uid(configParams.get(AppConstant.COUNTER_MAP_KEY_UUID))
                .print();

        SingleOutputStreamOperator<Tuple2<String, Event>> singleOutputStreamOperator =
                (SingleOutputStreamOperator<Tuple2<String, Event>>) result;

        singleOutputStreamOperator.name(AppConstant.SNS_SINK_NAME).uid(configParams.get(
                AppConstant.STATE_SNS_SINK_UUID));
        env.execute(configParams.get(AppConstant.JOB_ENVIRONMENT) + AppConstant.SNS_PUBLISHER);
    }

    private static String getMapEventCounterKey(Tuple2<String, Event> resultTuple) {

        Event event = resultTuple.getField(1);

        // Key format: <eventType>-<consumerID>-<publishResult>
        return new StringBuilder(event.fetchEventType())
                .append(AppConstant.HYPEN)
                .append(event.fetchConsumerID())
                .append(AppConstant.HYPEN)
                .append(getResultofSNSPublish(resultTuple))
                .toString();
    }

    private static String getResultofSNSPublish(Tuple2<String, Event> resultTuple) {
        // result is either published.<SNSMessageID> or error.<error text>
        ....
    }
}


Thanks,
Prasanna.