Hi,
I am trying to query Flink's MapState from Flink client (1.3.2). I was able to query ValueState but when I tried to query MapState I am getting an exception.

java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
    at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
    at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
    at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

Flink Job's Logic

    FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

    DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

    DataStream<Tuple3<String, String, Long>> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);

    DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());
    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Filter Transformation Example");
    }

    public static class CreateTuple
        implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {

      @Override
      public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
        return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
      }
    }

    public static class CountEvent
        extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {

      private transient MapState<String, Long> mapState;

      @Override
      public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out)
          throws Exception {
        mapState.put(input.f1, input.f2);
      }

      @Override
      public void open(Configuration config) {
        MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
            "mapQuery",
            TypeInformation.of(new TypeHint<String>() { }),
            TypeInformation.of(new TypeHint<Long>() { }));
        mapStateDesc.setQueryable("mapQuery");
        mapState = getRuntimeContext().getMapState(mapStateDesc);
      }
    }

Flink Query Client's Logic

    final JobID jobId = JobID.fromHexString(jobIdParam);
    String key = queryStateRequestDto.getKey();

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);

    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config,
          Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

      final TypeSerializer<String> keySerializer =
          TypeInformation.of(new TypeHint<String>() { }).createSerializer(new ExecutionConfig());
      final TypeSerializer<Map<String, Long>> valueSerializer =
          TypeInformation.of(new TypeHint<Map<String, Long>>() { })
              .createSerializer(new ExecutionConfig());

      final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
          key, keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

      scala.concurrent.Future<byte[]> serializedResult =
          client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);

      // now wait for the result and return it
      final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
      byte[] serializedValue = Await.result(serializedResult, duration);
      Map<String, Long> value =
          KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
      System.out.println(value);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

Hi Velu,
I would recommend switching to Flink 1.4, as the queryable state has been refactored to be compatible with all types of state. You can read more here:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/queryable_state.html

In addition, a lot of things have been simplified. And for an example you can use this link: which is directly from the Queryable State IT cases.

Thanks,
Kostas
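
A note on the exception message itself: "Unconsumed bytes in the deserialized value" says that the reader stopped before the end of the byte array, which is what happens when the writer and the reader disagree about the layout. The sketch below is not Flink code. It is a plain-JDK illustration under an assumed layout (map entries written back to back as key, null flag, value, with no count prefix), and the class and method names (UnconsumedBytesDemo, writeEntries, readEntries, readSingleString) are hypothetical. It shows that a reader matching the writer consumes every byte, while a reader expecting a single value leaves bytes behind and fails in the same way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class UnconsumedBytesDemo {

    // Writer side (assumed layout): for each entry write key, null flag,
    // then the value if present. No entry count is written up front.
    public static byte[] writeEntries(Map<String, Long> map) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            out.writeUTF(entry.getKey());
            out.writeBoolean(entry.getValue() == null);
            if (entry.getValue() != null) {
                out.writeLong(entry.getValue());
            }
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Matching reader: keeps reading entries until the buffer is exhausted.
    public static Map<String, Long> readEntries(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        Map<String, Long> result = new LinkedHashMap<>();
        while (in.available() > 0) {
            String key = in.readUTF();
            boolean isNull = in.readBoolean();
            Long value = isNull ? null : Long.valueOf(in.readLong());
            result.put(key, value);
        }
        return result;
    }

    // Mismatched reader: expects exactly one value and rejects leftovers.
    public static String readSingleString(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        String value = in.readUTF();
        if (in.available() > 0) {
            throw new IOException("Unconsumed bytes in the deserialized value.");
        }
        return value;
    }

    public static void main(String[] args) throws IOException {
        Map<String, Long> state = new LinkedHashMap<>();
        state.put("apiA", 3L);
        state.put("apiB", 5L);
        byte[] data = writeEntries(state);

        // The matching reader recovers the whole map.
        System.out.println(readEntries(data));

        // The single-value reader stops after the first key and fails.
        try {
            readSingleString(data);
        } catch (IOException e) {
            System.out.println("Mismatch: " + e.getMessage());
        }
    }
}
```

In the client code from the question, the bytes returned for the MapState are read with deserializeValue and a serializer for Map<String, Long>, so a mismatch of this kind is a plausible cause. On 1.3.x it may be worth checking whether KvStateRequestSerializer offers a map-specific method (deserializeMap, taking separate key and value serializers) before upgrading; treat that as something to verify against the 1.3.2 Javadoc rather than a confirmed fix.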