diff --git a/briar-api/src/org/briarproject/api/clients/MessageQueueManager.java b/briar-api/src/org/briarproject/api/clients/MessageQueueManager.java new file mode 100644 index 0000000000000000000000000000000000000000..def27a66741b9e59d93592bcdbea6d6f347e4adf --- /dev/null +++ b/briar-api/src/org/briarproject/api/clients/MessageQueueManager.java @@ -0,0 +1,38 @@ +package org.briarproject.api.clients; + +import org.briarproject.api.db.DbException; +import org.briarproject.api.db.Metadata; +import org.briarproject.api.db.Transaction; +import org.briarproject.api.sync.ClientId; +import org.briarproject.api.sync.Group; + +public interface MessageQueueManager { + + /** + * The key used for storing the queue's state in the group metadata. + */ + String QUEUE_STATE_KEY = "queueState"; + + /** + * Sends a message using the given queue. + */ + QueueMessage sendMessage(Transaction txn, Group queue, long timestamp, + byte[] body, Metadata meta) throws DbException; + + /** + * Sets the message validator for the given client. + */ + void registerMessageValidator(ClientId c, QueueMessageValidator v); + + /** + * Sets the incoming message hook for the given client. The hook will be + * called once for each incoming message that passes validation. Messages + * are passed to the hook in order. + */ + void registerIncomingMessageHook(ClientId c, IncomingQueueMessageHook hook); + + interface IncomingQueueMessageHook { + void incomingMessage(Transaction txn, QueueMessage q, Metadata meta) + throws DbException; + } +} diff --git a/briar-api/src/org/briarproject/api/clients/QueueMessage.java b/briar-api/src/org/briarproject/api/clients/QueueMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..0979c3d19493d663292cbda98020b53ee3cb3ebe --- /dev/null +++ b/briar-api/src/org/briarproject/api/clients/QueueMessage.java @@ -0,0 +1,28 @@ +package org.briarproject.api.clients; + +import org.briarproject.api.sync.GroupId; +import org.briarproject.api.sync.Message; +import org.briarproject.api.sync.MessageId; + +import static org.briarproject.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; +import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; + +public class QueueMessage extends Message { + + public static final int QUEUE_MESSAGE_HEADER_LENGTH = + MESSAGE_HEADER_LENGTH + 8; + public static final int MAX_QUEUE_MESSAGE_BODY_LENGTH = + MAX_MESSAGE_BODY_LENGTH - 8; + + private final long queuePosition; + + public QueueMessage(MessageId id, GroupId groupId, long timestamp, + long queuePosition, byte[] raw) { + super(id, groupId, timestamp, raw); + this.queuePosition = queuePosition; + } + + public long getQueuePosition() { + return queuePosition; + } +} diff --git a/briar-api/src/org/briarproject/api/clients/QueueMessageFactory.java b/briar-api/src/org/briarproject/api/clients/QueueMessageFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..cea02f230f7c9b3a42657a4eb80c894e086d597d --- /dev/null +++ b/briar-api/src/org/briarproject/api/clients/QueueMessageFactory.java @@ -0,0 +1,12 @@ +package org.briarproject.api.clients; + +import org.briarproject.api.sync.GroupId; +import org.briarproject.api.sync.MessageId; + +public interface QueueMessageFactory { + + QueueMessage createMessage(GroupId groupId, long timestamp, + long queuePosition, byte[] body); + + QueueMessage createMessage(MessageId id, byte[] raw); +} diff --git a/briar-api/src/org/briarproject/api/clients/QueueMessageValidator.java b/briar-api/src/org/briarproject/api/clients/QueueMessageValidator.java new file mode 100644 index 0000000000000000000000000000000000000000..1d25b2039d50e3f89d4eafc66023aa732730aa56 --- /dev/null +++ b/briar-api/src/org/briarproject/api/clients/QueueMessageValidator.java @@ -0,0 +1,13 @@ +package org.briarproject.api.clients; + +import org.briarproject.api.db.Metadata; +import org.briarproject.api.sync.Group; + +public interface QueueMessageValidator { + + /** + * Validates the given message and returns its metadata if the message + * is valid, or null if the message is invalid. + */ + Metadata validateMessage(QueueMessage q, Group g); +} diff --git a/briar-core/src/org/briarproject/clients/ClientsModule.java b/briar-core/src/org/briarproject/clients/ClientsModule.java index 4f3a1d4a9fd616b575af179bbc2be331f89c7af6..b4c1a306cee9f43240883584eae7144ed906f365 100644 --- a/briar-core/src/org/briarproject/clients/ClientsModule.java +++ b/briar-core/src/org/briarproject/clients/ClientsModule.java @@ -3,13 +3,17 @@ package org.briarproject.clients; import com.google.inject.AbstractModule; import org.briarproject.api.clients.ClientHelper; +import org.briarproject.api.clients.MessageQueueManager; import org.briarproject.api.clients.PrivateGroupFactory; +import org.briarproject.api.clients.QueueMessageFactory; public class ClientsModule extends AbstractModule { @Override protected void configure() { bind(ClientHelper.class).to(ClientHelperImpl.class); + bind(MessageQueueManager.class).to(MessageQueueManagerImpl.class); bind(PrivateGroupFactory.class).to(PrivateGroupFactoryImpl.class); + bind(QueueMessageFactory.class).to(QueueMessageFactoryImpl.class); } } diff --git a/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..5889cecdfc64efb34a171b2aa12fd75002ae5623 --- /dev/null +++ b/briar-core/src/org/briarproject/clients/MessageQueueManagerImpl.java @@ -0,0 +1,211 @@ +package org.briarproject.clients; + +import org.briarproject.api.FormatException; +import org.briarproject.api.clients.ClientHelper; +import org.briarproject.api.clients.MessageQueueManager; +import org.briarproject.api.clients.QueueMessage; +import org.briarproject.api.clients.QueueMessageFactory; +import org.briarproject.api.clients.QueueMessageValidator; +import org.briarproject.api.data.BdfDictionary; +import org.briarproject.api.data.BdfList; +import org.briarproject.api.db.DatabaseComponent; +import org.briarproject.api.db.DbException; +import org.briarproject.api.db.Metadata; +import org.briarproject.api.db.Transaction; +import org.briarproject.api.sync.ClientId; +import org.briarproject.api.sync.Group; +import org.briarproject.api.sync.GroupId; +import org.briarproject.api.sync.Message; +import org.briarproject.api.sync.MessageId; +import org.briarproject.api.sync.MessageValidator; +import org.briarproject.api.sync.ValidationManager; +import org.briarproject.api.sync.ValidationManager.IncomingMessageHook; +import org.briarproject.util.ByteUtils; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.TreeMap; +import java.util.logging.Logger; + +import javax.inject.Inject; + +import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; +import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; + +class MessageQueueManagerImpl implements MessageQueueManager { + + private static final String OUTGOING_POSITION_KEY = "nextOut"; + private static final String INCOMING_POSITION_KEY = "nextIn"; + private static final String PENDING_MESSAGES_KEY = "pending"; + + private static final Logger LOG = + Logger.getLogger(MessageQueueManagerImpl.class.getName()); + + private final DatabaseComponent db; + private final ClientHelper clientHelper; + private final QueueMessageFactory queueMessageFactory; + private final ValidationManager validationManager; + + @Inject + MessageQueueManagerImpl(DatabaseComponent db, ClientHelper clientHelper, + QueueMessageFactory queueMessageFactory, + ValidationManager validationManager) { + this.db = db; + this.clientHelper = clientHelper; + this.queueMessageFactory = queueMessageFactory; + this.validationManager = validationManager; + } + + @Override + public QueueMessage sendMessage(Transaction txn, Group queue, + long timestamp, byte[] body, Metadata meta) throws DbException { + QueueState queueState = loadQueueState(txn, queue.getId()); + long queuePosition = queueState.outgoingPosition; + queueState.outgoingPosition++; + saveQueueState(txn, queue.getId(), queueState); + QueueMessage q = queueMessageFactory.createMessage(queue.getId(), + timestamp, queuePosition, body); + db.addLocalMessage(txn, q, queue.getClientId(), meta, true); + return q; + } + + @Override + public void registerMessageValidator(ClientId c, QueueMessageValidator v) { + validationManager.registerMessageValidator(c, + new DelegatingMessageValidator(v)); + } + + @Override + public void registerIncomingMessageHook(ClientId c, + IncomingQueueMessageHook hook) { + validationManager.registerIncomingMessageHook(c, + new DelegatingIncomingMessageHook(hook)); + } + + private QueueState loadQueueState(Transaction txn, GroupId g) + throws DbException { + try { + TreeMap<Long, MessageId> pending = new TreeMap<Long, MessageId>(); + Metadata groupMeta = db.getGroupMetadata(txn, g); + byte[] raw = groupMeta.get(QUEUE_STATE_KEY); + if (raw == null) return new QueueState(0, 0, pending); + BdfDictionary d = clientHelper.toDictionary(raw, 0, raw.length); + long outgoingPosition = d.getLong(OUTGOING_POSITION_KEY); + long incomingPosition = d.getLong(INCOMING_POSITION_KEY); + BdfList pendingList = d.getList(PENDING_MESSAGES_KEY); + for (int i = 0; i < pendingList.size(); i++) { + BdfList item = pendingList.getList(i); + if (item.size() != 2) throw new FormatException(); + pending.put(item.getLong(0), new MessageId(item.getRaw(1))); + } + return new QueueState(outgoingPosition, incomingPosition, pending); + } catch (FormatException e) { + throw new DbException(e); + } + } + + private void saveQueueState(Transaction txn, GroupId g, + QueueState queueState) throws DbException { + try { + BdfDictionary d = new BdfDictionary(); + d.put(OUTGOING_POSITION_KEY, queueState.outgoingPosition); + d.put(INCOMING_POSITION_KEY, queueState.incomingPosition); + BdfList pendingList = new BdfList(); + for (Entry<Long, MessageId> e : queueState.pending.entrySet()) + pendingList.add(BdfList.of(e.getKey(), e.getValue())); + d.put(PENDING_MESSAGES_KEY, pendingList); + Metadata groupMeta = new Metadata(); + groupMeta.put(QUEUE_STATE_KEY, clientHelper.toByteArray(d)); + db.mergeGroupMetadata(txn, g, groupMeta); + } catch (FormatException e) { + throw new RuntimeException(e); + } + } + + private static class QueueState { + + private long outgoingPosition, incomingPosition; + private final TreeMap<Long, MessageId> pending; + + QueueState(long outgoingPosition, long incomingPosition, + TreeMap<Long, MessageId> pending) { + this.outgoingPosition = outgoingPosition; + this.incomingPosition = incomingPosition; + this.pending = pending; + } + + MessageId popIncomingMessageId() { + Iterator<Entry<Long, MessageId>> it = pending.entrySet().iterator(); + if (!it.hasNext()) return null; + Entry<Long, MessageId> e = it.next(); + if (!e.getKey().equals(incomingPosition)) return null; + it.remove(); + incomingPosition++; + return e.getValue(); + } + } + + private static class DelegatingMessageValidator + implements MessageValidator { + + private final QueueMessageValidator delegate; + + DelegatingMessageValidator(QueueMessageValidator delegate) { + this.delegate = delegate; + } + + @Override + public Metadata validateMessage(Message m, Group g) { + byte[] raw = m.getRaw(); + if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) return null; + long queuePosition = ByteUtils.readUint64(raw, + MESSAGE_HEADER_LENGTH); + if (queuePosition < 0) return null; + QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), + m.getTimestamp(), queuePosition, raw); + return delegate.validateMessage(q, g); + } + } + + private class DelegatingIncomingMessageHook implements IncomingMessageHook { + + private final IncomingQueueMessageHook delegate; + + DelegatingIncomingMessageHook(IncomingQueueMessageHook delegate) { + this.delegate = delegate; + } + + @Override + public void incomingMessage(Transaction txn, Message m, Metadata meta) + throws DbException { + long queuePosition = ByteUtils.readUint64(m.getRaw(), + MESSAGE_HEADER_LENGTH); + QueueState queueState = loadQueueState(txn, m.getGroupId()); + if (queuePosition < queueState.incomingPosition) { + // A message with this queue position has already been seen + LOG.warning("Deleting message with duplicate position"); + db.deleteMessage(txn, m.getId()); + db.deleteMessageMetadata(txn, m.getId()); + } else if (queuePosition > queueState.incomingPosition) { + // The message is out of order, add it to the pending list + queueState.pending.put(queuePosition, m.getId()); + saveQueueState(txn, m.getGroupId(), queueState); + } else { + // The message is in order, pass it to the delegate + QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), + m.getTimestamp(), queuePosition, m.getRaw()); + delegate.incomingMessage(txn, q, meta); + queueState.incomingPosition++; + // Pass any consecutive messages to the delegate + MessageId id; + while ((id = queueState.popIncomingMessageId()) != null) { + byte[] raw = db.getRawMessage(txn, id); + meta = db.getMessageMetadata(txn, id); + q = queueMessageFactory.createMessage(id, raw); + delegate.incomingMessage(txn, q, meta); + } + saveQueueState(txn, m.getGroupId(), queueState); + } + } + } +} diff --git a/briar-core/src/org/briarproject/clients/PrivateGroupFactoryImpl.java b/briar-core/src/org/briarproject/clients/PrivateGroupFactoryImpl.java index c1842e6ba6230dca5ede95fd579c8c72d7b274de..5fd4ee98ae535ec8a3d14f1e886f6fa7c8a12c03 100644 --- a/briar-core/src/org/briarproject/clients/PrivateGroupFactoryImpl.java +++ b/briar-core/src/org/briarproject/clients/PrivateGroupFactoryImpl.java @@ -3,28 +3,26 @@ package org.briarproject.clients; import com.google.inject.Inject; import org.briarproject.api.Bytes; +import org.briarproject.api.FormatException; +import org.briarproject.api.clients.ClientHelper; import org.briarproject.api.clients.PrivateGroupFactory; import org.briarproject.api.contact.Contact; -import org.briarproject.api.data.BdfWriter; -import org.briarproject.api.data.BdfWriterFactory; +import org.briarproject.api.data.BdfList; import org.briarproject.api.identity.AuthorId; import org.briarproject.api.sync.ClientId; import org.briarproject.api.sync.Group; import org.briarproject.api.sync.GroupFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - class PrivateGroupFactoryImpl implements PrivateGroupFactory { private final GroupFactory groupFactory; - private final BdfWriterFactory bdfWriterFactory; + private final ClientHelper clientHelper; @Inject PrivateGroupFactoryImpl(GroupFactory groupFactory, - BdfWriterFactory bdfWriterFactory) { + ClientHelper clientHelper) { this.groupFactory = groupFactory; - this.bdfWriterFactory = bdfWriterFactory; + this.clientHelper = clientHelper; } @Override @@ -36,22 +34,12 @@ class PrivateGroupFactoryImpl implements PrivateGroupFactory { } private byte[] createGroupDescriptor(AuthorId local, AuthorId remote) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - BdfWriter w = bdfWriterFactory.createWriter(out); try { - w.writeListStart(); - if (Bytes.COMPARATOR.compare(local, remote) < 0) { - w.writeRaw(local.getBytes()); - w.writeRaw(remote.getBytes()); - } else { - w.writeRaw(remote.getBytes()); - w.writeRaw(local.getBytes()); - } - w.writeListEnd(); - } catch (IOException e) { - // Shouldn't happen with ByteArrayOutputStream + if (Bytes.COMPARATOR.compare(local, remote) < 0) + return clientHelper.toByteArray(BdfList.of(local, remote)); + else return clientHelper.toByteArray(BdfList.of(remote, local)); + } catch (FormatException e) { throw new RuntimeException(e); } - return out.toByteArray(); } } diff --git a/briar-core/src/org/briarproject/clients/QueueMessageFactoryImpl.java b/briar-core/src/org/briarproject/clients/QueueMessageFactoryImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..d64cbb5713f548c846941397c659d856ebdef15a --- /dev/null +++ b/briar-core/src/org/briarproject/clients/QueueMessageFactoryImpl.java @@ -0,0 +1,55 @@ +package org.briarproject.clients; + +import org.briarproject.api.UniqueId; +import org.briarproject.api.clients.QueueMessage; +import org.briarproject.api.clients.QueueMessageFactory; +import org.briarproject.api.crypto.CryptoComponent; +import org.briarproject.api.sync.GroupId; +import org.briarproject.api.sync.MessageId; +import org.briarproject.util.ByteUtils; + +import javax.inject.Inject; + +import static org.briarproject.api.clients.QueueMessage.MAX_QUEUE_MESSAGE_BODY_LENGTH; +import static org.briarproject.api.clients.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; +import static org.briarproject.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; +import static org.briarproject.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; + +class QueueMessageFactoryImpl implements QueueMessageFactory { + + private final CryptoComponent crypto; + + @Inject + QueueMessageFactoryImpl(CryptoComponent crypto) { + this.crypto = crypto; + } + + @Override + public QueueMessage createMessage(GroupId groupId, long timestamp, + long queuePosition, byte[] body) { + if (body.length > MAX_QUEUE_MESSAGE_BODY_LENGTH) + throw new IllegalArgumentException(); + byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH + body.length]; + System.arraycopy(groupId.getBytes(), 0, raw, 0, UniqueId.LENGTH); + ByteUtils.writeUint64(timestamp, raw, UniqueId.LENGTH); + ByteUtils.writeUint64(queuePosition, raw, MESSAGE_HEADER_LENGTH); + System.arraycopy(body, 0, raw, QUEUE_MESSAGE_HEADER_LENGTH, + body.length); + MessageId id = new MessageId(crypto.hash(MessageId.LABEL, raw)); + return new QueueMessage(id, groupId, timestamp, queuePosition, raw); + } + + @Override + public QueueMessage createMessage(MessageId id, byte[] raw) { + if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) + throw new IllegalArgumentException(); + if (raw.length > MAX_MESSAGE_LENGTH) + throw new IllegalArgumentException(); + byte[] groupId = new byte[UniqueId.LENGTH]; + System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH); + long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH); + long queuePosition = ByteUtils.readUint64(raw, MESSAGE_HEADER_LENGTH); + return new QueueMessage(id, new GroupId(groupId), timestamp, + queuePosition, raw); + } +} diff --git a/briar-core/src/org/briarproject/forum/ForumListValidator.java b/briar-core/src/org/briarproject/forum/ForumListValidator.java index e4b6e4edc6b3775e57ca3332c092b989f85c1735..94a5ca684fd4aa040b7b7a36daafbe7766b4b2f0 100644 --- a/briar-core/src/org/briarproject/forum/ForumListValidator.java +++ b/briar-core/src/org/briarproject/forum/ForumListValidator.java @@ -20,7 +20,7 @@ class ForumListValidator extends BdfMessageValidator { } @Override - public BdfDictionary validateMessage(BdfList message, Group g, + protected BdfDictionary validateMessage(BdfList message, Group g, long timestamp) throws FormatException { // Version, forum list checkSize(message, 2);