diff --git a/briar-android-tests/src/test/java/org/briarproject/ForumSharingIntegrationTest.java b/briar-android-tests/src/test/java/org/briarproject/ForumSharingIntegrationTest.java index b4a9c78db6b7cea9caee363f37ef01a63fd5d151..79809f444b7fa97999576616b48fa58182f7278e 100644 --- a/briar-android-tests/src/test/java/org/briarproject/ForumSharingIntegrationTest.java +++ b/briar-android-tests/src/test/java/org/briarproject/ForumSharingIntegrationTest.java @@ -27,9 +27,12 @@ import org.briarproject.api.identity.AuthorFactory; import org.briarproject.api.identity.IdentityManager; import org.briarproject.api.identity.LocalAuthor; import org.briarproject.api.lifecycle.LifecycleManager; +import org.briarproject.api.sync.ClientId; import org.briarproject.api.sync.Group; import org.briarproject.api.sync.SyncSession; import org.briarproject.api.sync.SyncSessionFactory; +import org.briarproject.api.sync.ValidationManager; +import org.briarproject.api.sync.ValidationManager.State; import org.briarproject.api.system.Clock; import org.briarproject.contact.ContactModule; import org.briarproject.crypto.CryptoModule; @@ -61,6 +64,7 @@ import static org.briarproject.api.forum.ForumConstants.FORUM_SALT_LENGTH; import static org.briarproject.api.forum.ForumConstants.SHARE_MSG_TYPE_INVITATION; import static org.briarproject.api.identity.AuthorConstants.MAX_PUBLIC_KEY_LENGTH; import static org.briarproject.api.sync.ValidationManager.State.DELIVERED; +import static org.briarproject.api.sync.ValidationManager.State.INVALID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -672,8 +676,10 @@ public class ForumSharingIntegrationTest extends BriarTestCase { public void eventOccurred(Event e) { if (e instanceof MessageStateChangedEvent) { MessageStateChangedEvent event = (MessageStateChangedEvent) e; - if (event.getState() == DELIVERED && event.getClientId() - .equals(forumSharingManager0.getClientId()) && + State s = event.getState(); + ClientId c = event.getClientId(); + if ((s == DELIVERED || s == INVALID) && + c.equals(forumSharingManager0.getClientId()) && !event.isLocal()) { LOG.info("TEST: Sharer received message in group " + event.getMessage().getGroupId().hashCode()); @@ -724,8 +730,10 @@ public class ForumSharingIntegrationTest extends BriarTestCase { public void eventOccurred(Event e) { if (e instanceof MessageStateChangedEvent) { MessageStateChangedEvent event = (MessageStateChangedEvent) e; - if (event.getState() == DELIVERED && event.getClientId() - .equals(forumSharingManager1.getClientId()) && + State s = event.getState(); + ClientId c = event.getClientId(); + if ((s == DELIVERED || s == INVALID) && + c.equals(forumSharingManager0.getClientId()) && !event.isLocal()) { LOG.info("TEST: Invitee received message in group " + event.getMessage().getGroupId().hashCode()); diff --git a/briar-android-tests/src/test/java/org/briarproject/IntroductionIntegrationTest.java b/briar-android-tests/src/test/java/org/briarproject/IntroductionIntegrationTest.java index 043fcae7d5774ddc218b3fef7ea772c207938236..52fad50bbfb83256fd3af44f4e9b6c4ed827d970 100644 --- a/briar-android-tests/src/test/java/org/briarproject/IntroductionIntegrationTest.java +++ b/briar-android-tests/src/test/java/org/briarproject/IntroductionIntegrationTest.java @@ -29,10 +29,13 @@ import org.briarproject.api.introduction.IntroductionRequest; import org.briarproject.api.lifecycle.LifecycleManager; import org.briarproject.api.properties.TransportProperties; import org.briarproject.api.properties.TransportPropertyManager; +import org.briarproject.api.sync.ClientId; import org.briarproject.api.sync.Group; import org.briarproject.api.sync.MessageId; import org.briarproject.api.sync.SyncSession; import org.briarproject.api.sync.SyncSessionFactory; +import org.briarproject.api.sync.ValidationManager; +import org.briarproject.api.sync.ValidationManager.State; import org.briarproject.api.system.Clock; import org.briarproject.contact.ContactModule; import org.briarproject.crypto.CryptoModule; @@ -71,6 +74,7 @@ import static org.briarproject.api.introduction.IntroductionConstants.SESSION_ID import static org.briarproject.api.introduction.IntroductionConstants.TYPE; import static org.briarproject.api.introduction.IntroductionConstants.TYPE_REQUEST; import static org.briarproject.api.sync.ValidationManager.State.DELIVERED; +import static org.briarproject.api.sync.ValidationManager.State.INVALID; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -1050,8 +1054,10 @@ public class IntroductionIntegrationTest extends BriarTestCase { public void eventOccurred(Event e) { if (e instanceof MessageStateChangedEvent) { MessageStateChangedEvent event = (MessageStateChangedEvent) e; - if (event.getState() == DELIVERED && event.getClientId() - .equals(introductionManager0.getClientId()) && + State s = event.getState(); + ClientId c = event.getClientId(); + if ((s == DELIVERED || s == INVALID) && + c.equals(introductionManager0.getClientId()) && !event.isLocal()) { LOG.info("TEST: Introducee" + introducee + " received message in group " + diff --git a/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java b/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java index 487c31d52c003ec854d808f32719648e4e52f3f2..f6aa74c3ab07fcd7bbb0ba5f211f027270980329 100644 --- a/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java +++ b/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java @@ -23,8 +23,10 @@ import org.briarproject.api.sync.ValidationManager; import org.briarproject.api.sync.MessageContext; import org.briarproject.util.ByteUtils; +import java.util.Collection; import java.util.LinkedList; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -36,7 +38,9 @@ import javax.inject.Inject; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; +import static org.briarproject.api.sync.ValidationManager.State.DELIVERED; import static org.briarproject.api.sync.ValidationManager.State.INVALID; +import static org.briarproject.api.sync.ValidationManager.State.PENDING; import static org.briarproject.api.sync.ValidationManager.State.VALID; class ValidationManagerImpl implements ValidationManager, Service, @@ -66,7 +70,10 @@ class ValidationManagerImpl implements ValidationManager, Service, @Override public void startService() { if (used.getAndSet(true)) throw new IllegalStateException(); - for (ClientId c : validators.keySet()) getMessagesToValidate(c); + for (ClientId c : validators.keySet()) { + validateOutstandingMessages(c); + deliverOutstandingMessages(c); + } } @Override @@ -84,7 +91,7 @@ class ValidationManagerImpl implements ValidationManager, Service, hooks.put(c, hook); } - private void getMessagesToValidate(final ClientId c) { + private void validateOutstandingMessages(final ClientId c) { dbExecutor.execute(new Runnable() { @Override public void run() { @@ -128,6 +135,7 @@ class ValidationManagerImpl implements ValidationManager, Service, LOG.info("Group removed before validation"); // Continue to next message } finally { + if (!txn.isComplete()) txn.setComplete(); db.endTransaction(txn); } if (m != null && g != null) validateMessage(m, g); @@ -140,6 +148,112 @@ class ValidationManagerImpl implements ValidationManager, Service, }); } + private void deliverOutstandingMessages(final ClientId c) { + dbExecutor.execute(new Runnable() { + @Override + public void run() { + try { + Queue<MessageId> validated = new LinkedList<MessageId>(); + Queue<MessageId> pending = new LinkedList<MessageId>(); + Transaction txn = db.startTransaction(true); + try { + validated.addAll(db.getMessagesToDeliver(txn, c)); + pending.addAll(db.getPendingMessages(txn, c)); + txn.setComplete(); + } finally { + db.endTransaction(txn); + } + deliverNextMessage(validated); + deliverNextPendingMessage(pending); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + } + }); + } + + private void deliverNextMessage(final Queue<MessageId> validated) { + if (validated.isEmpty()) return; + dbExecutor.execute(new Runnable() { + @Override + public void run() { + try { + Message m = null; + Group g = null; + Metadata meta = null; + Transaction txn = db.startTransaction(true); + try { + MessageId id = validated.poll(); + byte[] raw = db.getRawMessage(txn, id); + m = parseMessage(id, raw); + g = db.getGroup(txn, m.getGroupId()); + meta = db.getMessageMetadata(txn, id); + txn.setComplete(); + } finally { + db.endTransaction(txn); + } + if (g != null) deliverMessage(m, g.getClientId(), meta); + deliverNextMessage(validated); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + } + }); + } + + private void deliverNextPendingMessage(final Queue<MessageId> pending) { + if (pending.isEmpty()) return; + dbExecutor.execute(new Runnable() { + @Override + public void run() { + Message m = null; + ClientId c = null; + try { + boolean allDelivered = true; + Metadata meta = null; + Transaction txn = db.startTransaction(true); + try { + MessageId id = pending.poll(); + byte[] raw = db.getRawMessage(txn, id); + m = parseMessage(id, raw); + Group g = db.getGroup(txn, m.getGroupId()); + c = g.getClientId(); + + // check if a dependency is invalid + Map<MessageId, State> states = + db.getMessageDependencies(txn, id); + for (Entry<MessageId, State> d : states.entrySet()) { + if (d.getValue() == INVALID) { + throw new InvalidMessageException( + "Invalid Dependency"); + } + if (d.getValue() != DELIVERED) allDelivered = false; + } + if(allDelivered) { + meta = db.getMessageMetadata(txn, id); + } + txn.setComplete(); + } finally { + if (!txn.isComplete()) txn.setComplete(); + db.endTransaction(txn); + } + if (c != null && allDelivered) deliverMessage(m, c, meta); + deliverNextPendingMessage(pending); + } catch(InvalidMessageException e) { + if (LOG.isLoggable(INFO)) + LOG.log(INFO, e.toString(), e); + markMessageInvalid(m, c); + deliverNextPendingMessage(pending); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + } + }); + } + private Message parseMessage(MessageId id, byte[] raw) { if (raw.length <= MESSAGE_HEADER_LENGTH) throw new IllegalArgumentException(); @@ -176,19 +290,105 @@ class ValidationManagerImpl implements ValidationManager, Service, @Override public void run() { try { + State newState = null; + Metadata meta = null; Transaction txn = db.startTransaction(false); try { - Metadata meta = result.getMetadata(); + // store dependencies + Collection<MessageId> dependencies = + result.getDependencies(); + if (dependencies != null && dependencies.size() > 0) { + db.addMessageDependencies(txn, m, dependencies); + } + // check if a dependency is invalid + // and if all dependencies have been delivered + Map<MessageId, State> states = + db.getMessageDependencies(txn, m.getId()); + newState = VALID; + for (Entry<MessageId, State> d : states.entrySet()) { + if (d.getValue() == INVALID) { + throw new InvalidMessageException( + "Dependency Invalid"); + } + if (d.getValue() != DELIVERED) { + newState = PENDING; + LOG.info("depend. undelivered, set to PENDING"); + break; + } + } + // save metadata and new message state + meta = result.getMetadata(); db.mergeMessageMetadata(txn, m.getId(), meta); - db.setMessageState(txn, m.getId(), c, VALID); - db.setMessageShared(txn, m, true); + db.setMessageState(txn, m, c, newState); + txn.setComplete(); + } finally { + if (!txn.isComplete()) txn.setComplete(); + db.endTransaction(txn); + } + // deliver message if valid + if (newState == VALID) { + deliverMessage(m, c, meta); + } + } catch (InvalidMessageException e) { + if (LOG.isLoggable(INFO)) + LOG.log(INFO, e.toString(), e); + markMessageInvalid(m, c); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + } + }); + } + + private void deliverMessage(final Message m, final ClientId c, + final Metadata meta) { + dbExecutor.execute(new Runnable() { + @Override + public void run() { + try { + Queue<MessageId> pending = new LinkedList<MessageId>(); + Transaction txn = db.startTransaction(false); + try { IncomingMessageHook hook = hooks.get(c); if (hook != null) hook.incomingMessage(txn, m, meta); + + // check if message was deleted by client + if (db.getRawMessage(txn, m.getId()) == null) { + throw new InvalidMessageException( + "Deleted by Client"); + } + + db.setMessageShared(txn, m, true); + db.setMessageState(txn, m, c, DELIVERED); + + // deliver pending dependents + Map<MessageId, State> dependents = + db.getMessageDependents(txn, m.getId()); + for (Entry<MessageId, State> i : dependents + .entrySet()) { + if (i.getValue() != PENDING) continue; + + // check that all dependencies are delivered + Map<MessageId, State> dependencies = + db.getMessageDependencies(txn, i.getKey()); + for (Entry<MessageId, State> j : dependencies + .entrySet()) { + if (j.getValue() != DELIVERED) return; + } + pending.add(i.getKey()); + } txn.setComplete(); } finally { + if (!txn.isComplete()) txn.setComplete(); db.endTransaction(txn); } + deliverNextMessage(pending); + } catch (InvalidMessageException e) { + if (LOG.isLoggable(INFO)) + LOG.log(INFO, e.toString(), e); + markMessageInvalid(m, c); } catch (DbException e) { if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -202,13 +402,56 @@ class ValidationManagerImpl implements ValidationManager, Service, @Override public void run() { try { + Queue<MessageId> invalid = new LinkedList<MessageId>(); Transaction txn = db.startTransaction(false); try { - db.setMessageState(txn, m.getId(), c, INVALID); + Map<MessageId, State> dependents = + db.getMessageDependents(txn, m.getId()); + db.setMessageState(txn, m, c, INVALID); + db.deleteMessage(txn, m.getId()); + db.deleteMessageMetadata(txn, m.getId()); + + // recursively invalidate all messages that depend on m + // TODO check that cycles are properly taken care of + for (Entry<MessageId, State> i : dependents + .entrySet()) { + if (i.getValue() != INVALID) { + invalid.add(i.getKey()); + } + } + txn.setComplete(); + } finally { + db.endTransaction(txn); + } + markNextMessageInvalid(invalid); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + } + } + }); + } + + private void markNextMessageInvalid(final Queue<MessageId> invalid) { + if (invalid.isEmpty()) return; + dbExecutor.execute(new Runnable() { + @Override + public void run() { + try { + Message m = null; + Group g = null; + Transaction txn = db.startTransaction(true); + try { + MessageId id = invalid.poll(); + byte[] raw = db.getRawMessage(txn, id); + m = parseMessage(id, raw); + g = db.getGroup(txn, m.getGroupId()); txn.setComplete(); } finally { db.endTransaction(txn); } + if (g != null) markMessageInvalid(m, g.getClientId()); + markNextMessageInvalid(invalid); } catch (DbException e) { if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); diff --git a/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java b/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java index d55b575c2d9c2d1db66aadbb6ad1326631fb2a83..0520b3b1fffe9b0be5bf1216e8a79487663dd738 100644 --- a/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java +++ b/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java @@ -16,26 +16,37 @@ import org.briarproject.api.sync.Group; import org.briarproject.api.sync.GroupId; import org.briarproject.api.sync.InvalidMessageException; import org.briarproject.api.sync.Message; +import org.briarproject.api.sync.MessageContext; import org.briarproject.api.sync.MessageId; import org.briarproject.api.sync.ValidationManager.IncomingMessageHook; import org.briarproject.api.sync.ValidationManager.MessageValidator; -import org.briarproject.api.sync.MessageContext; +import org.briarproject.api.sync.ValidationManager.State; import org.briarproject.util.ByteUtils; import org.jmock.Expectations; import org.jmock.Mockery; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executor; +import static org.briarproject.api.sync.ValidationManager.State.DELIVERED; import static org.briarproject.api.sync.ValidationManager.State.INVALID; +import static org.briarproject.api.sync.ValidationManager.State.PENDING; +import static org.briarproject.api.sync.ValidationManager.State.UNKNOWN; import static org.briarproject.api.sync.ValidationManager.State.VALID; +import static org.junit.Assert.assertTrue; public class ValidationManagerImplTest extends BriarTestCase { private final ClientId clientId = new ClientId(TestUtils.getRandomId()); private final MessageId messageId = new MessageId(TestUtils.getRandomId()); private final MessageId messageId1 = new MessageId(TestUtils.getRandomId()); + private final MessageId messageId2 = new MessageId(TestUtils.getRandomId()); private final GroupId groupId = new GroupId(TestUtils.getRandomId()); private final byte[] descriptor = new byte[32]; private final Group group = new Group(groupId, clientId, descriptor); @@ -45,14 +56,23 @@ public class ValidationManagerImplTest extends BriarTestCase { raw); private final Message message1 = new Message(messageId1, groupId, timestamp, raw); + private final Message message2 = new Message(messageId2, groupId, timestamp, + raw); private final Metadata metadata = new Metadata(); - final MessageContext validResult = new MessageContext(metadata); + private final MessageContext validResult = new MessageContext(metadata); private final ContactId contactId = new ContactId(234); + private final Collection<MessageId> dependencies = new ArrayList<>(); + private final MessageContext validResultWithDependencies = + new MessageContext(metadata, dependencies); + private final Map<MessageId, State> states = new HashMap<>(); public ValidationManagerImplTest() { // Encode the messages System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH); ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH); + + dependencies.add(messageId1); + states.put(messageId1, INVALID); } @Test @@ -67,8 +87,10 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn = new Transaction(null, false); final Transaction txn1 = new Transaction(null, false); final Transaction txn2 = new Transaction(null, false); + final Transaction txn2b = new Transaction(null, false); final Transaction txn3 = new Transaction(null, false); final Transaction txn4 = new Transaction(null, false); + final Transaction txn5 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -90,12 +112,22 @@ public class ValidationManagerImplTest extends BriarTestCase { // Store the validation result for the first message oneOf(db).startTransaction(false); will(returnValue(txn2)); + oneOf(db).getMessageDependencies(txn2, messageId); oneOf(db).mergeMessageMetadata(txn2, messageId, metadata); - oneOf(db).setMessageState(txn2, messageId, clientId, VALID); - oneOf(db).setMessageShared(txn2, message, true); - // Call the hook for the first message - oneOf(hook).incomingMessage(txn2, message, metadata); + oneOf(db).setMessageState(txn2, message, clientId, VALID); oneOf(db).endTransaction(txn2); + // Async delivery + oneOf(db).startTransaction(false); + will(returnValue(txn2b)); + oneOf(db).setMessageShared(txn2b, message, true); + // Call the hook for the first message + oneOf(hook).incomingMessage(txn2b, message, metadata); + oneOf(db).getRawMessage(txn2b, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn2b, message, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn2b, messageId); + will(returnValue(Collections.emptyMap())); + oneOf(db).endTransaction(txn2b); // Load the second raw message and group oneOf(db).startTransaction(true); will(returnValue(txn3)); @@ -110,8 +142,170 @@ public class ValidationManagerImplTest extends BriarTestCase { // Store the validation result for the second message oneOf(db).startTransaction(false); will(returnValue(txn4)); - oneOf(db).setMessageState(txn4, messageId1, clientId, INVALID); + oneOf(db).setMessageState(txn4, message1, clientId, INVALID); + // Recursively invalidate dependents + oneOf(db).getMessageDependents(txn4, messageId1); + oneOf(db).deleteMessage(txn4, messageId1); + oneOf(db).deleteMessageMetadata(txn4, messageId1); + oneOf(db).endTransaction(txn4); + // Get other messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn5)); + oneOf(db).getMessagesToDeliver(txn5, clientId); + oneOf(db).getPendingMessages(txn5, clientId); + oneOf(db).endTransaction(txn5); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.startService(); + + context.assertIsSatisfied(); + } + + @Test + public void testMessagesAreDeliveredAtStartup() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, true); + final Transaction txn2 = new Transaction(null, true); + final Transaction txn3 = new Transaction(null, false); + final Transaction txn4 = new Transaction(null, true); + final Transaction txn5 = new Transaction(null, false); + + states.put(messageId1, PENDING); + + context.checking(new Expectations() {{ + // Get messages to validate + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getMessagesToValidate(txn, clientId); + oneOf(db).endTransaction(txn); + // Get IDs of messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn1)); + oneOf(db).getMessagesToDeliver(txn1, clientId); + will(returnValue(Collections.singletonList(messageId))); + oneOf(db).getPendingMessages(txn1, clientId); + oneOf(db).endTransaction(txn1); + // Get message and its metadata to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn2)); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(message.getRaw())); + oneOf(db).getGroup(txn2, message.getGroupId()); + will(returnValue(group)); + oneOf(db).getMessageMetadata(txn2, messageId); + will(returnValue(metadata)); + oneOf(db).endTransaction(txn2); + // Deliver message in a new transaction + oneOf(db).startTransaction(false); + will(returnValue(txn3)); + oneOf(db).setMessageShared(txn3, message, true); + oneOf(db).setMessageState(txn3, message, clientId, DELIVERED); + oneOf(hook).incomingMessage(txn3, message, metadata); + oneOf(db).getRawMessage(txn3, messageId); + will(returnValue(message.getRaw())); + // Try to also deliver pending dependents + oneOf(db).getMessageDependents(txn3, messageId); + will(returnValue(states)); + oneOf(db).getMessageDependencies(txn3, messageId1); + will(returnValue(Collections.singletonMap(messageId2, DELIVERED))); + oneOf(db).endTransaction(txn3); + // Get the dependent to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn4)); + oneOf(db).getRawMessage(txn4, messageId1); + will(returnValue(message1.getRaw())); + oneOf(db).getGroup(txn4, message.getGroupId()); + will(returnValue(group)); + oneOf(db).getMessageMetadata(txn4, messageId1); + will(returnValue(metadata)); oneOf(db).endTransaction(txn4); + // Deliver the dependent in a new transaction + oneOf(db).startTransaction(false); + will(returnValue(txn5)); + oneOf(db).setMessageShared(txn5, message1, true); + oneOf(db).setMessageState(txn5, message1, clientId, DELIVERED); + oneOf(hook).incomingMessage(txn5, message1, metadata); + oneOf(db).getRawMessage(txn5, messageId1); + will(returnValue(message1.getRaw())); + oneOf(db).getMessageDependents(txn5, messageId1); + oneOf(db).endTransaction(txn5); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.startService(); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); + assertTrue(txn5.isComplete()); + } + + @Test + public void testPendingMessagesAreDeliveredAtStartup() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, true); + final Transaction txn2 = new Transaction(null, true); + final Transaction txn3 = new Transaction(null, false); + + context.checking(new Expectations() {{ + // Get messages to validate + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getMessagesToValidate(txn, clientId); + oneOf(db).endTransaction(txn); + // Get IDs of messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn1)); + oneOf(db).getMessagesToDeliver(txn1, clientId); + oneOf(db).getPendingMessages(txn1, clientId); + will(returnValue(Collections.singletonList(messageId))); + oneOf(db).endTransaction(txn1); + // Get message and its metadata to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn2)); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(message.getRaw())); + oneOf(db).getGroup(txn2, message.getGroupId()); + will(returnValue(group)); + oneOf(db).getMessageDependencies(txn2, messageId); + will(returnValue(Collections.singletonMap(messageId1, DELIVERED))); + oneOf(db).getMessageMetadata(txn2, messageId); + oneOf(db).endTransaction(txn2); + // Deliver the pending message + oneOf(db).startTransaction(false); + will(returnValue(txn3)); + oneOf(db).setMessageShared(txn3, message, true); + oneOf(db).setMessageState(txn3, message, clientId, DELIVERED); + oneOf(hook).incomingMessage(txn3, message, metadata); + oneOf(db).getRawMessage(txn3, messageId); + will(returnValue(message.getRaw())); + oneOf(db).getMessageDependents(txn3, messageId); + oneOf(db).endTransaction(txn3); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -121,6 +315,11 @@ public class ValidationManagerImplTest extends BriarTestCase { vm.startService(); context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); } @Test @@ -137,6 +336,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); + final Transaction txn4 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -164,8 +364,18 @@ public class ValidationManagerImplTest extends BriarTestCase { // Store the validation result for the second message oneOf(db).startTransaction(false); will(returnValue(txn3)); - oneOf(db).setMessageState(txn3, messageId1, clientId, INVALID); + oneOf(db).setMessageState(txn3, message1, clientId, INVALID); + // recursively invalidate dependents + oneOf(db).getMessageDependents(txn3, messageId1); + oneOf(db).deleteMessage(txn3, messageId1); + oneOf(db).deleteMessageMetadata(txn3, messageId1); oneOf(db).endTransaction(txn3); + // Get other messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn4)); + oneOf(db).getMessagesToDeliver(txn4, clientId); + oneOf(db).getPendingMessages(txn4, clientId); + oneOf(db).endTransaction(txn4); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -175,6 +385,12 @@ public class ValidationManagerImplTest extends BriarTestCase { vm.startService(); context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); } @Test @@ -191,6 +407,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); + final Transaction txn4 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -221,8 +438,18 @@ public class ValidationManagerImplTest extends BriarTestCase { // Store the validation result for the second message oneOf(db).startTransaction(false); will(returnValue(txn3)); - oneOf(db).setMessageState(txn3, messageId1, clientId, INVALID); + oneOf(db).setMessageState(txn3, message1, clientId, INVALID); + // recursively invalidate dependents + oneOf(db).getMessageDependents(txn3, messageId1); + oneOf(db).deleteMessage(txn3, messageId1); + oneOf(db).deleteMessageMetadata(txn3, messageId1); oneOf(db).endTransaction(txn3); + // Get other messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn4)); + oneOf(db).getMessagesToDeliver(txn4, clientId); + oneOf(db).getPendingMessages(txn4, clientId); + oneOf(db).endTransaction(txn4); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -232,6 +459,12 @@ public class ValidationManagerImplTest extends BriarTestCase { vm.startService(); context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); } @Test @@ -245,6 +478,7 @@ public class ValidationManagerImplTest extends BriarTestCase { context.mock(IncomingMessageHook.class); final Transaction txn = new Transaction(null, true); final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); context.checking(new Expectations() {{ // Load the group oneOf(db).startTransaction(true); @@ -258,12 +492,21 @@ public class ValidationManagerImplTest extends BriarTestCase { // Store the validation result oneOf(db).startTransaction(false); will(returnValue(txn1)); + oneOf(db).getMessageDependencies(txn1, messageId); oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); - oneOf(db).setMessageState(txn1, messageId, clientId, VALID); - oneOf(db).setMessageShared(txn1, message, true); - // Call the hook - oneOf(hook).incomingMessage(txn1, message, metadata); + oneOf(db).setMessageState(txn1, message, clientId, VALID); oneOf(db).endTransaction(txn1); + // async delivery + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).setMessageShared(txn2, message, true); + // Call the hook + oneOf(hook).incomingMessage(txn2, message, metadata); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn2, message, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn2, messageId); + oneOf(db).endTransaction(txn2); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -273,6 +516,10 @@ public class ValidationManagerImplTest extends BriarTestCase { vm.eventOccurred(new MessageAddedEvent(message, contactId)); context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); } @Test @@ -293,4 +540,402 @@ public class ValidationManagerImplTest extends BriarTestCase { context.assertIsSatisfied(); } + + @Test + public void testMessagesWithNonDeliveredDependenciesArePending() + throws Exception { + + states.put(messageId1, UNKNOWN); + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResultWithDependencies)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).addMessageDependencies(txn1, message, + validResultWithDependencies.getDependencies()); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(states)); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + oneOf(db).setMessageState(txn1, message, clientId, PENDING); + oneOf(db).endTransaction(txn1); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + } + + @Test + public void testMessagesWithDeliveredDependenciesGetDelivered() + throws Exception { + + states.put(messageId1, DELIVERED); + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResultWithDependencies)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).addMessageDependencies(txn1, message, + validResultWithDependencies.getDependencies()); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(states)); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + oneOf(db).setMessageState(txn1, message, clientId, VALID); + oneOf(db).endTransaction(txn1); + // async delivery + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).setMessageShared(txn2, message, true); + // Call the hook + oneOf(hook).incomingMessage(txn2, message, metadata); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn2, message, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn2, messageId); + oneOf(db).endTransaction(txn2); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + } + + @Test + public void testMessagesWithInvalidDependenciesAreInvalid() + throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); + final Transaction txn3 = new Transaction(null, true); + final Transaction txn4 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResultWithDependencies)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).addMessageDependencies(txn1, message, + validResultWithDependencies.getDependencies()); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(states)); + oneOf(db).endTransaction(txn1); + // Invalidate message in a new transaction + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).getMessageDependents(txn2, messageId); + will(returnValue(Collections.singletonMap(messageId2, UNKNOWN))); + oneOf(db).setMessageState(txn2, message, clientId, INVALID); + oneOf(db).deleteMessage(txn2, messageId); + oneOf(db).deleteMessageMetadata(txn2, messageId); + oneOf(db).endTransaction(txn2); + // Get message to invalidate in a new transaction + oneOf(db).startTransaction(true); + will(returnValue(txn3)); + oneOf(db).getRawMessage(txn3, messageId2); + will(returnValue(message2.getRaw())); + oneOf(db).getGroup(txn3, message2.getGroupId()); + will(returnValue(group)); + oneOf(db).endTransaction(txn3); + // Invalidate dependent message in a new transaction + oneOf(db).startTransaction(false); + will(returnValue(txn4)); + oneOf(db).getMessageDependents(txn4, messageId2); + will(returnValue(Collections.emptyMap())); + oneOf(db).setMessageState(txn4, message2, clientId, INVALID); + oneOf(db).deleteMessage(txn4, messageId2); + oneOf(db).deleteMessageMetadata(txn4, messageId2); + oneOf(db).endTransaction(txn4); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); + } + + @Test + public void testPendingDependentsGetDelivered() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); + final Transaction txn3 = new Transaction(null, true); + final Transaction txn4 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResult)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(Collections.emptyMap())); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + oneOf(db).setMessageState(txn1, message, clientId, VALID); + oneOf(db).endTransaction(txn1); + // Deliver first message + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).setMessageShared(txn2, message, true); + oneOf(hook).incomingMessage(txn2, message, metadata); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn2, message, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn2, messageId); + will(returnValue(Collections.singletonMap(messageId1, PENDING))); + oneOf(db).getMessageDependencies(txn2, messageId1); + will(returnValue(Collections.singletonMap(messageId2, DELIVERED))); + oneOf(db).endTransaction(txn2); + // Also get the pending message for delivery + oneOf(db).startTransaction(true); + will(returnValue(txn3)); + oneOf(db).getRawMessage(txn3, messageId1); + will(returnValue(message1.getRaw())); + oneOf(db).getGroup(txn3, message1.getGroupId()); + will(returnValue(group)); + oneOf(db).getMessageMetadata(txn3, messageId1); + will(returnValue(metadata)); + oneOf(db).endTransaction(txn3); + // Deliver the pending message + oneOf(db).startTransaction(false); + will(returnValue(txn4)); + oneOf(db).setMessageShared(txn4, message1, true); + oneOf(hook).incomingMessage(txn4, message1, metadata); + oneOf(db).getRawMessage(txn4, messageId1); + will(returnValue(raw)); + oneOf(db).setMessageState(txn4, message1, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn4, messageId1); + will(returnValue(Collections.emptyMap())); + oneOf(db).endTransaction(txn4); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); + } + + @Test + public void testOnlyReadyPendingDependentsGetDelivered() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResult)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(Collections.emptyMap())); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + oneOf(db).setMessageState(txn1, message, clientId, VALID); + oneOf(db).endTransaction(txn1); + // Deliver first message + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).setMessageShared(txn2, message, true); + oneOf(hook).incomingMessage(txn2, message, metadata); + oneOf(db).getRawMessage(txn2, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn2, message, clientId, DELIVERED); + oneOf(db).getMessageDependents(txn2, messageId); + will(returnValue(Collections.singletonMap(messageId1, PENDING))); + oneOf(db).getMessageDependencies(txn2, messageId1); + will(returnValue(Collections.singletonMap(messageId2, VALID))); + oneOf(db).endTransaction(txn2); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + } + + @Test + public void testMessageDependencyCycle() throws Exception { + states.put(messageId1, UNKNOWN); + final MessageContext cycleContext = new MessageContext(metadata, + Collections.singletonList(messageId)); + + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, true); + final Transaction txn3 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResultWithDependencies)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).addMessageDependencies(txn1, message, + validResultWithDependencies.getDependencies()); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(states)); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + oneOf(db).setMessageState(txn1, message, clientId, PENDING); + oneOf(db).endTransaction(txn1); + // Second message is coming in + oneOf(db).startTransaction(true); + will(returnValue(txn2)); + oneOf(db).getGroup(txn2, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn2); + // Validate the message: valid + oneOf(validator).validateMessage(message1, group); + will(returnValue(cycleContext)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn3)); + oneOf(db).addMessageDependencies(txn3, message1, + cycleContext.getDependencies()); + oneOf(db).getMessageDependencies(txn3, messageId1); + will(returnValue(Collections.singletonMap(messageId, PENDING))); + oneOf(db).mergeMessageMetadata(txn3, messageId1, metadata); + oneOf(db).setMessageState(txn3, message1, clientId, PENDING); + oneOf(db).endTransaction(txn3); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + vm.eventOccurred(new MessageAddedEvent(message1, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + } + }