Conversation
|
As @tabish121 pointed out, the current implementation violates the spec I think. I think the best way to implement this is by creating a ActiveMQMessageConsumer#receiveBody that ActiveMQConsumer can delegate to. Currently, ActiveMQConsumer first delegates to ActiveMQMessageConsumer that will ack the message before reading the body. But we need to read first the body for receiveBody. |
|
@jeanouii thanks for the review. I have to resume my work on this PR. I agree that the approach you are proposing makes more sense (else the ack will be send too early). I will update accordingly. @tabish121 @jeanouii thanks for your inputs ! I will resume my work on this one. |
|
I updated this PR with a new implementation approach. |
cshannon
left a comment
There was a problem hiding this comment.
I only glanced at this and haven't reviewed the rest of the changes, but one thing that immedaitely stood out that needs to be changed is this completely breaks exception handling. JMSConsumer needs to be converted to a runtime exception but ActiveMQConsumer still implements MessageConsumer. This is a problem because anyone who is using that API may be using a try/catch and looking for JMSException that will now no longer be thrown so it's a major breaking change if client applications are catching checked exceptions that will never be thrown so we can't do that.
I think you need to do something like the approach Artemis did, where they wrap the MessageConsumer instead. See https://github.com/apache/artemis/blob/2e390b4fa0842ed8d3557a158c9ca4d204687cf3/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQJMSConsumer.java
|
@cshannon yup, it's what I realized while working on the PR. I will update. |
activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Outdated
Show resolved
Hide resolved
1b6c641 to
48f254d
Compare
|
@cshannon I updated this PR by using the existing |
|
@tabish121 @cshannon @jeanouii I reworked this PR and added specific unit tests for |
| */ | ||
| private <T> T doReceiveBody(MessageDispatch md, Class<T> c) throws JMSException { | ||
| ActiveMQMessage message = createActiveMQMessage(md); | ||
| if (!message.isBodyAssignableTo(c)) { |
There was a problem hiding this comment.
I think I've pointed this out already a couple times but this still violates the specification. Please refer to section 8.6 on pages 84 and 85 of the Jakarta Messaging 3.1 specification for how the client should handle this in terms of the ACK mode of the consumer.
The basic concept is as follows:
- If the client acknowledgement mode is AUTO or DUPS_OK then you are meant to put the message back on the prefetch queue and allow the caller to read it again as they have no recourse to recover those messages once you throw here.
- If in CLIENT acknowledgement mode the message should not immediately go back to the prefetch buffer as the application can either call
session.recover()to redeliver all unacknowledged messages or can read the next message and callmessage.acknowledge()to acknowledge that message and all previous messages. - If in TRANSACTED mode the message should not go back into the prefetch buffer because the application can either call
session.rollback()to recover messages that failed or cal simply continue consuming messages and callsession.commit()to retire this and any other failed or consumed messages.
Testing here should account for these cases and also ensure that in CLIENT and TRANSACTED modes that the consumer credit window is expanded when message fail here to avoid a stuck consumer receive due to the current credit window having been exhausted.
There was a problem hiding this comment.
Thanks @tabish121 , I forgot this part in my latest change. I was focusing too much on beforeMessageConsumed() and afterMessageConsumed() methods. My bad.
Let me update the doReceiveBody() to behave correctly depending of the ack mode.
There was a problem hiding this comment.
@tabish121 @cshannon So, I did a new read on the spec.
Here's my take/understanding.
- If the ack mode is AUTO or DUPS_OK, the spec says "behave as if the unsuccessful call has not occurred. The message will be delivered again before any subsequent messages. This is not considered to be redelivery and does not cause JMSRedelivered to be set or JMSXDeliveryCount to be incremented." My understanding here is that we should use
unconsumedMessages.enqueueFirst(md), meaning that we put back to front of local prefetch, no broker interaction, no header changes. - If the ack mode is CLIENT, the spec says "behave as if the call has been successful and will not deliver the message again. The message will not be acknowledged until acknowledge is called. If an application wishes redelivery, it must call recover." My understanding here is that we should use
beforeMessageIsConsumed(md)andafterMessageIsConsumed(md, false), meaning tracked as delivered with credit window expansion. - if Transacted, the spec says "behave as if the call had been successful and will not deliver the message again. The transaction will remain uncommitted. If an application wishes redelivery, it mull roll back the transaction." Here, my understand is that we should use the same approach as CLIENT ack mode (delivered within transaction).
Also, I see in the spec that the message type has an "impact".
If the message doesn't have body, we throw the exception.
The spec says "may be used to receive any type of message except for StreamMessage", meaning that ActiveMQStreamMessage.isBodyAssignableTo() always return false.
The spec says "may be used to receive any type of message except for... Message", meaning we need an explicit check for ACTIVEMQ_MESSAGE data structure type.
Am I right ?
There was a problem hiding this comment.
You should implement isBodyAssignableTo in each message type to follow the guidance in the API docs.
boolean isBodyAssignableTo([Class](https://docs.oracle.com/javase/8/docs/api/java/lang/Class.html?is-external=true) c)
throws [JMSException](https://jakarta.ee/specifications/messaging/2.0/apidocs/javax/jms/jmsexception)
Returns whether the message body is capable of being assigned to the specified type. If this method returns true then a subsequent call to the method getBody on the same message with the same type argument would not throw a MessageFormatException.
If the message is a StreamMessage then false is always returned. If the message is a ObjectMessage and object deserialization fails then false is returned. If the message has no body then any type may be specified and true is returned.
Parameters:
c - The specified type
If the message is a TextMessage then this method will only return true if this parameter is set to String.class or another type to which a String is assignable.
If the message is a ObjectMessage then this method will only return true if this parameter is set to java.io.Serializable.class or another class to which the body is assignable.
If the message is a MapMessage then this method will only return true if this parameter is set to java.util.Map.class (or java.lang.Object.class).
If the message is a BytesMessage then this this method will only return true if this parameter is set to byte[].class (or java.lang.Object.class).
If the message is a TextMessage, ObjectMessage, MapMessage or BytesMessage and the message has no body, then the above does not apply and this method will return true irrespective of the value of this parameter.
If the message is a Message (but not one of its subtypes) then this method will return true irrespective of the value of this parameter.
There was a problem hiding this comment.
OK, I added a try/catch on getObject() to return false in isBodyAssignableTo() to be compliant with the spec.
There was a problem hiding this comment.
You are correct about the base Message type needing to be accounted for. It is the only case where you have an explicitly defined Message that has "no body" whereas the others might arrive without a body but a null body is itself a body in those cases so the method can return null.
…AssignableTo() Per the JMS spec, isBodyAssignableTo should return false when object deserialization fails rather than propagating the exception.
|
LGTM Type validation prior to consumption - enqueue first if not good. Better approach than consuming and catching exception because then you would need remove from deliveredMessages when AUTO_ACK before doing the enqueue first. TRANSACTED/CLIENT_ACK call to before and after to manage the credit window. Avoid blocking the next receive. Test coverage looks good with edge cases for credit window expansion, transacted rollback, auto_ack redelivery, plain message. Good job @jbonofre |
No description provided.