Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId


Hi Marke,

Shouldn't you use code like this:

delivery.getProperties().getUserId();

to get the userId from the Delivery object?

As for the second code example: since the source emits objects of type TimeSeriesType, shouldn't you declare DataStream<TimeSeriesType> instead of DataStream<String>?

Regarding the userId: what I described is only a way of extracting it. If the message does not carry a user ID in the first place, there is nothing to extract. Can you confirm that the messages in RabbitMQ actually have this property set?
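
In RabbitMQ the user-id message property is optional: it is only present when the publisher sets it, so getUserId() returns null for messages published without it. A minimal sketch of a null-safe lookup, assuming a hypothetical MessageProps class as a stand-in for the object returned by delivery.getProperties() (it is not the real RabbitMQ class), and an arbitrary fallback value chosen by the caller:

```java
import java.util.Optional;

// Hypothetical stand-in for the properties object returned by delivery.getProperties().
final class MessageProps {
    private final String userId; // null when the publisher did not set user-id

    MessageProps(String userId) {
        this.userId = userId;
    }

    String getUserId() {
        return userId;
    }
}

// Hypothetical helper, not part of Flink or the RabbitMQ client.
final class UserIds {
    private UserIds() {}

    // Returns the user ID if the message carries a non-empty one, otherwise the fallback.
    static String userIdOrDefault(MessageProps props, String fallback) {
        return Optional.ofNullable(props)
                .map(MessageProps::getUserId)
                .filter(id -> !id.isEmpty())
                .orElse(fallback);
    }
}
```

If the fallback shows up for every message, the publisher is most likely not setting the property, and no change on the Flink side will recover it.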

Thanks, vino.

On Tue, Sep 11, 2018 at 4:34 PM, Marke Builder <marke.builder@xxxxxxxxx> wrote:
Hi Vino,

This is what I did, but no user ID is available. Also, my first question was about the running field in RMQSource (boolean running).

Code example:
@Override
public void run(SourceContext<TimeSeriesType> ctx) throws Exception {
....
TimeSeriesType result = (TimeSeriesType) schema.deserialize(delivery.getBody()); 
.....
final String userId = delivery.getProperties().
result.setDeviceId(userId);
......
ctx.collect(result);


And the DataStream looks like this:
final DataStream<String> stream = env
                .addSource(new RabbitmqStreamProcessorV2(
                        connectionConfig,
                        fastDataQueue,
                        new AbstractDeserializationSchema<TimeSeriesType>() {
                            @Override
                            public TimeSeriesType deserialize(byte[] bytes) throws IOException {
                                TimeSeriesType message = null;
                                try {
                                     message = xmlParser.parse(new String(bytes, "UTF8"));
                                     logger.info("Data/Message size: " +String.valueOf(message.getData().size()));
                                } catch (JAXBException e) {
                                    e.printStackTrace();
                                    logger.log(Level.INFO, e.toString());
                                }
                                return message;
                            }
                        }))
                .flatMap(.....






On Mon, Sep 10, 2018 at 03:52, vino yang <yanghua1127@xxxxxxxxx> wrote:
Hi Marke,

I haven't actually implemented this myself, but I think you can replace this line of code:

OUT result = schema.deserialize(delivery.getBody());        //RMQSource#run

with a call to an abstract method that you define in RMQSource (named, for example, normalize or deserialize). Its input parameter is the Delivery
and its return type is the generic type OUT; you then implement your custom logic in that method.
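
A minimal sketch of that idea, using stand-in types rather than the real Flink and RabbitMQ classes. FakeDelivery, DeliverySource, convert, drain and TaggingSource are all hypothetical names chosen for illustration; in a real source the loop would live in run() and emit through ctx.collect(...).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a RabbitMQ delivery: a payload plus the publisher's user ID.
final class FakeDelivery {
    final byte[] body;
    final String userId;

    FakeDelivery(byte[] body, String userId) {
        this.body = body;
        this.userId = userId;
    }
}

// Stand-in for the customized source: the per-message conversion is an abstract hook.
abstract class DeliverySource<OUT> {
    // Subclasses receive the whole delivery, not just the body.
    protected abstract OUT convert(FakeDelivery delivery);

    // Plays the role of the loop in run(): convert each delivery and emit the result.
    final List<OUT> drain(List<FakeDelivery> deliveries) {
        List<OUT> emitted = new ArrayList<>();
        for (FakeDelivery delivery : deliveries) {
            // Replaces the call that only looked at the message body.
            emitted.add(convert(delivery));
        }
        return emitted;
    }
}

// Example subclass: prefixes the decoded payload with the user ID.
final class TaggingSource extends DeliverySource<String> {
    @Override
    protected String convert(FakeDelivery delivery) {
        String payload = new String(delivery.body, StandardCharsets.UTF_8);
        return delivery.userId + ":" + payload;
    }
}
```

The hook keeps the consume-and-emit loop in one place while letting each subclass decide which parts of the delivery end up in the output element.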

Thanks, vino.

On Mon, Sep 10, 2018 at 12:32 AM, Marke Builder <marke.builder@xxxxxxxxx> wrote:
Hi Vino,

Many thanks for your response, the solution works! But I have one additional question:
what is the best way to override RMQSource#run without access to the RMQSource variable "running"?

Thanks, Martin.

On Sat, Sep 8, 2018 at 10:15, vino yang <yanghua1127@xxxxxxxxx> wrote:
Hi Marke,

As you said, you need to extend RMQSource, because Flink's RabbitMQ connector only extracts the body of the Delivery.
To achieve what you want, add a property representing the userId to the element type of your DataStream,
then override the RMQSource#run method and extract the userId from the properties of the Delivery.
In addition, you may need to pay attention to the implementation of your DeserializationSchema.
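
The per-message step could look roughly like the sketch below. It uses plain Java only: SensorEvent and EventEnricher are hypothetical names, the String payload stands in for whatever your DeserializationSchema produces, and the userId argument stands in for the value read from the Delivery properties.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical element type of the DataStream, with an extra field for the user ID.
final class SensorEvent {
    private final String payload;
    private String userId;

    SensorEvent(String payload) {
        this.payload = payload;
    }

    String getPayload() {
        return payload;
    }

    String getUserId() {
        return userId;
    }

    void setUserId(String userId) {
        this.userId = userId;
    }
}

// Hypothetical helper mirroring what an overridden run() would do for each message.
final class EventEnricher {
    private EventEnricher() {}

    // Deserializes the body, then copies the user ID taken from the message properties.
    static SensorEvent toEvent(byte[] body, String userIdFromProperties) {
        SensorEvent event = new SensorEvent(new String(body, StandardCharsets.UTF_8));
        event.setUserId(userIdFromProperties); // may be null if the publisher did not set it
        return event;
    }
}
```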

Thanks, vino.

On Sat, Sep 8, 2018 at 3:44 PM, Marke Builder <marke.builder@xxxxxxxxx> wrote:
Hi,

How can I get the userId from the message properties into my DataStream?

I can read the userId if I extend the RMQSource class:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String userId = delivery.getProperties().getUserId();

But how can I provide this to my DataStream ?

Best regards,
Martin