Flink RMQSource Consumer: How I get the RabbitMQ UserId

Marke Builder
Hi,

How can I get the userId from the message properties into my DataStream?

I can read the userId if I extend the RMQSource class:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String userId = delivery.getProperties().getUserId();

But how can I pass it on to my DataStream?

Best regards,
Martin
Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

vino yang
Hi Marke,

As you said, you need to extend RMQSource, because Flink's RabbitMQ connector only extracts the body of the Delivery.
To achieve this, add a field for the userId to the element type of your DataStream,
then override the RMQSource#run method and extract the userId from the properties of the Delivery.
You may also need to pay attention to the implementation of your DeserializationSchema.
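
A minimal sketch of the first step, carrying the userId in the element type of the stream. FakeDelivery and Event are hypothetical stand-ins, not RabbitMQ or Flink classes, so the snippet compiles without either library. In a real job the two values would come from delivery.getBody() and delivery.getProperties().getUserId().

```java
import java.nio.charset.StandardCharsets;

public class UserIdRecordSketch {

    /** Hypothetical stand-in for a RabbitMQ delivery: a body plus the userId property. */
    static final class FakeDelivery {
        final byte[] body;
        final String userId; // null when the publisher did not set the property

        FakeDelivery(byte[] body, String userId) {
            this.body = body;
            this.userId = userId;
        }
    }

    /** Hypothetical element type of the DataStream: the payload plus the userId. */
    static final class Event {
        final String payload;
        final String userId;

        Event(String payload, String userId) {
            this.payload = payload;
            this.userId = userId;
        }
    }

    /** Builds one stream element from both the message body and the message properties. */
    static Event toEvent(FakeDelivery delivery) {
        String payload = new String(delivery.body, StandardCharsets.UTF_8);
        return new Event(payload, delivery.userId);
    }

    public static void main(String[] args) {
        FakeDelivery delivery =
                new FakeDelivery("42.0".getBytes(StandardCharsets.UTF_8), "device-7");
        Event event = toEvent(delivery);
        System.out.println(event.userId + " -> " + event.payload);
    }
}
```

With an element type like this, the stream would be declared with that type (for example DataStream<Event>) rather than with the type of the raw payload.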

Thanks, vino.

(In reply to Marke Builder <[hidden email]>, Sat, Sep 8, 2018, 3:44 PM.)
Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

vino yang
Hi Marke,

I haven't actually implemented this myself, but I think you can replace this line of code:

OUT result = schema.deserialize(delivery.getBody());        //RMQSource#run

with a call to an abstract method that you define in RMQSource, for example normalize/deserialize. Its input parameter is the Delivery
and its return type is the generic type OUT; implement your custom logic in this method.
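
A rough sketch of that idea, under stated assumptions: HookedSource stands in for a modified copy of the source class and FakeDelivery for the RabbitMQ delivery. Both are hypothetical and only model the pattern; they are not the Flink or RabbitMQ API. The point is that the loop and the running flag stay in the base class, so the subclass only implements the hook and never needs access to the flag.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class DeliveryHookSketch {

    /** Hypothetical stand-in for a RabbitMQ delivery: a body plus the userId property. */
    static final class FakeDelivery {
        final byte[] body;
        final String userId;

        FakeDelivery(byte[] body, String userId) {
            this.body = body;
            this.userId = userId;
        }
    }

    /** Stand-in for a modified copy of the source: run() owns the loop and the flag. */
    abstract static class HookedSource<OUT> {
        private volatile boolean running = true;

        /** The hook: receives the whole delivery instead of only its body. */
        protected abstract OUT deserialize(FakeDelivery delivery);

        public void run(Iterator<FakeDelivery> deliveries, Consumer<OUT> collector) {
            while (running && deliveries.hasNext()) {
                OUT result = deserialize(deliveries.next());
                if (result != null) {
                    collector.accept(result);
                }
            }
        }

        public void cancel() {
            running = false;
        }
    }

    /** Subclass with the custom logic; it never touches the running flag. */
    static final class TaggingSource extends HookedSource<String> {
        @Override
        protected String deserialize(FakeDelivery delivery) {
            String payload = new String(delivery.body, StandardCharsets.UTF_8);
            return delivery.userId + ":" + payload;
        }
    }

    /** Feeds two fake deliveries through the source and returns what was collected. */
    static List<String> demo() {
        List<FakeDelivery> input = new ArrayList<>();
        input.add(new FakeDelivery("a".getBytes(StandardCharsets.UTF_8), "alice"));
        input.add(new FakeDelivery("b".getBytes(StandardCharsets.UTF_8), "bob"));
        List<String> out = new ArrayList<>();
        new TaggingSource().run(input.iterator(), out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```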

Thanks, vino.

On Mon, Sep 10, 2018 at 12:32 AM, Marke Builder <[hidden email]> wrote:
Hi Vino,

Many thanks for your response, the solution works! But I have one additional question:
what is the best way to override RMQSource#run without access to the RMQSource variable "running"?

Thanks, Martin.

(In reply to vino yang <[hidden email]>, Sat, Sep 8, 2018, 10:15 AM.)
Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

vino yang
Hi Marke,

Shouldn't you use code like this:

delivery.getProperties().getUserId();

to get the userId from the Delivery object?

And for the second code example: since you get an object of type TimeSeriesType, shouldn't you define DataStream<TimeSeriesType> instead of DataStream<String>?

Regarding the userId, I only described a way of extracting it. If the message does not carry a value for it, there is no way to get it. Can you confirm that the messages in RabbitMQ actually have the userId set?

Thanks, vino.

On Tue, Sep 11, 2018 at 4:34 PM, Marke Builder <[hidden email]> wrote:
Hi Vino,

This is what I did, but no userId is available. And my first question was about the running field in RMQSource (boolean running).

Code example:
@Override 
run(SourceContext ctx) {
....
TimeSeriesType result = (TimeSeriesType) schema.deserialize(delivery.getBody()); 
.....
final String userId = delivery.getProperties().
result.setDeviceId(userId);
......
ctx.collect(result);


And the DataStream looks like this:
final DataStream<String> stream = env
                .addSource(new RabbitmqStreamProcessorV2(
                        connectionConfig,
                        fastDataQueue,
                        new AbstractDeserializationSchema<TimeSeriesType>() {
                            @Override
                            public TimeSeriesType deserialize(byte[] bytes) throws IOException {
                                TimeSeriesType message = null;
                                try {
                                     message = xmlParser.parse(new String(bytes, "UTF8"));
                                     logger.info("Data/Message size: " +String.valueOf(message.getData().size()));
                                } catch (JAXBException e) {
                                    e.printStackTrace();
                                    logger.log(Level.INFO, e.toString());
                                }
                                return message;
                            }
                        }))
                .flatMap(.....


(In reply to vino yang <[hidden email]>, Mon, Sep 10, 2018, 3:52 AM.)