Hi, how I can get the UserId from the Properties in my DataStream? I can read the userId if I extend the RMQSource Class: QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String userId = delivery.getProperties().getUserId(); But how can I provide this to my DataStream ? Best regards, Martin |
Hi Marke, As you said, you need to extend RMQSource because Flink's rabbitmq connector only extracts the body of Delivery. Therefore, in order to achieve your purpose, you need to add a property to the specific data type of your DataStream to represent the userId, then override the RMQSource#run method and extract the userId from the properties of Delivery. Of course, in addition, maybe you Need to pay attention to the implementation of DeserializationSchema. Thanks, vino. Marke Builder <[hidden email]> 于2018年9月8日周六 下午3:44写道:
|
Hi Marke, As soon as I didn't really implement this code, but I think you can replace this line of code: OUT result = schema.deserialize(delivery.getBody()); //RMQSource#run instead of defining an abstract method in RMQSource, such as: normalize/deserialize, the input parameter is Delivery, and the output parameter is generic type <Out> and implement your custom logic in this method. Thanks, vino. Marke Builder <[hidden email]> 于2018年9月10日周一 上午12:32写道:
|
Hi Marke, Should not use the code like this : delivery.getProperties().getUserId(); to get the userId from Delivery object? And for second code example, Since you got the object of TimeSeriesType type, should not define DataStream<TimeSeriesType> instead of DataStream<String>. Regarding userId, I just said that this is a way of extracting. But if it doesn't have a value in itself, then there is no way to get it. Can you confirm that the message itself has value in RabbitMQ? Thanks, vino. Marke Builder <[hidden email]> 于2018年9月11日周二 下午4:34写道:
|
Free forum by Nabble | Edit this page |