
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...


GitHub user onlyMIT reopened a pull request:

    https://github.com/apache/activemq-artemis/pull/2466

    ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cle…

    ### Test environment
    
    1. Use 10,000 MQTT connections (9,000 senders, 1,000 consumers) on one server to test Artemis, with the 'cleanSession' property set to true;
    2. MQTT client: paho 1.2.0;
    3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T
    
    ### Issue
    
    **Issue 1**
    Artemis broker has the following exception log:
    `[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN  [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException
    	at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:]
    	at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:]
    	at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:]
    	at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:]
    	at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:]
    	at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:]
    	at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:]
    	at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:]
    	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:]
    	at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:]
    	at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101]
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]`
    
    **Issue 2**
    After closing all client connections, 64 queues were not cleaned up.
    
    ### Analysis and simulation reproduction
    
      When an MQTT consumer client (with the cleanSession property set to true) reconnects, there is some probability that its queue will not be automatically cleared and that a NullPointerException will be thrown.
      This happens because the MQTT consumer client believes its connection has been lost and triggers a reconnect, while the old MQTT connection is still alive on the Artemis broker. The bug occurs when the Artemis broker starts processing the "new MQTT connection" while it is still closing the "old MQTT connection".
      To reproduce: create an MQTT consumer (cleanSession: true, clientID: superConsumer, topic: mit.test) and connect it to the Artemis broker. Create a second MQTT consumer with the same cleanSession, clientID, and topic, and connect it to the Artemis broker as well. Then close both MQTT connections. Repeating this sequence enough times will, with some probability, reproduce the two problems described above.
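
    A minimal Paho-based reproduction of these steps might look like the sketch below. The broker URL and the guard around the first disconnect are illustrative assumptions; the clientID, topic, and cleanSession values are the ones listed above.

    ```java
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    public class ReconnectRepro {

        // Broker URL is an assumption for illustration only.
        private static final String BROKER = "tcp://localhost:1883";

        public static void main(String[] args) throws MqttException {
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);

            // First consumer: clientID "superConsumer", subscribed to "mit.test".
            MqttClient first = new MqttClient(BROKER, "superConsumer", new MemoryPersistence());
            first.connect(options);
            first.subscribe("mit.test");

            // Second consumer reuses the same clientID and topic, so the broker
            // has to close the old connection while accepting the new one.
            MqttClient second = new MqttClient(BROKER, "superConsumer", new MemoryPersistence());
            second.connect(options);
            second.subscribe("mit.test");

            // Close both connections; repeating this sequence can leave the
            // subscription queue behind and trigger the NullPointerException above.
            if (first.isConnected()) {
                first.disconnect();
            }
            second.disconnect();
        }
    }
    ```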
    
    ### Solution
    
    **Issue 1**
    
      When 'session.getProtocolManager().isClientConnected(clientId, session.getConnection())' is called and the 'MQTTConnection' instance retrieved from 'connectedClients' is 'null', a NullPointerException is thrown. Add a null check to the 'MQTTProtocolManager.isClientConnected' method (a sketch of the guard follows).
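
    A standalone sketch of the kind of guard this describes is shown below; the 'Connection' stand-in type and the shape of the 'connectedClients' map are assumptions based on the description above, not the exact code in the pull request.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Standalone sketch of the null guard; the real change is in
    // MQTTProtocolManager.isClientConnected.
    public class IsClientConnectedSketch {

        // Stand-in for the broker's MQTTConnection type.
        static class Connection { }

        private final Map<String, Connection> connectedClients = new ConcurrentHashMap<>();

        public boolean isClientConnected(String clientId, Connection connection) {
            Connection connected = connectedClients.get(clientId);
            // The entry may already have been removed by a concurrent disconnect,
            // so return false instead of dereferencing a null reference.
            if (connected == null) {
                return false;
            }
            return connected.equals(connection);
        }
    }
    ```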
    
    **Issue 2** 
    
    1. Remove 'InterruptedException' from the 'MQTTConnectionManager.getSessionState' method, because that exception can never be thrown there;
    2. Make the 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect' methods synchronize on the MQTTSessionState instance. In the Artemis broker, all MQTT connections that use the same clientId share the same MQTTSessionState instance, so this lock prevents 'connect' and 'disconnect' from running concurrently for connections with the same clientId;
    3. For the MQTT protocol there is exactly one consumer connection per queue, so the old MQTT consumer should be closed before the new MQTT consumer connects.
        The original code could not reliably clean up the 'old consumer' on the queue when the 'new MQTT connection' reached the Artemis broker. Modify 'MQTTSubscriptionManager.removeSubscription' to get the consumer collection of the queue from the 'QueueImpl' instance and close those consumers. (A standalone sketch of the locking and cleanup pattern in points 2 and 3 follows this list.)
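
    The following standalone sketch illustrates the locking and cleanup pattern from points 2 and 3. All class and method names are simplified stand-ins for the broker code (MQTTConnectionManager, MQTTSessionState, MQTTSubscriptionManager), not the actual patch.

    ```java
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class SessionStateLockSketch {

        // Stand-in for MQTTSessionState: one instance per clientId, shared by
        // every connection that uses that clientId.
        static class SessionState {
            final String clientId;
            SessionState(String clientId) { this.clientId = clientId; }
        }

        private final Map<String, SessionState> sessionStates = new ConcurrentHashMap<>();

        private SessionState getSessionState(String clientId) {
            // Nothing here can throw InterruptedException, matching point 1 above.
            return sessionStates.computeIfAbsent(clientId, SessionState::new);
        }

        public void connect(String clientId) {
            SessionState state = getSessionState(clientId);
            // Locking on the shared state serializes connect/disconnect for the
            // same clientId, so a new connection cannot race the tear-down of
            // the old one on the same subscription queue.
            synchronized (state) {
                // attach the new connection; close the previous consumer on the
                // subscription queue before creating the new one
            }
        }

        public void disconnect(String clientId) {
            SessionState state = getSessionState(clientId);
            synchronized (state) {
                // remove the subscription and close any consumers still attached
                // to the queue, as described for removeSubscription above
            }
        }
    }
    ```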


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/onlyMIT/activemq-artemis onlyMIT-artemis-2.7.0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/2466.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2466
    
----
commit 8c516ff88240470bf6dece4816c7fd616d6a60de
Author: onlyMIT <liangshipin_g@...>
Date:   2018-12-19T08:26:12Z

    ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cleared, and caused Artemis broker to throw a NullPointerException.
    
    When the MQTT consumer client (with the cleanSession property set to true) reconnects, there is some probability that these two bugs will occur.
    This is because the MQTT consumer client thinks its connection has been lost and triggers a reconnect, while the old MQTT connection is still alive on the Artemis broker; the new and old connections then operate on the same queue in an unsafe way.

----

