Hi, I am getting multiple exceptions while trying to use RocksDB as a state backend. I have 2 Task Managers, each with 2 task slots and 4 cores. Below is our setup:
Kafka(Topic with 2 partitions) ---> FlinkKafkaConsumer(2 Parallelism) ----> KeyedProcessFunction(4 Parallelism) ----> FlinkKafkaProducer(1 Parallelism) ----> KafkaTopic
public class Aggregator_KeyedExpression extends KeyedProcessFunction<Object, GameZoneInput, GameZoneOutput> {

    private ValueState<Integer> valueState;

    @Override
    public void open() throws Exception {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<Integer>("totalPrize", Integer.class);
        valueState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(GameZoneInput inEvent, Context ctx, final List<GameZoneOutput> outEvents) throws Exception {
        if (valueState.value() == null) {
            valueState.update(0);
        }
        valueState.update(valueState.value() + inEvent.getPrizeDelta()); // <----- NullPointerException on this line
        int sum = valueState.value();
        GameZoneOutput output = new GameZoneOutput();
        output.setPlayerId(inEvent.getPlayerId());
        output.setNetPrize(sum);
        outEvents.add(output);
    }

    @Override
    public void close() throws Exception {
        valueState.clear();
    }
}

While doing a load test, I get a NullPointerException in valueState.value(). Which seems strange as we would have updated the value state above. Another strange thing is that this is observed only in load conditions and works fine otherwise.

We also see some serialization exceptions:

Suppressed: java.lang.IllegalArgumentException: Position out of bounds.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
    at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:352)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:185)
    at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamespace(RocksDBSerializedCompositeKeyBuilder.java:114)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespace(AbstractRocksDBState.java:163)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.clear(AbstractRocksDBState.java:113)

Any leads would be appreciated.

Thanks
Chirag
|
Hi Chirag, If you are able to reproduce the exception, could you first add some logs to print the values of valueState, valueState.value(), inEvent and inEvent.getPrizeDelta()? I think any of these being null would cause a NullPointerException here. For the second exception, I found a similar issue [1], caused by concurrent access to the value state. Do we have a similar situation here? Best, Yun
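A minimal sketch of the logging suggested above, reading the state once into a local variable so that the null check and the addition see the same value. `SimpleState` is a hypothetical in-memory stand-in for Flink's ValueState so that the snippet runs on its own, and the helper name `addDelta` is also made up for illustration; neither is part of Flink or of the original job.

```java
import java.util.logging.Logger;

public class NullCheckSketch {
    private static final Logger LOG = Logger.getLogger("NullCheckSketch");

    /** Hypothetical stand-in for Flink's ValueState; not the real interface. */
    public static class SimpleState<T> {
        private T current;

        public T value() {
            return current;
        }

        public void update(T v) {
            current = v;
        }
    }

    /** Reads the state once, logs what was seen, and writes the new sum once. */
    public static int addDelta(SimpleState<Integer> state, Integer delta) {
        Integer stored = state.value();            // single read of the state
        LOG.info("stored=" + stored + ", delta=" + delta);
        int base = (stored == null) ? 0 : stored;  // treat missing state as 0
        int add = (delta == null) ? 0 : delta;     // guard against a null delta as well
        int sum = base + add;
        state.update(sum);                         // single update of the state
        return sum;
    }

    public static void main(String[] args) {
        SimpleState<Integer> state = new SimpleState<>();
        System.out.println(addDelta(state, 5));
        System.out.println(addDelta(state, 7));
    }
}
```

In the real job the same pattern would apply to the ValueState obtained in open(); the log line then shows directly which of the operands is null under load.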
|
Thanks for the reply Yun. Strangely, I don't see any nulls. In fact, this exception occurs on the first few records and then the job starts processing normally. Also, I don't see any reason for concurrent access to the state in my code. Could having more CPU cores than task slots on the Task Manager be the reason for it?
|
Hi, I think I found my issue. It would help if someone could confirm it :) I am using an NFS filesystem for storing my checkpoints, and my Flink cluster is running on K8s with 2 TMs and 2 JMs. All my pods share the NFS PVC configured as state.checkpoints.dir, and we also missed setting the RocksDB local dir. Does this lead to state corruption? Thanks, Chirag
|
Hi, Although this looks like a problem to me, I still can't confirm it. I tried reducing my TM replicas from 2 to 1, with 4 slots and 4 cores. I was hoping that with a single TM there would be no file write conflicts. But that doesn't seem to be the case, as I still get: Caused by: org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 2 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=95}. I have checked that there's no concurrent access on the ValueState. Any more leads? Thanks, Chirag
|
Hi Chirag, As far as I know, if you are running a single job, having all the pods share the same state.checkpoints.dir configuration is as expected, and it is not necessary to configure the RocksDB local dir since Flink will choose a default dir. Regarding the latest exception, I think you might first check the key type used: the key type should have a stable hashCode method. Best, Yun
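For context on the "Key group 2 is not in KeyGroupRange{startKeyGroup=64, endKeyGroup=95}" message, here is a rough sketch of how key groups are split across parallel subtasks. The formulas reflect my understanding of the arithmetic in Flink's KeyGroupRangeAssignment and should be checked against the Flink version in use; the max parallelism of 128 is an assumption (it is consistent with a range of 64..95 at parallelism 4), and the step that hashes a key into a key group is left out.

```java
public class KeyGroupSketch {

    /** First key group owned by the subtask with the given index (assumed formula). */
    public static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the subtask with the given index (assumed formula). */
    public static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Index of the subtask that is expected to own the given key group (assumed formula). */
    public static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumption, not stated in the thread
        int parallelism = 4;      // parallelism of the KeyedProcessFunction in the setup above

        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " owns key groups "
                    + rangeStart(maxParallelism, parallelism, i) + ".."
                    + rangeEnd(maxParallelism, parallelism, i));
        }
        System.out.println("key group 2 belongs to subtask "
                + ownerOf(maxParallelism, parallelism, 2));
    }
}
```

Under these assumptions, the subtask that owns key groups 64..95 was handed a key whose key group belongs to a different subtask. That can happen when the key's hash is not the same at partitioning time and at state-access time, which is why the stability of hashCode matters.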
|
Thanks for the reply Yun. The key is an Integer type. Do you think there can be hash collisions for Integers? It somehow works on a single TM now: no errors for 1m records. But as soon as we move to 2 TMs, we get all sorts of errors: 'Position out of bounds', key group not in KeyGroupRange, etc. This also causes an NPE in the user-defined code: with if (valueState != null) valueState.value(), the if check passes, but reading the value still results in an NPE. Thanks, Chirag
|
Hi Chirag, Logically, the Integer type should not have this issue. Sorry, from the current description I have not found any other issues. Could you also share the code in the main method that adds the KeyedProcessFunction to the job? Many thanks! Best, Yun
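To illustrate what "stable hashCode" means here, a small plain-Java sketch (no Flink involved; the class and method names are made up for illustration). Integer.hashCode() is defined as the int value itself, so it is the same in every JVM, whereas a Java array inherits the identity-based Object.hashCode() and is therefore not a safe key.

```java
import java.util.Arrays;

public class HashStabilitySketch {

    /** Integer.hashCode() returns the wrapped int value, so it is stable across JVMs. */
    public static int integerHash(int v) {
        return Integer.valueOf(v).hashCode();
    }

    /** Content-based hash for an int array; equal contents give equal hashes. */
    public static int contentHash(int[] a) {
        return Arrays.hashCode(a);
    }

    public static void main(String[] args) {
        System.out.println("Integer hash of 42: " + integerHash(42));

        int[] a = {1, 2};
        int[] b = {1, 2};
        // Arrays use the identity-based Object.hashCode(), so two arrays with the
        // same contents usually report different hashes, and the value can differ
        // from one JVM to another.
        System.out.println("identity hashes equal: " + (a.hashCode() == b.hashCode()));
        System.out.println("content hashes equal:  " + (contentHash(a) == contentHash(b)));
    }
}
```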
|
Hi Chirag, Which Flink version are you using? As far as I understand, the issue is appearing just by writing the initial data - no recovery happened right? Could you try to change the code such that you only have a single read/update on the state? It should work as you have done it but I'd like to pinpoint the issue further. On Thu, Jun 10, 2021 at 8:25 AM Yun Gao <[hidden email]> wrote:
|