
Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

Hi Marke,

As you said, you need to extend RMQSource, because Flink's RabbitMQ connector only extracts the body of the Delivery.
To get what you want, add a field for the userId to the element type of your DataStream,
then override the RMQSource#run method and extract the userId from the Delivery's properties there.
You may also need to pay attention to the implementation of your DeserializationSchema, since it only receives the message body.
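
To illustrate the first step, here is a minimal sketch of an element type that carries the userId next to the payload. The names UserMessageSketch, UserMessage and fromDelivery are hypothetical and not part of Flink or the RabbitMQ client. The sketch also assumes the body is UTF-8 text. It avoids the Flink and RabbitMQ dependencies so it can run on its own; the comment shows roughly where it would plug into an overridden run loop.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class UserMessageSketch {

    /**
     * Hypothetical element type for the DataStream: payload plus userId.
     * Public fields and a public no-arg constructor keep it usable as a Flink POJO.
     */
    public static class UserMessage implements Serializable {
        public String userId;   // null when the publisher did not set the user-id property
        public String payload;

        public UserMessage() { }

        public UserMessage(String userId, String payload) {
            this.userId = userId;
            this.payload = payload;
        }
    }

    /**
     * Hypothetical helper that combines the two parts of a delivery.
     * Inside an overridden RMQSource#run loop it would be called roughly like:
     *   QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     *   ctx.collect(fromDelivery(delivery.getBody(),
     *                            delivery.getProperties().getUserId()));
     * Assumption: the message body is UTF-8 encoded text.
     */
    public static UserMessage fromDelivery(byte[] body, String userId) {
        return new UserMessage(userId, new String(body, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Stand-in values instead of a real RabbitMQ delivery.
        UserMessage m = fromDelivery("hello".getBytes(StandardCharsets.UTF_8), "guest");
        System.out.println(m.userId + " -> " + m.payload);
    }
}
```

With a type like this, the source would be declared as a source of UserMessage rather than String, and downstream operators can read the userId field directly.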

Thanks, vino.

Marke Builder <marke.builder@xxxxxxxxx> wrote on Sat, Sep 8, 2018 at 3:44 PM:

How can I get the userId from the properties in my DataStream?

I can read the userId if I extend the RMQSource Class:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String userId = delivery.getProperties().getUserId();

But how can I provide this to my DataStream ?

Best regards,