Unable to query MapState

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Unable to query MapState

Velumani Duraisamy
Hi,
I am trying to query Flink's MapState from a Flink client (1.3.2). I was able to query ValueState, but when I try to query MapState I get the following exception:

java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
        at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
        at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
        at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

Flink Job's Logic

    FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

    DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

    DataStream<Tuple3<String, String, Long>> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);

    DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());

    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Filter Transformation Example");

  }


  public static class CreateTuple
      implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
    @Override
    public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
      return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
    }

  }

  public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {

    private transient MapState<String, Long> mapState;

    @Override
    public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {

      mapState.put(input.f1, input.f2);

    }

    @Override
    public void open(Configuration config) {

      MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
          "mapQuery", TypeInformation.of(new TypeHint<String>() {
          }), TypeInformation.of(new TypeHint<Long>() {
          }));
      mapStateDesc.setQueryable("mapQuery");

      mapState = getRuntimeContext().getMapState(mapStateDesc);

    }
  }


Flink Query Client's Logic

    final JobID jobId = JobID.fromHexString(jobIdParam);

    String key = queryStateRequestDto.getKey();

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);

    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config, Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

      final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
      }).createSerializer(new ExecutionConfig());
      final TypeSerializer<Map<String, Long>> valueSerializer =
          TypeInformation.of(new TypeHint<Map<String, Long>>() {
          }).createSerializer(new ExecutionConfig());

      final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
          keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

      scala.concurrent.Future<byte[]> serializedResult =
          client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);

      // now wait for the result and return it
      final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
      byte[] serializedValue = Await.result(serializedResult, duration);
      Map<String, Long> value =
          KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
      System.out.println(value);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
Reply | Threaded
Open this post in threaded view
|

Re: Unable to query MapState

Kostas Kloudas
Hi Velu,

I would recommend switching to Flink 1.4, as queryable state has been refactored there to be compatible with all types of state.
In addition, a lot of things have been simplified.

For an example, you can use this link (the URL was not preserved in this archive),
which is taken directly from the Queryable State IT cases.

Thanks,
Kostas

On Jan 22, 2018, at 2:38 PM, Velu Mitwa <[hidden email]> wrote:

Hi,
I am trying to query Flink's MapState from a Flink client (1.3.2). I was able to query ValueState, but when I try to query MapState I get the following exception:

java.io.IOException: Unconsumed bytes in the deserialized value. This indicates a mismatch in the value serializers used by the KvState instance and this access.
        at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeValue(KvStateRequestSerializer.java:438)
        at com.paysafe.ss.flink.client.service.impl.QueryStateServiceImpl.getKeyValue(QueryStateServiceImpl.java:81)
        at com.paysafe.ss.flink.client.web.rest.controller.QueryStateController.getStateValue(QueryStateController.java:49)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)

Flink Job's Logic

    FlinkKafkaConsumer09<MerchantApiEvent> consumer = new FlinkKafkaConsumer09<>(
        "/apps/application-stream:flink-demo", new MerchantApiSchema(), properties);

    DataStream<MerchantApiEvent> inputEventStream = env.addSource(consumer);

    DataStream<Tuple3<String, String, Long>> outputStream =
        inputEventStream.map(new CreateTuple()).keyBy(0).keyBy(1)
            .window(SlidingProcessingTimeWindows.of(Time.seconds(120), Time.milliseconds(1000)))
            .sum(2);

    DataStream<Long> output = outputStream.keyBy(0).flatMap(new CountEvent());

    output.writeAsText("/tmp/flink_out", FileSystem.WriteMode.OVERWRITE);

    // execute program
    env.execute("Filter Transformation Example");

  }


  public static class CreateTuple
      implements MapFunction<MerchantApiEvent, Tuple3<String, String, Long>> {
    @Override
    public Tuple3<String, String, Long> map(MerchantApiEvent input) throws Exception {
      return new Tuple3<String, String, Long>(input.getMerchantId(), input.getApiName(), 1L);
    }

  }

  public static class CountEvent extends RichFlatMapFunction<Tuple3<String, String, Long>, Long> {

    private transient MapState<String, Long> mapState;

    @Override
    public void flatMap(Tuple3<String, String, Long> input, Collector<Long> out) throws Exception {

      mapState.put(input.f1, input.f2);

    }

    @Override
    public void open(Configuration config) {

      MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<String, Long>(
          "mapQuery", TypeInformation.of(new TypeHint<String>() {
          }), TypeInformation.of(new TypeHint<Long>() {
          }));
      mapStateDesc.setQueryable("mapQuery");

      mapState = getRuntimeContext().getMapState(mapStateDesc);

    }
  }


Flink Query Client's Logic

    final JobID jobId = JobID.fromHexString(jobIdParam);

    String key = queryStateRequestDto.getKey();

    final Configuration config = new Configuration();
    config.setString(JobManagerOptions.ADDRESS, jobManagerHost);
    config.setInteger(JobManagerOptions.PORT, jobManagerPort);

    HighAvailabilityServices highAvailabilityServices = null;
    try {
      highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
          config, Executors.newSingleThreadScheduledExecutor(),
          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }

    try {
      QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);

      final TypeSerializer<String> keySerializer = TypeInformation.of(new TypeHint<String>() {
      }).createSerializer(new ExecutionConfig());
      final TypeSerializer<Map<String, Long>> valueSerializer =
          TypeInformation.of(new TypeHint<Map<String, Long>>() {
          }).createSerializer(new ExecutionConfig());

      final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(key,
          keySerializer, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);

      scala.concurrent.Future<byte[]> serializedResult =
          client.getKvState(jobId, "mapQuery", key.hashCode(), serializedKey);

      // now wait for the result and return it
      final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
      byte[] serializedValue = Await.result(serializedResult, duration);
      Map<String, Long> value =
          KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
      System.out.println(value);
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }