diff --git a/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java index 693f4768e1cf7ed28f067b19c31208dc8d782ec0..b2503b74028128dab4e706e8a226a6efe90968ce 100644 --- a/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java +++ b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java @@ -20,7 +20,9 @@ import org.briarproject.api.sync.ValidationManager; import org.briarproject.api.sync.ValidationManager.IncomingMessageHook; import org.briarproject.util.ByteUtils; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map.Entry; import java.util.TreeMap; import java.util.logging.Logger; @@ -207,15 +209,21 @@ class MessageQueueManagerImpl implements MessageQueueManager { queueState.pending.put(queuePosition, m.getId()); saveQueueState(txn, m.getGroupId(), queueState); } else { - // The message is in order, pass it to the delegate + // The message is in order LOG.info("Message is in order, delivering"); QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), m.getTimestamp(), queuePosition, m.getRaw()); - delegate.incomingMessage(txn, q, meta); queueState.incomingPosition++; - // Pass any consecutive messages to the delegate - MessageId id; - while ((id = queueState.popIncomingMessageId()) != null) { + // Collect any consecutive messages + List<MessageId> consecutive = new ArrayList<MessageId>(); + MessageId next; + while ((next = queueState.popIncomingMessageId()) != null) + consecutive.add(next); + // Save the queue state before passing control to the delegate + saveQueueState(txn, m.getGroupId(), queueState); + // Deliver the messages to the delegate + delegate.incomingMessage(txn, q, meta); + for (MessageId id : consecutive) { byte[] raw = db.getRawMessage(txn, id); meta = db.getMessageMetadata(txn, id); q = queueMessageFactory.createMessage(id, raw); @@ -225,7 +233,6 @@ class MessageQueueManagerImpl implements MessageQueueManager { } delegate.incomingMessage(txn, q, meta); } - saveQueueState(txn, m.getGroupId(), queueState); } } } diff --git a/briar-tests/src/org/briarproject/clients/MessageQueueManagerImplTest.java b/briar-tests/src/org/briarproject/clients/MessageQueueManagerImplTest.java index cf816fd88e4614430de2bc34a99a56a08b6918b0..05ca7fa32e1f21ed1940315b58171e8e2805243e 100644 --- a/briar-tests/src/org/briarproject/clients/MessageQueueManagerImplTest.java +++ b/briar-tests/src/org/briarproject/clients/MessageQueueManagerImplTest.java @@ -56,6 +56,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { context.mock(QueueMessageFactory.class); final ValidationManager validationManager = context.mock(ValidationManager.class); + final Transaction txn = new Transaction(null, false); final byte[] body = new byte[123]; final Metadata groupMetadata = new Metadata(); @@ -63,6 +64,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { final Metadata groupMetadata1 = new Metadata(); final byte[] queueState = new byte[123]; groupMetadata1.put(QUEUE_STATE_KEY, queueState); + context.checking(new Expectations() {{ // First message: queue state does not exist oneOf(db).getGroupMetadata(txn, groupId); @@ -123,6 +125,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { context.mock(QueueMessageFactory.class); final ValidationManager validationManager = context.mock(ValidationManager.class); + final AtomicReference<MessageValidator> captured = new AtomicReference<MessageValidator>(); final QueueMessageValidator queueMessageValidator = @@ -131,6 +134,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { final MessageId messageId = new MessageId(TestUtils.getRandomId()); final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1]; final Message message = new Message(messageId, groupId, timestamp, raw); + context.checking(new Expectations() {{ oneOf(validationManager).registerMessageValidator(with(clientId), with(any(MessageValidator.class))); @@ -138,7 +142,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase { MessageValidator.class, 1)); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -161,6 +164,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { context.mock(QueueMessageFactory.class); final ValidationManager validationManager = context.mock(ValidationManager.class); + final AtomicReference<MessageValidator> captured = new AtomicReference<MessageValidator>(); final QueueMessageValidator queueMessageValidator = @@ -171,6 +175,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { for (int i = 0; i < 8; i++) raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF; final Message message = new Message(messageId, groupId, timestamp, raw); + context.checking(new Expectations() {{ oneOf(validationManager).registerMessageValidator(with(clientId), with(any(MessageValidator.class))); @@ -178,7 +183,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase { MessageValidator.class, 1)); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -201,6 +205,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { context.mock(QueueMessageFactory.class); final ValidationManager validationManager = context.mock(ValidationManager.class); + final AtomicReference<MessageValidator> captured = new AtomicReference<MessageValidator>(); final QueueMessageValidator queueMessageValidator = @@ -210,6 +215,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { final MessageId messageId = new MessageId(TestUtils.getRandomId()); final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final Message message = new Message(messageId, groupId, timestamp, raw); + context.checking(new Expectations() {{ oneOf(validationManager).registerMessageValidator(with(clientId), with(any(MessageValidator.class))); @@ -221,7 +227,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase { will(returnValue(messageMetadata)); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -249,6 +254,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { new AtomicReference<IncomingMessageHook>(); final IncomingQueueMessageHook incomingQueueMessageHook = context.mock(IncomingQueueMessageHook.class); + final Transaction txn = new Transaction(null, false); final Metadata groupMetadata = new Metadata(); final byte[] queueState = new byte[123]; @@ -257,6 +263,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { final MessageId messageId = new MessageId(TestUtils.getRandomId()); final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final Message message = new Message(messageId, groupId, timestamp, raw); + context.checking(new Expectations() {{ oneOf(validationManager).registerIncomingMessageHook(with(clientId), with(any(IncomingMessageHook.class))); @@ -272,7 +279,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase { oneOf(db).deleteMessageMetadata(txn, messageId); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -300,6 +306,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { new AtomicReference<IncomingMessageHook>(); final IncomingQueueMessageHook incomingQueueMessageHook = context.mock(IncomingQueueMessageHook.class); + final Transaction txn = new Transaction(null, false); final Metadata groupMetadata = new Metadata(); final byte[] queueState = new byte[123]; @@ -310,6 +317,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH); final Message message = new Message(messageId, groupId, timestamp, raw); final BdfList pending = BdfList.of(BdfList.of(1L, messageId)); + context.checking(new Expectations() {{ oneOf(validationManager).registerIncomingMessageHook(with(clientId), with(any(IncomingMessageHook.class))); @@ -327,7 +335,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase { with(any(Metadata.class))); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -355,6 +362,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { new AtomicReference<IncomingMessageHook>(); final IncomingQueueMessageHook incomingQueueMessageHook = context.mock(IncomingQueueMessageHook.class); + final Transaction txn = new Transaction(null, false); final Metadata groupMetadata = new Metadata(); final byte[] queueState = new byte[123]; @@ -364,6 +372,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; final Message message = new Message(messageId, groupId, timestamp, raw); final Metadata messageMetadata = new Metadata(); + context.checking(new Expectations() {{ oneOf(validationManager).registerIncomingMessageHook(with(clientId), with(any(IncomingMessageHook.class))); @@ -374,17 +383,16 @@ public class MessageQueueManagerImplTest extends BriarTestCase { // Queue position 0 is expected oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); will(new DecodeQueueStateAction(0L, 0L, new BdfList())); - // The message should be delegated - oneOf(incomingQueueMessageHook).incomingMessage(with(txn), - with(any(QueueMessage.class)), with(messageMetadata)); // Queue position 1 should be expected next oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); will(new EncodeQueueStateAction(0L, 1L, new BdfList())); oneOf(db).mergeGroupMetadata(with(txn), with(groupId), with(any(Metadata.class))); + // The message should be delegated + oneOf(incomingQueueMessageHook).incomingMessage(with(txn), + with(any(QueueMessage.class)), with(messageMetadata)); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager); @@ -412,6 +420,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { new AtomicReference<IncomingMessageHook>(); final IncomingQueueMessageHook incomingQueueMessageHook = context.mock(IncomingQueueMessageHook.class); + final Transaction txn = new Transaction(null, false); final Metadata groupMetadata = new Metadata(); final byte[] queueState = new byte[123]; @@ -428,6 +437,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase { timestamp, 1L, raw1); final Metadata messageMetadata1 = new Metadata(); final BdfList pending = BdfList.of(BdfList.of(1L, messageId1)); + context.checking(new Expectations() {{ oneOf(validationManager).registerIncomingMessageHook(with(clientId), with(any(IncomingMessageHook.class))); @@ -438,7 +448,12 @@ public class MessageQueueManagerImplTest extends BriarTestCase { // Queue position 0 is expected, position 1 is pending oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); will(new DecodeQueueStateAction(0L, 0L, pending)); - // The message should be delegated + // Queue position 2 should be expected next + oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); + will(new EncodeQueueStateAction(0L, 2L, new BdfList())); + oneOf(db).mergeGroupMetadata(with(txn), with(groupId), + with(any(Metadata.class))); + // The new message should be delegated oneOf(incomingQueueMessageHook).incomingMessage(with(txn), with(any(QueueMessage.class)), with(messageMetadata)); // The pending message should be retrieved @@ -451,14 +466,8 @@ public class MessageQueueManagerImplTest extends BriarTestCase { // The pending message should be delegated oneOf(incomingQueueMessageHook).incomingMessage(txn, message1, messageMetadata1); - // Queue position 2 should be expected next - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(0L, 2L, new BdfList())); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); }}); - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, clientHelper, queueMessageFactory, validationManager);