From d0581724293c85bc8fed6a61d9d1e01c9d7c4191 Mon Sep 17 00:00:00 2001 From: Torsten Grote <t@grobox.de> Date: Wed, 31 Aug 2016 13:15:59 -0300 Subject: [PATCH] When a message is shared, share its transitive dependencies Like other recursive operations on the dependency graph, this is not done in a single transaction to prevent an attacker from creating arbitrary large transactions. So at startup, the `ValidationManager` finds and resumes any unfinished operations, by looking for shared messages with unshared dependencies. --- .../api/clients/ClientHelper.java | 6 - .../api/db/DatabaseComponent.java | 14 +- .../api/sync/ValidationManager.java | 3 +- .../briarproject/blogs/BlogManagerImpl.java | 19 +- .../clients/BdfIncomingMessageHook.java | 10 +- .../clients/ClientHelperImpl.java | 6 - .../clients/MessageQueueManagerImpl.java | 7 +- .../src/org/briarproject/db/Database.java | 14 +- .../db/DatabaseComponentImpl.java | 81 +++++++- .../src/org/briarproject/db/JdbcDatabase.java | 38 +++- .../briarproject/forum/ForumManagerImpl.java | 7 +- .../introduction/IntroductionManagerImpl.java | 7 +- .../messaging/MessagingManagerImpl.java | 5 +- .../sharing/SharingManagerImpl.java | 4 +- .../sync/ValidationManagerImpl.java | 113 +++++++++++- .../blogs/BlogManagerImplTest.java | 1 - .../db/DatabaseComponentImplTest.java | 2 +- .../org/briarproject/db/H2DatabaseTest.java | 46 ++++- .../sync/ValidationManagerImplTest.java | 173 ++++++++++++++++++ 19 files changed, 480 insertions(+), 76 deletions(-) diff --git a/briar-api/src/org/briarproject/api/clients/ClientHelper.java b/briar-api/src/org/briarproject/api/clients/ClientHelper.java index c2d3877597..5aeffaa21e 100644 --- a/briar-api/src/org/briarproject/api/clients/ClientHelper.java +++ b/briar-api/src/org/briarproject/api/clients/ClientHelper.java @@ -68,12 +68,6 @@ public interface ClientHelper { void mergeMessageMetadata(Transaction txn, MessageId m, BdfDictionary metadata) throws DbException, FormatException; - /** - * Marks the given message as shared or unshared with other contacts. - */ - void setMessageShared(Transaction txn, MessageId m, boolean shared) - throws DbException; - byte[] toByteArray(BdfDictionary dictionary) throws FormatException; byte[] toByteArray(BdfList list) throws FormatException; diff --git a/briar-api/src/org/briarproject/api/db/DatabaseComponent.java b/briar-api/src/org/briarproject/api/db/DatabaseComponent.java index a30ffabaa8..cd15469512 100644 --- a/briar-api/src/org/briarproject/api/db/DatabaseComponent.java +++ b/briar-api/src/org/briarproject/api/db/DatabaseComponent.java @@ -258,6 +258,15 @@ public interface DatabaseComponent { Collection<MessageId> getPendingMessages(Transaction txn, ClientId c) throws DbException; + /** + * Returns the IDs of any messages from the given client + * that have a shared dependent, but are still not shared themselves. + * <p/> + * Read-only. + */ + Collection<MessageId> getMessagesToShare(Transaction txn, + ClientId c) throws DbException; + /** * Returns the message with the given ID, in serialised form, or null if * the message has been deleted. @@ -456,10 +465,9 @@ public interface DatabaseComponent { throws DbException; /** - * Marks the given message as shared or unshared. + * Marks the given message as shared. */ - void setMessageShared(Transaction txn, MessageId m, boolean shared) - throws DbException; + void setMessageShared(Transaction txn, MessageId m) throws DbException; /** * Sets the validation and delivery state of the given message. diff --git a/briar-api/src/org/briarproject/api/sync/ValidationManager.java b/briar-api/src/org/briarproject/api/sync/ValidationManager.java index ce6a11b234..8fe7615221 100644 --- a/briar-api/src/org/briarproject/api/sync/ValidationManager.java +++ b/briar-api/src/org/briarproject/api/sync/ValidationManager.java @@ -55,8 +55,9 @@ public interface ValidationManager { /** * Called once for each incoming message that passes validation. + * @return whether or not this message should be shared */ - void incomingMessage(Transaction txn, Message m, Metadata meta) + boolean incomingMessage(Transaction txn, Message m, Metadata meta) throws DbException; } } diff --git a/briar-core/src/org/briarproject/blogs/BlogManagerImpl.java b/briar-core/src/org/briarproject/blogs/BlogManagerImpl.java index 71a5721cf5..48b4452d1b 100644 --- a/briar-core/src/org/briarproject/blogs/BlogManagerImpl.java +++ b/briar-core/src/org/briarproject/blogs/BlogManagerImpl.java @@ -176,7 +176,7 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager, } @Override - protected void incomingMessage(Transaction txn, Message m, BdfList list, + protected boolean incomingMessage(Transaction txn, Message m, BdfList list, BdfDictionary meta) throws DbException, FormatException { GroupId groupId = m.getGroupId(); @@ -196,13 +196,14 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager, throw new FormatException(); } } - // share dependencies recursively - TODO remove with #598 - share(txn, h); // broadcast event about new post or comment BlogPostAddedEvent event = new BlogPostAddedEvent(groupId, h, false); txn.attach(event); + + // shares message and its dependencies + return true; } else if (type == WRAPPED_COMMENT) { // Check that the original message ID in the dependency's metadata // matches the original parent ID of the wrapped comment @@ -216,6 +217,8 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager, throw new FormatException(); } } + // don't share message until parent arrives + return false; } @Override @@ -672,14 +675,4 @@ class BlogManagerImpl extends BdfIncomingMessageHook implements BlogManager, Long longType = d.getLong(KEY_TYPE); return MessageType.valueOf(longType.intValue()); } - - // TODO remove when implementing #589 - @Deprecated - private void share(Transaction txn, BlogPostHeader h) throws DbException { - clientHelper.setMessageShared(txn, h.getId(), true); - if (h instanceof BlogCommentHeader) { - BlogPostHeader h2 = ((BlogCommentHeader) h).getParent(); - share(txn, h2); - } - } } diff --git a/briar-core/src/org/briarproject/clients/BdfIncomingMessageHook.java b/briar-core/src/org/briarproject/clients/BdfIncomingMessageHook.java index f8211623d2..3ca19d9bd5 100644 --- a/briar-core/src/org/briarproject/clients/BdfIncomingMessageHook.java +++ b/briar-core/src/org/briarproject/clients/BdfIncomingMessageHook.java @@ -28,14 +28,14 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook, this.metadataParser = metadataParser; } - protected abstract void incomingMessage(Transaction txn, Message m, + protected abstract boolean incomingMessage(Transaction txn, Message m, BdfList body, BdfDictionary meta) throws DbException, FormatException; @Override - public void incomingMessage(Transaction txn, Message m, Metadata meta) + public boolean incomingMessage(Transaction txn, Message m, Metadata meta) throws DbException { - incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH); + return incomingMessage(txn, m, meta, MESSAGE_HEADER_LENGTH); } @Override @@ -44,14 +44,14 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook, incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH); } - private void incomingMessage(Transaction txn, Message m, Metadata meta, + private boolean incomingMessage(Transaction txn, Message m, Metadata meta, int headerLength) throws DbException { try { byte[] raw = m.getRaw(); BdfList body = clientHelper.toList(raw, headerLength, raw.length - headerLength); BdfDictionary metaDictionary = metadataParser.parse(meta); - incomingMessage(txn, m, body, metaDictionary); + return incomingMessage(txn, m, body, metaDictionary); } catch (FormatException e) { throw new DbException(e); } diff --git a/briar-core/src/org/briarproject/clients/ClientHelperImpl.java b/briar-core/src/org/briarproject/clients/ClientHelperImpl.java index 890dc5146d..d7a983b0ae 100644 --- a/briar-core/src/org/briarproject/clients/ClientHelperImpl.java +++ b/briar-core/src/org/briarproject/clients/ClientHelperImpl.java @@ -245,12 +245,6 @@ class ClientHelperImpl implements ClientHelper { db.mergeMessageMetadata(txn, m, metadataEncoder.encode(metadata)); } - @Override - public void setMessageShared(Transaction txn, MessageId m, boolean shared) - throws DbException { - db.setMessageShared(txn, m, shared); - } - @Override public byte[] toByteArray(BdfDictionary dictionary) throws FormatException { ByteArrayOutputStream out = new ByteArrayOutputStream(); diff --git a/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java index 9869707d58..ad1fffe3b5 100644 --- a/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java +++ b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java @@ -188,12 +188,12 @@ class MessageQueueManagerImpl implements MessageQueueManager { private final IncomingQueueMessageHook delegate; - DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) { + private DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) { this.delegate = delegate; } @Override - public void incomingMessage(Transaction txn, Message m, Metadata meta) + public boolean incomingMessage(Transaction txn, Message m, Metadata meta) throws DbException { long queuePosition = ByteUtils.readUint64(m.getRaw(), MESSAGE_HEADER_LENGTH); @@ -239,6 +239,9 @@ class MessageQueueManagerImpl implements MessageQueueManager { delegate.incomingMessage(txn, q, meta); } } + // message queues are only useful for groups with two members + // so messages don't need to be shared + return false; } } } diff --git a/briar-core/src/org/briarproject/db/Database.java b/briar-core/src/org/briarproject/db/Database.java index 5a07c0f06d..fe16ac391c 100644 --- a/briar-core/src/org/briarproject/db/Database.java +++ b/briar-core/src/org/briarproject/db/Database.java @@ -434,6 +434,15 @@ interface Database<T> { Collection<MessageId> getPendingMessages(T txn, ClientId c) throws DbException; + /** + * Returns the IDs of any messages from the given client + * that have a shared dependent, but are still not shared themselves. + * <p/> + * Read-only. + */ + Collection<MessageId> getMessagesToShare(T txn, ClientId c) + throws DbException; + /** * Returns the message with the given ID, in serialised form, or null if * the message has been deleted. @@ -599,10 +608,9 @@ interface Database<T> { throws DbException; /** - * Marks the given message as shared or unshared. + * Marks the given message as shared. */ - void setMessageShared(T txn, MessageId m, boolean shared) - throws DbException; + void setMessageShared(T txn, MessageId m) throws DbException; /** * Sets the validation and delivery state of the given message. diff --git a/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java b/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java index 681132f31c..3265db024b 100644 --- a/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java +++ b/briar-core/src/org/briarproject/db/DatabaseComponentImpl.java @@ -94,6 +94,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { this.shutdown = shutdown; } + @Override public boolean open() throws DbException { Runnable shutdownHook = new Runnable() { public void run() { @@ -110,12 +111,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return reopened; } + @Override public void close() throws DbException { if (closed.getAndSet(true)) return; shutdown.removeShutdownHook(shutdownHandle); db.close(); } + @Override public Transaction startTransaction(boolean readOnly) throws DbException { // Don't allow reentrant locking if (lock.getReadHoldCount() > 0) throw new IllegalStateException(); @@ -135,6 +138,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void endTransaction(Transaction transaction) throws DbException { try { T txn = txnClass.cast(transaction.unbox()); @@ -153,6 +157,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return txnClass.cast(transaction.unbox()); } + @Override public ContactId addContact(Transaction transaction, Author remote, AuthorId local, boolean verified, boolean active) throws DbException { @@ -170,6 +175,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return c; } + @Override public void addGroup(Transaction transaction, Group g) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); @@ -179,6 +185,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void addLocalAuthor(Transaction transaction, LocalAuthor a) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -189,6 +196,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void addLocalMessage(Transaction transaction, Message m, Metadata meta, boolean shared) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -215,6 +223,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void addTransport(Transaction transaction, TransportId t, int maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -223,6 +232,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.addTransport(txn, t, maxLatency); } + @Override public void addTransportKeys(Transaction transaction, ContactId c, TransportKeys k) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -234,6 +244,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.addTransportKeys(txn, c, k); } + @Override public boolean containsContact(Transaction transaction, AuthorId remote, AuthorId local) throws DbException { T txn = unbox(transaction); @@ -242,6 +253,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.containsContact(txn, remote, local); } + @Override public boolean containsGroup(Transaction transaction, GroupId g) throws DbException { T txn = unbox(transaction); @@ -255,6 +267,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.containsLocalAuthor(txn, local); } + @Override public void deleteMessage(Transaction transaction, MessageId m) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -264,6 +277,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.deleteMessage(txn, m); } + @Override public void deleteMessageMetadata(Transaction transaction, MessageId m) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -274,6 +288,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } @Nullable + @Override public Ack generateAck(Transaction transaction, ContactId c, int maxMessages) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -287,6 +302,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } @Nullable + @Override public Collection<byte[]> generateBatch(Transaction transaction, ContactId c, int maxLength, int maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -306,6 +322,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } @Nullable + @Override public Offer generateOffer(Transaction transaction, ContactId c, int maxMessages, int maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -319,6 +336,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } @Nullable + @Override public Request generateRequest(Transaction transaction, ContactId c, int maxMessages) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -333,6 +351,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } @Nullable + @Override public Collection<byte[]> generateRequestedBatch(Transaction transaction, ContactId c, int maxLength, int maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -352,6 +371,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return Collections.unmodifiableList(messages); } + @Override public Contact getContact(Transaction transaction, ContactId c) throws DbException { T txn = unbox(transaction); @@ -360,18 +380,21 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getContact(txn, c); } + @Override public Collection<Contact> getContacts(Transaction transaction) throws DbException { T txn = unbox(transaction); return db.getContacts(txn); } + @Override public Collection<Contact> getContactsByAuthorId(Transaction transaction, AuthorId remote) throws DbException { T txn = unbox(transaction); return db.getContactsByAuthorId(txn, remote); } + @Override public Collection<ContactId> getContacts(Transaction transaction, AuthorId a) throws DbException { T txn = unbox(transaction); @@ -380,11 +403,13 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getContacts(txn, a); } + @Override public DeviceId getDeviceId(Transaction transaction) throws DbException { T txn = unbox(transaction); return db.getDeviceId(txn); } + @Override public Group getGroup(Transaction transaction, GroupId g) throws DbException { T txn = unbox(transaction); @@ -393,6 +418,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getGroup(txn, g); } + @Override public Metadata getGroupMetadata(Transaction transaction, GroupId g) throws DbException { T txn = unbox(transaction); @@ -401,12 +427,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getGroupMetadata(txn, g); } + @Override public Collection<Group> getGroups(Transaction transaction, ClientId c) throws DbException { T txn = unbox(transaction); return db.getGroups(txn, c); } + @Override public LocalAuthor getLocalAuthor(Transaction transaction, AuthorId a) throws DbException { T txn = unbox(transaction); @@ -415,25 +443,36 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getLocalAuthor(txn, a); } + @Override public Collection<LocalAuthor> getLocalAuthors(Transaction transaction) throws DbException { T txn = unbox(transaction); return db.getLocalAuthors(txn); } + @Override public Collection<MessageId> getMessagesToValidate(Transaction transaction, ClientId c) throws DbException { T txn = unbox(transaction); return db.getMessagesToValidate(txn, c); } + @Override public Collection<MessageId> getPendingMessages(Transaction transaction, ClientId c) throws DbException { T txn = unbox(transaction); return db.getPendingMessages(txn, c); } + @Override + public Collection<MessageId> getMessagesToShare( + Transaction transaction, ClientId c) throws DbException { + T txn = unbox(transaction); + return db.getMessagesToShare(txn, c); + } + @Nullable + @Override public byte[] getRawMessage(Transaction transaction, MessageId m) throws DbException { T txn = unbox(transaction); @@ -442,6 +481,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getRawMessage(txn, m); } + @Override public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction, GroupId g) throws DbException { T txn = unbox(transaction); @@ -450,6 +490,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageMetadata(txn, g); } + @Override public Map<MessageId, Metadata> getMessageMetadata(Transaction transaction, GroupId g, Metadata query) throws DbException { T txn = unbox(transaction); @@ -458,6 +499,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageMetadata(txn, g, query); } + @Override public Metadata getMessageMetadata(Transaction transaction, MessageId m) throws DbException { T txn = unbox(transaction); @@ -466,6 +508,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageMetadata(txn, m); } + @Override public Metadata getMessageMetadataForValidator(Transaction transaction, MessageId m) throws DbException { @@ -475,6 +518,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageMetadataForValidator(txn, m); } + @Override public State getMessageState(Transaction transaction, MessageId m) throws DbException { T txn = unbox(transaction); @@ -483,6 +527,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageState(txn, m); } + @Override public Collection<MessageStatus> getMessageStatus(Transaction transaction, ContactId c, GroupId g) throws DbException { T txn = unbox(transaction); @@ -493,6 +538,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageStatus(txn, c, g); } + @Override public MessageStatus getMessageStatus(Transaction transaction, ContactId c, MessageId m) throws DbException { T txn = unbox(transaction); @@ -503,6 +549,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageStatus(txn, c, m); } + @Override public Map<MessageId, State> getMessageDependencies(Transaction transaction, MessageId m) throws DbException { T txn = unbox(transaction); @@ -511,6 +558,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageDependencies(txn, m); } + @Override public Map<MessageId, State> getMessageDependents(Transaction transaction, MessageId m) throws DbException { T txn = unbox(transaction); @@ -519,12 +567,14 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getMessageDependents(txn, m); } + @Override public Settings getSettings(Transaction transaction, String namespace) throws DbException { T txn = unbox(transaction); return db.getSettings(txn, namespace); } + @Override public Map<ContactId, TransportKeys> getTransportKeys( Transaction transaction, TransportId t) throws DbException { T txn = unbox(transaction); @@ -533,6 +583,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.getTransportKeys(txn, t); } + @Override public void incrementStreamCounter(Transaction transaction, ContactId c, TransportId t, long rotationPeriod) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -544,6 +595,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.incrementStreamCounter(txn, c, t, rotationPeriod); } + @Override public boolean isVisibleToContact(Transaction transaction, ContactId c, GroupId g) throws DbException { T txn = unbox(transaction); @@ -554,6 +606,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { return db.containsVisibleGroup(txn, c, g); } + @Override public void mergeGroupMetadata(Transaction transaction, GroupId g, Metadata meta) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -563,6 +616,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.mergeGroupMetadata(txn, g, meta); } + @Override public void mergeMessageMetadata(Transaction transaction, MessageId m, Metadata meta) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -572,6 +626,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.mergeMessageMetadata(txn, m, meta); } + @Override public void mergeSettings(Transaction transaction, Settings s, String namespace) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -586,6 +641,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void receiveAck(Transaction transaction, ContactId c, Ack a) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -602,6 +658,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new MessagesAckedEvent(c, acked)); } + @Override public void receiveMessage(Transaction transaction, ContactId c, Message m) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -620,6 +677,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void receiveOffer(Transaction transaction, ContactId c, Offer o) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -643,6 +701,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { if (request) transaction.attach(new MessageToRequestEvent(c)); } + @Override public void receiveRequest(Transaction transaction, ContactId c, Request r) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -660,6 +719,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { if (requested) transaction.attach(new MessageRequestedEvent(c)); } + @Override public void removeContact(Transaction transaction, ContactId c) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -670,6 +730,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new ContactRemovedEvent(c)); } + @Override public void removeGroup(Transaction transaction, Group g) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -683,6 +744,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new GroupVisibilityUpdatedEvent(affected)); } + @Override public void removeLocalAuthor(Transaction transaction, AuthorId a) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -693,6 +755,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new LocalAuthorRemovedEvent(a)); } + @Override public void removeTransport(Transaction transaction, TransportId t) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -702,6 +765,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.removeTransport(txn, t); } + @Override public void setContactVerified(Transaction transaction, ContactId c) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -712,6 +776,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new ContactVerifiedEvent(c)); } + @Override public void setContactActive(Transaction transaction, ContactId c, boolean active) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -722,16 +787,20 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new ContactStatusChangedEvent(c, active)); } - public void setMessageShared(Transaction transaction, MessageId m, - boolean shared) throws DbException { + @Override + public void setMessageShared(Transaction transaction, MessageId m) + throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsMessage(txn, m)) throw new NoSuchMessageException(); - db.setMessageShared(txn, m, shared); - if (shared) transaction.attach(new MessageSharedEvent(m)); + if (db.getMessageState(txn, m) != DELIVERED) + throw new IllegalArgumentException("Shared undelivered message"); + db.setMessageShared(txn, m); + transaction.attach(new MessageSharedEvent(m)); } + @Override public void setMessageState(Transaction transaction, MessageId m, State state) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -742,6 +811,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { transaction.attach(new MessageStateChangedEvent(m, false, state)); } + @Override public void addMessageDependencies(Transaction transaction, Message dependent, Collection<MessageId> dependencies) throws DbException { @@ -755,6 +825,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void setReorderingWindow(Transaction transaction, ContactId c, TransportId t, long rotationPeriod, long base, byte[] bitmap) throws DbException { @@ -767,6 +838,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { db.setReorderingWindow(txn, c, t, rotationPeriod, base, bitmap); } + @Override public void setVisibleToContact(Transaction transaction, ContactId c, GroupId g, boolean visible) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); @@ -793,6 +865,7 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { } } + @Override public void updateTransportKeys(Transaction transaction, Map<ContactId, TransportKeys> keys) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); diff --git a/briar-core/src/org/briarproject/db/JdbcDatabase.java b/briar-core/src/org/briarproject/db/JdbcDatabase.java index 9015a3e473..913add5019 100644 --- a/briar-core/src/org/briarproject/db/JdbcDatabase.java +++ b/briar-core/src/org/briarproject/db/JdbcDatabase.java @@ -1692,6 +1692,35 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public Collection<MessageId> getMessagesToShare( + Connection txn, ClientId c) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT m.messageId FROM messages AS m" + + " JOIN messageDependencies AS d" + + " ON m.messageId = d.dependencyId" + + " JOIN messages AS m1" + + " ON d.messageId = m1.messageId" + + " JOIN groups AS g" + + " ON m.groupId = g.groupId" + + " WHERE m.shared = FALSE AND m1.shared = TRUE" + + " AND g.clientId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, c.getBytes()); + rs = ps.executeQuery(); + List<MessageId> ids = new ArrayList<MessageId>(); + while (rs.next()) ids.add(new MessageId(rs.getBytes(1))); + rs.close(); + ps.close(); + return Collections.unmodifiableList(ids); + } catch (SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + @Nullable public byte[] getRawMessage(Connection txn, MessageId m) throws DbException { @@ -2321,14 +2350,13 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setMessageShared(Connection txn, MessageId m, boolean shared) - throws DbException { + public void setMessageShared(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; try { - String sql = "UPDATE messages SET shared = ? WHERE messageId = ?"; + String sql = "UPDATE messages SET shared = TRUE" + + " WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setBoolean(1, shared); - ps.setBytes(2, m.getBytes()); + ps.setBytes(1, m.getBytes()); int affected = ps.executeUpdate(); if (affected < 0 || affected > 1) throw new DbStateException(); ps.close(); diff --git a/briar-core/src/org/briarproject/forum/ForumManagerImpl.java b/briar-core/src/org/briarproject/forum/ForumManagerImpl.java index 6289e1ec36..d4697b7598 100644 --- a/briar-core/src/org/briarproject/forum/ForumManagerImpl.java +++ b/briar-core/src/org/briarproject/forum/ForumManagerImpl.java @@ -73,15 +73,16 @@ class ForumManagerImpl extends BdfIncomingMessageHook implements ForumManager { } @Override - protected void incomingMessage(Transaction txn, Message m, BdfList body, + protected boolean incomingMessage(Transaction txn, Message m, BdfList body, BdfDictionary meta) throws DbException, FormatException { - clientHelper.setMessageShared(txn, m.getId(), true); - ForumPostHeader post = getForumPostHeader(txn, m.getId(), meta); ForumPostReceivedEvent event = new ForumPostReceivedEvent(post, m.getGroupId()); txn.attach(event); + + // share message + return true; } @Override diff --git a/briar-core/src/org/briarproject/introduction/IntroductionManagerImpl.java b/briar-core/src/org/briarproject/introduction/IntroductionManagerImpl.java index 623a1ed375..b07108ed84 100644 --- a/briar-core/src/org/briarproject/introduction/IntroductionManagerImpl.java +++ b/briar-core/src/org/briarproject/introduction/IntroductionManagerImpl.java @@ -207,7 +207,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook * in the introduction protocol and which engine we need to start. */ @Override - protected void incomingMessage(Transaction txn, Message m, BdfList body, + protected boolean incomingMessage(Transaction txn, Message m, BdfList body, BdfDictionary message) throws DbException { // Get message data and type @@ -233,7 +233,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook LOG.log(WARNING, e.toString(), e); } deleteMessage(txn, m.getId()); - return; + return false; } try { introduceeManager.incomingMessage(txn, state, message); @@ -254,7 +254,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook } catch (FormatException e) { LOG.warning("Could not find state for message, deleting..."); deleteMessage(txn, m.getId()); - return; + return false; } long role = state.getLong(ROLE, -1L); @@ -285,6 +285,7 @@ class IntroductionManagerImpl extends BdfIncomingMessageHook LOG.warning("Unknown message type '" + type + "', deleting..."); } } + return false; } @Override diff --git a/briar-core/src/org/briarproject/messaging/MessagingManagerImpl.java b/briar-core/src/org/briarproject/messaging/MessagingManagerImpl.java index f66e9d7e7a..959c64ad43 100644 --- a/briar-core/src/org/briarproject/messaging/MessagingManagerImpl.java +++ b/briar-core/src/org/briarproject/messaging/MessagingManagerImpl.java @@ -93,7 +93,7 @@ class MessagingManagerImpl extends BdfIncomingMessageHook } @Override - protected void incomingMessage(Transaction txn, Message m, BdfList body, + protected boolean incomingMessage(Transaction txn, Message m, BdfList body, BdfDictionary meta) throws DbException, FormatException { GroupId groupId = m.getGroupId(); @@ -106,6 +106,9 @@ class MessagingManagerImpl extends BdfIncomingMessageHook PrivateMessageReceivedEvent event = new PrivateMessageReceivedEvent( header, groupId); txn.attach(event); + + // don't share message + return false; } @Override diff --git a/briar-core/src/org/briarproject/sharing/SharingManagerImpl.java b/briar-core/src/org/briarproject/sharing/SharingManagerImpl.java index 8f2d4925a8..55886301c4 100644 --- a/briar-core/src/org/briarproject/sharing/SharingManagerImpl.java +++ b/briar-core/src/org/briarproject/sharing/SharingManagerImpl.java @@ -192,7 +192,7 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS } @Override - protected void incomingMessage(Transaction txn, Message m, BdfList body, + protected boolean incomingMessage(Transaction txn, Message m, BdfList body, BdfDictionary d) throws DbException, FormatException { BaseMessage msg = BaseMessage.from(getIFactory(), m.getGroupId(), d); @@ -263,6 +263,8 @@ abstract class SharingManagerImpl<S extends Shareable, I extends Invitation, IS // message has passed validator, so that should never happen throw new RuntimeException("Illegal Sharing Message"); } + // don't share message as other party already has it + return false; } @Override diff --git a/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java b/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java index 2d4e716a62..1bedf751f3 100644 --- a/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java +++ b/briar-core/src/org/briarproject/sync/ValidationManagerImpl.java @@ -72,6 +72,7 @@ class ValidationManagerImpl implements ValidationManager, Service, for (ClientId c : validators.keySet()) { validateOutstandingMessagesAsync(c); deliverOutstandingMessagesAsync(c); + shareOutstandingMessagesAsync(c); } } @@ -191,6 +192,7 @@ class ValidationManagerImpl implements ValidationManager, Service, private void deliverNextPendingMessage(Queue<MessageId> pending) { try { boolean anyInvalid = false, allDelivered = true; + Queue<MessageId> toShare = null; Queue<MessageId> invalidate = null; Transaction txn = db.startTransaction(false); try { @@ -213,8 +215,14 @@ class ValidationManagerImpl implements ValidationManager, Service, ClientId c = g.getClientId(); Metadata meta = db.getMessageMetadataForValidator(txn, id); - if (deliverMessage(txn, m, c, meta)) { + DeliveryResult result = deliverMessage(txn, m, c, meta); + if (result.valid) { pending.addAll(getPendingDependents(txn, id)); + if (result.share) { + db.setMessageShared(txn, id); + toShare = new LinkedList<MessageId>( + states.keySet()); + } } else { invalidate = getDependentsToInvalidate(txn, id); } @@ -226,6 +234,7 @@ class ValidationManagerImpl implements ValidationManager, Service, } if (invalidate != null) invalidateNextMessageAsync(invalidate); deliverNextPendingMessageAsync(pending); + if (toShare != null) shareNextMessageAsync(toShare); } catch (NoSuchMessageException e) { LOG.info("Message removed before delivery"); deliverNextPendingMessageAsync(pending); @@ -284,16 +293,17 @@ class ValidationManagerImpl implements ValidationManager, Service, } private void storeMessageContext(Message m, ClientId c, - MessageContext result) { + MessageContext context) { try { MessageId id = m.getId(); boolean anyInvalid = false, allDelivered = true; Queue<MessageId> invalidate = null; Queue<MessageId> pending = null; + Queue<MessageId> toShare = null; Transaction txn = db.startTransaction(false); try { // Check if message has any dependencies - Collection<MessageId> dependencies = result.getDependencies(); + Collection<MessageId> dependencies = context.getDependencies(); if (!dependencies.isEmpty()) { db.addMessageDependencies(txn, m, dependencies); // Check if dependencies are valid and delivered @@ -310,11 +320,17 @@ class ValidationManagerImpl implements ValidationManager, Service, invalidate = getDependentsToInvalidate(txn, id); } } else { - Metadata meta = result.getMetadata(); + Metadata meta = context.getMetadata(); db.mergeMessageMetadata(txn, id, meta); if (allDelivered) { - if (deliverMessage(txn, m, c, meta)) { + DeliveryResult result = deliverMessage(txn, m, c, meta); + if (result.valid) { pending = getPendingDependents(txn, id); + if (result.share) { + db.setMessageShared(txn, id); + toShare = + new LinkedList<MessageId>(dependencies); + } } else { invalidate = getDependentsToInvalidate(txn, id); } @@ -328,6 +344,7 @@ class ValidationManagerImpl implements ValidationManager, Service, } if (invalidate != null) invalidateNextMessageAsync(invalidate); if (pending != null) deliverNextPendingMessageAsync(pending); + if (toShare != null) shareNextMessageAsync(toShare); } catch (NoSuchMessageException e) { LOG.info("Message removed during validation"); } catch (NoSuchGroupException e) { @@ -337,18 +354,21 @@ class ValidationManagerImpl implements ValidationManager, Service, } } - private boolean deliverMessage(Transaction txn, Message m, ClientId c, - Metadata meta) throws DbException { + private DeliveryResult deliverMessage(Transaction txn, Message m, + ClientId c, Metadata meta) throws DbException { // Deliver the message to the client if it's registered a hook + boolean shareMsg = false; IncomingMessageHook hook = hooks.get(c); - if (hook != null) hook.incomingMessage(txn, m, meta); + if (hook != null) { + shareMsg = hook.incomingMessage(txn, m, meta); + } // TODO: Find a better way for clients to signal validity, #643 if (db.getRawMessage(txn, m.getId()) == null) { db.setMessageState(txn, m.getId(), INVALID); - return false; + return new DeliveryResult(false, false); } else { db.setMessageState(txn, m.getId(), DELIVERED); - return true; + return new DeliveryResult(true, shareMsg); } } @@ -362,6 +382,70 @@ class ValidationManagerImpl implements ValidationManager, Service, return pending; } + private void shareOutstandingMessagesAsync(final ClientId c) { + dbExecutor.execute(new Runnable() { + @Override + public void run() { + shareOutstandingMessages(c); + } + }); + } + + private void shareOutstandingMessages(ClientId c) { + try { + Queue<MessageId> toShare = new LinkedList<MessageId>(); + Transaction txn = db.startTransaction(true); + try { + toShare.addAll(db.getMessagesToShare(txn, c)); + txn.setComplete(); + } finally { + db.endTransaction(txn); + } + shareNextMessageAsync(toShare); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + + /** + * Shares the next message from the toShare queue asynchronously. + * + * This method should only be called for messages that have all their + * dependencies delivered and have been delivered themselves. + */ + private void shareNextMessageAsync(final Queue<MessageId> toShare) { + if (toShare.isEmpty()) return; + dbExecutor.execute(new Runnable() { + @Override + public void run() { + shareNextMessage(toShare); + } + }); + } + + private void shareNextMessage(Queue<MessageId> toShare) { + try { + Transaction txn = db.startTransaction(false); + try { + MessageId id = toShare.poll(); + db.setMessageShared(txn, id); + toShare.addAll(db.getMessageDependencies(txn, id).keySet()); + txn.setComplete(); + } finally { + db.endTransaction(txn); + } + shareNextMessageAsync(toShare); + } catch (NoSuchMessageException e) { + LOG.info("Message removed before sharing"); + shareNextMessageAsync(toShare); + } catch (NoSuchGroupException e) { + LOG.info("Group removed before sharing"); + shareNextMessageAsync(toShare); + } catch (DbException e) { + if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + private void invalidateNextMessageAsync(final Queue<MessageId> invalidate) { if (invalidate.isEmpty()) return; dbExecutor.execute(new Runnable() { @@ -447,4 +531,13 @@ class ValidationManagerImpl implements ValidationManager, Service, if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } } + + private static class DeliveryResult { + private final boolean valid, share; + + private DeliveryResult(boolean valid, boolean share) { + this.valid = valid; + this.share = share; + } + } } diff --git a/briar-tests/src/org/briarproject/blogs/BlogManagerImplTest.java b/briar-tests/src/org/briarproject/blogs/BlogManagerImplTest.java index c7819edddd..22fbf9eb7e 100644 --- a/briar-tests/src/org/briarproject/blogs/BlogManagerImplTest.java +++ b/briar-tests/src/org/briarproject/blogs/BlogManagerImplTest.java @@ -199,7 +199,6 @@ public class BlogManagerImplTest extends BriarTestCase { ); context.checking(new Expectations() {{ - oneOf(clientHelper).setMessageShared(txn, messageId, true); oneOf(identityManager) .getAuthorStatus(txn, blog1.getAuthor().getId()); will(returnValue(VERIFIED)); diff --git a/briar-tests/src/org/briarproject/db/DatabaseComponentImplTest.java b/briar-tests/src/org/briarproject/db/DatabaseComponentImplTest.java index 4ee08c8115..87603e76ef 100644 --- a/briar-tests/src/org/briarproject/db/DatabaseComponentImplTest.java +++ b/briar-tests/src/org/briarproject/db/DatabaseComponentImplTest.java @@ -760,7 +760,7 @@ public class DatabaseComponentImplTest extends BriarTestCase { transaction = db.startTransaction(false); try { - db.setMessageShared(transaction, message.getId(), true); + db.setMessageShared(transaction, message.getId()); fail(); } catch (NoSuchMessageException expected) { // Expected diff --git a/briar-tests/src/org/briarproject/db/H2DatabaseTest.java b/briar-tests/src/org/briarproject/db/H2DatabaseTest.java index 87c404006b..473aac8050 100644 --- a/briar-tests/src/org/briarproject/db/H2DatabaseTest.java +++ b/briar-tests/src/org/briarproject/db/H2DatabaseTest.java @@ -272,19 +272,12 @@ public class H2DatabaseTest extends BriarTestCase { assertTrue(ids.isEmpty()); // Sharing the message should make it sendable - db.setMessageShared(txn, messageId, true); + db.setMessageShared(txn, messageId); ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); assertEquals(Collections.singletonList(messageId), ids); ids = db.getMessagesToOffer(txn, contactId, 100); assertEquals(Collections.singletonList(messageId), ids); - // Unsharing the message should make it unsendable - db.setMessageShared(txn, messageId, false); - ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); - assertTrue(ids.isEmpty()); - ids = db.getMessagesToOffer(txn, contactId, 100); - assertTrue(ids.isEmpty()); - db.commitTransaction(txn); db.close(); } @@ -1363,6 +1356,43 @@ public class H2DatabaseTest extends BriarTestCase { db.close(); } + @Test + public void testGetMessagesToShare() throws Exception { + MessageId mId1 = new MessageId(TestUtils.getRandomId()); + MessageId mId2 = new MessageId(TestUtils.getRandomId()); + MessageId mId3 = new MessageId(TestUtils.getRandomId()); + MessageId mId4 = new MessageId(TestUtils.getRandomId()); + Message m1 = new Message(mId1, groupId, timestamp, raw); + Message m2 = new Message(mId2, groupId, timestamp, raw); + Message m3 = new Message(mId3, groupId, timestamp, raw); + Message m4 = new Message(mId4, groupId, timestamp, raw); + + Database<Connection> db = open(false); + Connection txn = db.startTransaction(); + + // Add a group and some messages + db.addGroup(txn, group); + db.addMessage(txn, m1, DELIVERED, true); + db.addMessage(txn, m2, DELIVERED, false); + db.addMessage(txn, m3, DELIVERED, false); + db.addMessage(txn, m4, DELIVERED, true); + + // Introduce dependencies between the messages + db.addMessageDependency(txn, groupId, mId1, mId2); + db.addMessageDependency(txn, groupId, mId3, mId1); + db.addMessageDependency(txn, groupId, mId4, mId3); + + // Retrieve messages to be shared + Collection<MessageId> result = + db.getMessagesToShare(txn, clientId); + assertEquals(2, result.size()); + assertTrue(result.contains(mId2)); + assertTrue(result.contains(mId3)); + + db.commitTransaction(txn); + db.close(); + } + @Test public void testGetMessageStatus() throws Exception { Database<Connection> db = open(false); diff --git a/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java b/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java index e7c2e85006..064f5e7496 100644 --- a/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java +++ b/briar-tests/src/org/briarproject/sync/ValidationManagerImplTest.java @@ -84,6 +84,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn3 = new Transaction(null, true); final Transaction txn4 = new Transaction(null, false); final Transaction txn5 = new Transaction(null, true); + final Transaction txn6 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -108,6 +109,7 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).mergeMessageMetadata(txn2, messageId, metadata); // Deliver the first message oneOf(hook).incomingMessage(txn2, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn2, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn2, messageId, DELIVERED); @@ -144,6 +146,12 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).getPendingMessages(txn5, clientId); will(returnValue(Collections.emptyList())); oneOf(db).endTransaction(txn5); + // Get messages to share + oneOf(db).startTransaction(true); + will(returnValue(txn6)); + oneOf(db).getMessagesToShare(txn6, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn6); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -160,6 +168,7 @@ public class ValidationManagerImplTest extends BriarTestCase { assertTrue(txn3.isComplete()); assertTrue(txn4.isComplete()); assertTrue(txn5.isComplete()); + assertTrue(txn6.isComplete()); } @Test @@ -175,6 +184,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn1 = new Transaction(null, true); final Transaction txn2 = new Transaction(null, false); final Transaction txn3 = new Transaction(null, false); + final Transaction txn4 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate @@ -205,6 +215,7 @@ public class ValidationManagerImplTest extends BriarTestCase { will(returnValue(new Metadata())); // Deliver the message oneOf(hook).incomingMessage(txn2, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn2, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn2, messageId, DELIVERED); @@ -228,6 +239,7 @@ public class ValidationManagerImplTest extends BriarTestCase { will(returnValue(metadata)); // Deliver the dependent oneOf(hook).incomingMessage(txn3, message2, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn3, messageId2); will(returnValue(raw)); oneOf(db).setMessageState(txn3, messageId2, DELIVERED); @@ -235,6 +247,79 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).getMessageDependents(txn3, messageId2); will(returnValue(Collections.emptyMap())); oneOf(db).endTransaction(txn3); + + // Get messages to share + oneOf(db).startTransaction(true); + will(returnValue(txn4)); + oneOf(db).getMessagesToShare(txn4, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn4); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.startService(); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); + assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); + } + + @Test + public void testMessagesAreSharedAtStartup() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, true); + final Transaction txn2 = new Transaction(null, true); + final Transaction txn3 = new Transaction(null, false); + final Transaction txn4 = new Transaction(null, false); + + context.checking(new Expectations() {{ + // No messages to validate + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getMessagesToValidate(txn, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn); + // No pending messages to deliver + oneOf(db).startTransaction(true); + will(returnValue(txn1)); + oneOf(db).getPendingMessages(txn1, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn1); + + // Get messages to share + oneOf(db).startTransaction(true); + will(returnValue(txn2)); + oneOf(db).getMessagesToShare(txn2, clientId); + will(returnValue(Collections.singletonList(messageId))); + oneOf(db).endTransaction(txn2); + // Share message and get dependencies + oneOf(db).startTransaction(false); + will(returnValue(txn3)); + oneOf(db).setMessageShared(txn3, messageId); + oneOf(db).getMessageDependencies(txn3, messageId); + will(returnValue(Collections.singletonMap(messageId2, DELIVERED))); + oneOf(db).endTransaction(txn3); + // Share dependency + oneOf(db).startTransaction(false); + will(returnValue(txn4)); + oneOf(db).setMessageShared(txn4, messageId2); + oneOf(db).getMessageDependencies(txn4, messageId2); + will(returnValue(Collections.emptyMap())); + oneOf(db).endTransaction(txn4); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -249,6 +334,71 @@ public class ValidationManagerImplTest extends BriarTestCase { assertTrue(txn1.isComplete()); assertTrue(txn2.isComplete()); assertTrue(txn3.isComplete()); + assertTrue(txn4.isComplete()); + } + + @Test + public void testIncomingMessagesAreShared() throws Exception { + Mockery context = new Mockery(); + final DatabaseComponent db = context.mock(DatabaseComponent.class); + final Executor dbExecutor = new ImmediateExecutor(); + final Executor cryptoExecutor = new ImmediateExecutor(); + final MessageValidator validator = context.mock(MessageValidator.class); + final IncomingMessageHook hook = + context.mock(IncomingMessageHook.class); + final Transaction txn = new Transaction(null, true); + final Transaction txn1 = new Transaction(null, false); + final Transaction txn2 = new Transaction(null, false); + context.checking(new Expectations() {{ + // Load the group + oneOf(db).startTransaction(true); + will(returnValue(txn)); + oneOf(db).getGroup(txn, groupId); + will(returnValue(group)); + oneOf(db).endTransaction(txn); + // Validate the message: valid + oneOf(validator).validateMessage(message, group); + will(returnValue(validResultWithDependencies)); + // Store the validation result + oneOf(db).startTransaction(false); + will(returnValue(txn1)); + oneOf(db).addMessageDependencies(txn1, message, + validResultWithDependencies.getDependencies()); + oneOf(db).getMessageDependencies(txn1, messageId); + will(returnValue(Collections.singletonMap(messageId1, DELIVERED))); + oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); + // Deliver the message + oneOf(hook).incomingMessage(txn1, message, metadata); + will(returnValue(true)); + oneOf(db).getRawMessage(txn1, messageId); + will(returnValue(raw)); + oneOf(db).setMessageState(txn1, messageId, DELIVERED); + // Get any pending dependents + oneOf(db).getMessageDependents(txn1, messageId); + will(returnValue(Collections.emptyMap())); + // Share message + oneOf(db).setMessageShared(txn1, messageId); + oneOf(db).endTransaction(txn1); + // Share dependencies + oneOf(db).startTransaction(false); + will(returnValue(txn2)); + oneOf(db).setMessageShared(txn2, messageId1); + oneOf(db).getMessageDependencies(txn2, messageId1); + will(returnValue(Collections.emptyMap())); + oneOf(db).endTransaction(txn2); + }}); + + ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, + cryptoExecutor); + vm.registerMessageValidator(clientId, validator); + vm.registerIncomingMessageHook(clientId, hook); + vm.eventOccurred(new MessageAddedEvent(message, contactId)); + + context.assertIsSatisfied(); + + assertTrue(txn.isComplete()); + assertTrue(txn1.isComplete()); + assertTrue(txn2.isComplete()); } @Test @@ -266,6 +416,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); final Transaction txn4 = new Transaction(null, true); + final Transaction txn5 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -308,6 +459,12 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).getPendingMessages(txn4, clientId); will(returnValue(Collections.emptyList())); oneOf(db).endTransaction(txn4); + // Get messages to share + oneOf(db).startTransaction(true); + will(returnValue(txn5)); + oneOf(db).getMessagesToShare(txn5, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn5); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -323,6 +480,7 @@ public class ValidationManagerImplTest extends BriarTestCase { assertTrue(txn2.isComplete()); assertTrue(txn3.isComplete()); assertTrue(txn4.isComplete()); + assertTrue(txn5.isComplete()); } @Test @@ -340,6 +498,7 @@ public class ValidationManagerImplTest extends BriarTestCase { final Transaction txn2 = new Transaction(null, true); final Transaction txn3 = new Transaction(null, false); final Transaction txn4 = new Transaction(null, true); + final Transaction txn5 = new Transaction(null, true); context.checking(new Expectations() {{ // Get messages to validate oneOf(db).startTransaction(true); @@ -385,6 +544,12 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).getPendingMessages(txn4, clientId); will(returnValue(Collections.emptyList())); oneOf(db).endTransaction(txn4); + // Get messages to share + oneOf(db).startTransaction(true); + will(returnValue(txn5)); + oneOf(db).getMessagesToShare(txn5, clientId); + will(returnValue(Collections.emptyList())); + oneOf(db).endTransaction(txn5); }}); ValidationManagerImpl vm = new ValidationManagerImpl(db, dbExecutor, @@ -400,6 +565,7 @@ public class ValidationManagerImplTest extends BriarTestCase { assertTrue(txn2.isComplete()); assertTrue(txn3.isComplete()); assertTrue(txn4.isComplete()); + assertTrue(txn5.isComplete()); } @Test @@ -429,6 +595,7 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); // Deliver the message oneOf(hook).incomingMessage(txn1, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn1, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn1, messageId, DELIVERED); @@ -548,6 +715,7 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); // Deliver the message oneOf(hook).incomingMessage(txn1, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn1, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn1, messageId, DELIVERED); @@ -792,6 +960,7 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); // Deliver the message oneOf(hook).incomingMessage(txn1, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn1, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn1, messageId, DELIVERED); @@ -815,6 +984,7 @@ public class ValidationManagerImplTest extends BriarTestCase { will(returnValue(metadata)); // Deliver message 1 oneOf(hook).incomingMessage(txn2, message1, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn2, messageId1); will(returnValue(raw)); oneOf(db).setMessageState(txn2, messageId1, DELIVERED); @@ -838,6 +1008,7 @@ public class ValidationManagerImplTest extends BriarTestCase { will(returnValue(metadata)); // Deliver message 2 oneOf(hook).incomingMessage(txn3, message2, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn3, messageId2); will(returnValue(raw)); oneOf(db).setMessageState(txn3, messageId2, DELIVERED); @@ -890,6 +1061,7 @@ public class ValidationManagerImplTest extends BriarTestCase { will(returnValue(metadata)); // Deliver message 4 oneOf(hook).incomingMessage(txn6, message4, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn6, messageId4); will(returnValue(raw)); oneOf(db).setMessageState(txn6, messageId4, DELIVERED); @@ -947,6 +1119,7 @@ public class ValidationManagerImplTest extends BriarTestCase { oneOf(db).mergeMessageMetadata(txn1, messageId, metadata); // Deliver the message oneOf(hook).incomingMessage(txn1, message, metadata); + will(returnValue(false)); oneOf(db).getRawMessage(txn1, messageId); will(returnValue(raw)); oneOf(db).setMessageState(txn1, messageId, DELIVERED); -- GitLab