Skip to content
Snippets Groups Projects
Unverified Commit 89d25d35 authored by akwizgran's avatar akwizgran
Browse files

Save queue state before delivering message. #272

parent eeaa7e38
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,9 @@ import org.briarproject.api.sync.ValidationManager;
import org.briarproject.api.sync.ValidationManager.IncomingMessageHook;
import org.briarproject.util.ByteUtils;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.logging.Logger;
......@@ -207,15 +209,21 @@ class MessageQueueManagerImpl implements MessageQueueManager {
queueState.pending.put(queuePosition, m.getId());
saveQueueState(txn, m.getGroupId(), queueState);
} else {
// The message is in order, pass it to the delegate
// The message is in order
LOG.info("Message is in order, delivering");
QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(),
m.getTimestamp(), queuePosition, m.getRaw());
delegate.incomingMessage(txn, q, meta);
queueState.incomingPosition++;
// Pass any consecutive messages to the delegate
MessageId id;
while ((id = queueState.popIncomingMessageId()) != null) {
// Collect any consecutive messages
List<MessageId> consecutive = new ArrayList<MessageId>();
MessageId next;
while ((next = queueState.popIncomingMessageId()) != null)
consecutive.add(next);
// Save the queue state before passing control to the delegate
saveQueueState(txn, m.getGroupId(), queueState);
// Deliver the messages to the delegate
delegate.incomingMessage(txn, q, meta);
for (MessageId id : consecutive) {
byte[] raw = db.getRawMessage(txn, id);
meta = db.getMessageMetadata(txn, id);
q = queueMessageFactory.createMessage(id, raw);
......@@ -225,7 +233,6 @@ class MessageQueueManagerImpl implements MessageQueueManager {
}
delegate.incomingMessage(txn, q, meta);
}
saveQueueState(txn, m.getGroupId(), queueState);
}
}
}
......
......@@ -56,6 +56,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class);
final ValidationManager validationManager =
context.mock(ValidationManager.class);
final Transaction txn = new Transaction(null, false);
final byte[] body = new byte[123];
final Metadata groupMetadata = new Metadata();
......@@ -63,6 +64,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final Metadata groupMetadata1 = new Metadata();
final byte[] queueState = new byte[123];
groupMetadata1.put(QUEUE_STATE_KEY, queueState);
context.checking(new Expectations() {{
// First message: queue state does not exist
oneOf(db).getGroupMetadata(txn, groupId);
......@@ -123,6 +125,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class);
final ValidationManager validationManager =
context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator =
......@@ -131,6 +134,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1];
final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class)));
......@@ -138,7 +142,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
MessageValidator.class, 1));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -161,6 +164,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class);
final ValidationManager validationManager =
context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator =
......@@ -171,6 +175,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
for (int i = 0; i < 8; i++)
raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF;
final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class)));
......@@ -178,7 +183,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
MessageValidator.class, 1));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -201,6 +205,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
context.mock(QueueMessageFactory.class);
final ValidationManager validationManager =
context.mock(ValidationManager.class);
final AtomicReference<MessageValidator> captured =
new AtomicReference<MessageValidator>();
final QueueMessageValidator queueMessageValidator =
......@@ -210,6 +215,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{
oneOf(validationManager).registerMessageValidator(with(clientId),
with(any(MessageValidator.class)));
......@@ -221,7 +227,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
will(returnValue(messageMetadata));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -249,6 +254,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123];
......@@ -257,6 +263,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final MessageId messageId = new MessageId(TestUtils.getRandomId());
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw);
context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class)));
......@@ -272,7 +279,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
oneOf(db).deleteMessageMetadata(txn, messageId);
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -300,6 +306,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123];
......@@ -310,6 +317,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH);
final Message message = new Message(messageId, groupId, timestamp, raw);
final BdfList pending = BdfList.of(BdfList.of(1L, messageId));
context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class)));
......@@ -327,7 +335,6 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
with(any(Metadata.class)));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -355,6 +362,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123];
......@@ -364,6 +372,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
final byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH];
final Message message = new Message(messageId, groupId, timestamp, raw);
final Metadata messageMetadata = new Metadata();
context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class)));
......@@ -374,17 +383,16 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// Queue position 0 is expected
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
will(new DecodeQueueStateAction(0L, 0L, new BdfList()));
// The message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata));
// Queue position 1 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 1L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class)));
// The message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......@@ -412,6 +420,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
new AtomicReference<IncomingMessageHook>();
final IncomingQueueMessageHook incomingQueueMessageHook =
context.mock(IncomingQueueMessageHook.class);
final Transaction txn = new Transaction(null, false);
final Metadata groupMetadata = new Metadata();
final byte[] queueState = new byte[123];
......@@ -428,6 +437,7 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
timestamp, 1L, raw1);
final Metadata messageMetadata1 = new Metadata();
final BdfList pending = BdfList.of(BdfList.of(1L, messageId1));
context.checking(new Expectations() {{
oneOf(validationManager).registerIncomingMessageHook(with(clientId),
with(any(IncomingMessageHook.class)));
......@@ -438,7 +448,12 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// Queue position 0 is expected, position 1 is pending
oneOf(clientHelper).toDictionary(queueState, 0, queueState.length);
will(new DecodeQueueStateAction(0L, 0L, pending));
// The message should be delegated
// Queue position 2 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 2L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class)));
// The new message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(with(txn),
with(any(QueueMessage.class)), with(messageMetadata));
// The pending message should be retrieved
......@@ -451,14 +466,8 @@ public class MessageQueueManagerImplTest extends BriarTestCase {
// The pending message should be delegated
oneOf(incomingQueueMessageHook).incomingMessage(txn, message1,
messageMetadata1);
// Queue position 2 should be expected next
oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class)));
will(new EncodeQueueStateAction(0L, 2L, new BdfList()));
oneOf(db).mergeGroupMetadata(with(txn), with(groupId),
with(any(Metadata.class)));
}});
MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db,
clientHelper, queueMessageFactory, validationManager);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment