diff --git a/briar-api/src/main/java/org/briarproject/briar/api/client/MessageQueueManager.java b/briar-api/src/main/java/org/briarproject/briar/api/client/MessageQueueManager.java deleted file mode 100644 index 75418bf7c27afd1375a2156d6ac6b1f15388805e..0000000000000000000000000000000000000000 --- a/briar-api/src/main/java/org/briarproject/briar/api/client/MessageQueueManager.java +++ /dev/null @@ -1,72 +0,0 @@ -package org.briarproject.briar.api.client; - -import org.briarproject.bramble.api.db.DbException; -import org.briarproject.bramble.api.db.Metadata; -import org.briarproject.bramble.api.db.Transaction; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.ClientId; -import org.briarproject.bramble.api.sync.Group; -import org.briarproject.bramble.api.sync.InvalidMessageException; -import org.briarproject.bramble.api.sync.MessageContext; - -@Deprecated -@NotNullByDefault -public interface MessageQueueManager { - - /** - * The key used for storing the queue's state in the group metadata. - */ - String QUEUE_STATE_KEY = "queueState"; - - /** - * Sends a message using the given queue. - */ - QueueMessage sendMessage(Transaction txn, Group queue, long timestamp, - byte[] body, Metadata meta) throws DbException; - - /** - * Sets the message validator for the given client. - */ - void registerMessageValidator(ClientId c, QueueMessageValidator v); - - /** - * Sets the incoming message hook for the given client. The hook will be - * called once for each incoming message that passes validation. Messages - * are passed to the hook in order. - */ - void registerIncomingMessageHook(ClientId c, IncomingQueueMessageHook hook); - - @Deprecated - interface QueueMessageValidator { - - /** - * Validates the given message and returns its metadata and - * dependencies. - */ - MessageContext validateMessage(QueueMessage q, Group g) - throws InvalidMessageException; - } - - @Deprecated - interface IncomingQueueMessageHook { - - /** - * Called once for each incoming message that passes validation. - * Messages are passed to the hook in order. - * - * @throws DbException Should only be used for real database errors. - * If this is thrown, delivery will be attempted again at next startup, - * whereas if an InvalidMessageException is thrown, - * the message will be permanently invalidated. - * @throws InvalidMessageException for any non-database error - * that occurs while handling remotely created data. - * This includes errors that occur while handling locally created data - * in a context controlled by remotely created data - * (for example, parsing the metadata of a dependency - * of an incoming message). - * Never rethrow DbException as InvalidMessageException! - */ - void incomingMessage(Transaction txn, QueueMessage q, Metadata meta) - throws DbException, InvalidMessageException; - } -} diff --git a/briar-api/src/main/java/org/briarproject/briar/api/client/ProtocolEngine.java b/briar-api/src/main/java/org/briarproject/briar/api/client/ProtocolEngine.java deleted file mode 100644 index 281d9af86ff4b2d642a08d0854860cf46a626db3..0000000000000000000000000000000000000000 --- a/briar-api/src/main/java/org/briarproject/briar/api/client/ProtocolEngine.java +++ /dev/null @@ -1,50 +0,0 @@ -package org.briarproject.briar.api.client; - -import org.briarproject.bramble.api.event.Event; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; - -import java.util.List; - -@Deprecated -@NotNullByDefault -public interface ProtocolEngine<A, S, M> { - - StateUpdate<S, M> onLocalAction(S localState, A action); - - StateUpdate<S, M> onMessageReceived(S localState, M received); - - StateUpdate<S, M> onMessageDelivered(S localState, M delivered); - - class StateUpdate<S, M> { - public final boolean deleteMessage; - public final boolean deleteState; - public final S localState; - public final List<M> toSend; - public final List<Event> toBroadcast; - - /** - * This class represents an update of the local protocol state. - * It only shows how the state should be updated, - * but does not carry out the updates on its own. - * - * @param deleteMessage whether to delete the message that triggered - * the state update. This will be ignored for - * {@link ProtocolEngine#onLocalAction}. - * @param deleteState whether to delete the localState {@link S} - * @param localState the new local state - * @param toSend a list of messages to be sent as part of the - * state update - * @param toBroadcast a list of events to broadcast as result of the - * state update - */ - public StateUpdate(boolean deleteMessage, boolean deleteState, - S localState, List<M> toSend, List<Event> toBroadcast) { - - this.deleteMessage = deleteMessage; - this.deleteState = deleteState; - this.localState = localState; - this.toSend = toSend; - this.toBroadcast = toBroadcast; - } - } -} diff --git a/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessage.java b/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessage.java deleted file mode 100644 index c41b6da2b665dfc2dbc5b0c939c81f7917c80543..0000000000000000000000000000000000000000 --- a/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessage.java +++ /dev/null @@ -1,31 +0,0 @@ -package org.briarproject.briar.api.client; - -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.GroupId; -import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageId; - -import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_BODY_LENGTH; -import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; - -@Deprecated -@NotNullByDefault -public class QueueMessage extends Message { - - public static final int QUEUE_MESSAGE_HEADER_LENGTH = - MESSAGE_HEADER_LENGTH + 8; - public static final int MAX_QUEUE_MESSAGE_BODY_LENGTH = - MAX_MESSAGE_BODY_LENGTH - 8; - - private final long queuePosition; - - public QueueMessage(MessageId id, GroupId groupId, long timestamp, - long queuePosition, byte[] raw) { - super(id, groupId, timestamp, raw); - this.queuePosition = queuePosition; - } - - public long getQueuePosition() { - return queuePosition; - } -} diff --git a/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessageFactory.java b/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessageFactory.java deleted file mode 100644 index ac458a8a89494536aab35ba09fffc68e0a80b182..0000000000000000000000000000000000000000 --- a/briar-api/src/main/java/org/briarproject/briar/api/client/QueueMessageFactory.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.briarproject.briar.api.client; - -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.GroupId; -import org.briarproject.bramble.api.sync.MessageId; - -@Deprecated -@NotNullByDefault -public interface QueueMessageFactory { - - QueueMessage createMessage(GroupId groupId, long timestamp, - long queuePosition, byte[] body); - - QueueMessage createMessage(MessageId id, byte[] raw); -} diff --git a/briar-core/src/main/java/org/briarproject/briar/client/BdfIncomingMessageHook.java b/briar-core/src/main/java/org/briarproject/briar/client/BdfIncomingMessageHook.java index b7e16c8d61039f33f1f70a4f4acfc01a8fe13d04..17f02ca43fa928697134f2494b458b91438310e6 100644 --- a/briar-core/src/main/java/org/briarproject/briar/client/BdfIncomingMessageHook.java +++ b/briar-core/src/main/java/org/briarproject/briar/client/BdfIncomingMessageHook.java @@ -13,18 +13,14 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.InvalidMessageException; import org.briarproject.bramble.api.sync.Message; import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook; -import org.briarproject.briar.api.client.MessageQueueManager.IncomingQueueMessageHook; -import org.briarproject.briar.api.client.QueueMessage; import javax.annotation.concurrent.Immutable; import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; -import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; @Immutable @NotNullByDefault -public abstract class BdfIncomingMessageHook implements IncomingMessageHook, - IncomingQueueMessageHook { +public abstract class BdfIncomingMessageHook implements IncomingMessageHook { protected final DatabaseComponent db; protected final ClientHelper clientHelper; @@ -67,16 +63,6 @@ public abstract class BdfIncomingMessageHook implements IncomingMessageHook, } } - @Override - public void incomingMessage(Transaction txn, QueueMessage q, Metadata meta) - throws DbException, InvalidMessageException { - try { - incomingMessage(txn, q, meta, QUEUE_MESSAGE_HEADER_LENGTH); - } catch (FormatException e) { - throw new InvalidMessageException(e); - } - } - private boolean incomingMessage(Transaction txn, Message m, Metadata meta, int headerLength) throws DbException, FormatException { byte[] raw = m.getRaw(); diff --git a/briar-core/src/main/java/org/briarproject/briar/client/BdfQueueMessageValidator.java b/briar-core/src/main/java/org/briarproject/briar/client/BdfQueueMessageValidator.java deleted file mode 100644 index 48fd665a10f17cd77c2fc446da96ccf5a39d3181..0000000000000000000000000000000000000000 --- a/briar-core/src/main/java/org/briarproject/briar/client/BdfQueueMessageValidator.java +++ /dev/null @@ -1,71 +0,0 @@ -package org.briarproject.briar.client; - -import org.briarproject.bramble.api.FormatException; -import org.briarproject.bramble.api.client.BdfMessageContext; -import org.briarproject.bramble.api.client.ClientHelper; -import org.briarproject.bramble.api.data.BdfList; -import org.briarproject.bramble.api.data.MetadataEncoder; -import org.briarproject.bramble.api.db.Metadata; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.Group; -import org.briarproject.bramble.api.sync.InvalidMessageException; -import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageContext; -import org.briarproject.bramble.api.system.Clock; -import org.briarproject.briar.api.client.MessageQueueManager.QueueMessageValidator; -import org.briarproject.briar.api.client.QueueMessage; - -import java.util.logging.Logger; - -import javax.annotation.concurrent.Immutable; - -import static org.briarproject.bramble.api.transport.TransportConstants.MAX_CLOCK_DIFFERENCE; -import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; - -@Deprecated -@Immutable -@NotNullByDefault -public abstract class BdfQueueMessageValidator - implements QueueMessageValidator { - - protected static final Logger LOG = - Logger.getLogger(BdfQueueMessageValidator.class.getName()); - - protected final ClientHelper clientHelper; - protected final MetadataEncoder metadataEncoder; - protected final Clock clock; - - protected BdfQueueMessageValidator(ClientHelper clientHelper, - MetadataEncoder metadataEncoder, Clock clock) { - this.clientHelper = clientHelper; - this.metadataEncoder = metadataEncoder; - this.clock = clock; - } - - protected abstract BdfMessageContext validateMessage(Message m, Group g, - BdfList body) throws InvalidMessageException, FormatException; - - @Override - public MessageContext validateMessage(QueueMessage q, Group g) - throws InvalidMessageException { - // Reject the message if it's too far in the future - long now = clock.currentTimeMillis(); - if (q.getTimestamp() - now > MAX_CLOCK_DIFFERENCE) { - throw new InvalidMessageException( - "Timestamp is too far in the future"); - } - byte[] raw = q.getRaw(); - if (raw.length <= QUEUE_MESSAGE_HEADER_LENGTH) { - throw new InvalidMessageException("Message is too short"); - } - try { - BdfList body = clientHelper.toList(raw, QUEUE_MESSAGE_HEADER_LENGTH, - raw.length - QUEUE_MESSAGE_HEADER_LENGTH); - BdfMessageContext result = validateMessage(q, g, body); - Metadata meta = metadataEncoder.encode(result.getDictionary()); - return new MessageContext(meta, result.getDependencies()); - } catch (FormatException e) { - throw new InvalidMessageException(e); - } - } -} diff --git a/briar-core/src/main/java/org/briarproject/briar/client/BriarClientModule.java b/briar-core/src/main/java/org/briarproject/briar/client/BriarClientModule.java index 46ee505dfa0cf6ad68f119ef8563a024b16f9f42..9eeb6b9597eac924138444067dcddd716384be8c 100644 --- a/briar-core/src/main/java/org/briarproject/briar/client/BriarClientModule.java +++ b/briar-core/src/main/java/org/briarproject/briar/client/BriarClientModule.java @@ -1,14 +1,6 @@ package org.briarproject.briar.client; -import org.briarproject.bramble.api.client.ClientHelper; -import org.briarproject.bramble.api.db.DatabaseComponent; -import org.briarproject.bramble.api.sync.MessageFactory; -import org.briarproject.bramble.api.sync.ValidationManager; -import org.briarproject.briar.api.client.MessageQueueManager; import org.briarproject.briar.api.client.MessageTracker; -import org.briarproject.briar.api.client.QueueMessageFactory; - -import javax.inject.Singleton; import dagger.Module; import dagger.Provides; @@ -16,21 +8,6 @@ import dagger.Provides; @Module public class BriarClientModule { - @Provides - @Singleton - MessageQueueManager provideMessageQueueManager(DatabaseComponent db, - ClientHelper clientHelper, QueueMessageFactory queueMessageFactory, - ValidationManager validationManager) { - return new MessageQueueManagerImpl(db, clientHelper, - queueMessageFactory, validationManager); - } - - @Provides - QueueMessageFactory provideQueueMessageFactory( - MessageFactory messageFactory) { - return new QueueMessageFactoryImpl(messageFactory); - } - @Provides MessageTracker provideMessageTracker(MessageTrackerImpl messageTracker) { return messageTracker; diff --git a/briar-core/src/main/java/org/briarproject/briar/client/MessageQueueManagerImpl.java b/briar-core/src/main/java/org/briarproject/briar/client/MessageQueueManagerImpl.java deleted file mode 100644 index 47c91bbc2d7edc24ae4b9945a17faed057d0ada3..0000000000000000000000000000000000000000 --- a/briar-core/src/main/java/org/briarproject/briar/client/MessageQueueManagerImpl.java +++ /dev/null @@ -1,259 +0,0 @@ -package org.briarproject.briar.client; - -import org.briarproject.bramble.api.FormatException; -import org.briarproject.bramble.api.client.ClientHelper; -import org.briarproject.bramble.api.data.BdfDictionary; -import org.briarproject.bramble.api.data.BdfList; -import org.briarproject.bramble.api.db.DatabaseComponent; -import org.briarproject.bramble.api.db.DbException; -import org.briarproject.bramble.api.db.Metadata; -import org.briarproject.bramble.api.db.Transaction; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.ClientId; -import org.briarproject.bramble.api.sync.Group; -import org.briarproject.bramble.api.sync.GroupId; -import org.briarproject.bramble.api.sync.InvalidMessageException; -import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageContext; -import org.briarproject.bramble.api.sync.MessageId; -import org.briarproject.bramble.api.sync.ValidationManager; -import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook; -import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator; -import org.briarproject.bramble.util.ByteUtils; -import org.briarproject.briar.api.client.MessageQueueManager; -import org.briarproject.briar.api.client.QueueMessage; -import org.briarproject.briar.api.client.QueueMessageFactory; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; -import java.util.TreeMap; -import java.util.logging.Logger; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.Immutable; -import javax.inject.Inject; - -import static java.util.logging.Level.INFO; -import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; -import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; - -@Immutable -@NotNullByDefault -class MessageQueueManagerImpl implements MessageQueueManager { - - private static final String OUTGOING_POSITION_KEY = "nextOut"; - private static final String INCOMING_POSITION_KEY = "nextIn"; - private static final String PENDING_MESSAGES_KEY = "pending"; - - private static final Logger LOG = - Logger.getLogger(MessageQueueManagerImpl.class.getName()); - - private final DatabaseComponent db; - private final ClientHelper clientHelper; - private final QueueMessageFactory queueMessageFactory; - private final ValidationManager validationManager; - - @Inject - MessageQueueManagerImpl(DatabaseComponent db, ClientHelper clientHelper, - QueueMessageFactory queueMessageFactory, - ValidationManager validationManager) { - this.db = db; - this.clientHelper = clientHelper; - this.queueMessageFactory = queueMessageFactory; - this.validationManager = validationManager; - } - - @Override - public QueueMessage sendMessage(Transaction txn, Group queue, - long timestamp, byte[] body, Metadata meta) throws DbException { - QueueState queueState = loadQueueState(txn, queue.getId()); - long queuePosition = queueState.outgoingPosition; - queueState.outgoingPosition++; - if (LOG.isLoggable(INFO)) - LOG.info("Sending message with position " + queuePosition); - saveQueueState(txn, queue.getId(), queueState); - QueueMessage q = queueMessageFactory.createMessage(queue.getId(), - timestamp, queuePosition, body); - db.addLocalMessage(txn, q, meta, true); - return q; - } - - @Override - public void registerMessageValidator(ClientId c, QueueMessageValidator v) { - validationManager.registerMessageValidator(c, - new DelegatingMessageValidator(v)); - } - - @Override - public void registerIncomingMessageHook(ClientId c, - IncomingQueueMessageHook hook) { - validationManager.registerIncomingMessageHook(c, - new DelegatingIncomingMessageHook(hook)); - } - - private QueueState loadQueueState(Transaction txn, GroupId g) - throws DbException { - try { - TreeMap<Long, MessageId> pending = new TreeMap<>(); - Metadata groupMeta = db.getGroupMetadata(txn, g); - byte[] raw = groupMeta.get(QUEUE_STATE_KEY); - if (raw == null) return new QueueState(0, 0, pending); - BdfDictionary d = clientHelper.toDictionary(raw, 0, raw.length); - long outgoingPosition = d.getLong(OUTGOING_POSITION_KEY); - long incomingPosition = d.getLong(INCOMING_POSITION_KEY); - BdfList pendingList = d.getList(PENDING_MESSAGES_KEY); - for (int i = 0; i < pendingList.size(); i++) { - BdfList item = pendingList.getList(i); - if (item.size() != 2) throw new FormatException(); - pending.put(item.getLong(0), new MessageId(item.getRaw(1))); - } - return new QueueState(outgoingPosition, incomingPosition, pending); - } catch (FormatException e) { - throw new DbException(e); - } - } - - private void saveQueueState(Transaction txn, GroupId g, - QueueState queueState) throws DbException { - try { - BdfDictionary d = new BdfDictionary(); - d.put(OUTGOING_POSITION_KEY, queueState.outgoingPosition); - d.put(INCOMING_POSITION_KEY, queueState.incomingPosition); - BdfList pendingList = new BdfList(); - for (Entry<Long, MessageId> e : queueState.pending.entrySet()) - pendingList.add(BdfList.of(e.getKey(), e.getValue())); - d.put(PENDING_MESSAGES_KEY, pendingList); - Metadata groupMeta = new Metadata(); - groupMeta.put(QUEUE_STATE_KEY, clientHelper.toByteArray(d)); - db.mergeGroupMetadata(txn, g, groupMeta); - } catch (FormatException e) { - throw new RuntimeException(e); - } - } - - private static class QueueState { - - private long outgoingPosition, incomingPosition; - private final TreeMap<Long, MessageId> pending; - - private QueueState(long outgoingPosition, long incomingPosition, - TreeMap<Long, MessageId> pending) { - this.outgoingPosition = outgoingPosition; - this.incomingPosition = incomingPosition; - this.pending = pending; - } - - @Nullable - MessageId popIncomingMessageId() { - Iterator<Entry<Long, MessageId>> it = pending.entrySet().iterator(); - if (!it.hasNext()) { - LOG.info("No pending messages"); - return null; - } - Entry<Long, MessageId> e = it.next(); - if (!e.getKey().equals(incomingPosition)) { - if (LOG.isLoggable(INFO)) { - LOG.info("First pending message is " + e.getKey() + ", " - + " expecting " + incomingPosition); - } - return null; - } - if (LOG.isLoggable(INFO)) - LOG.info("Removing pending message " + e.getKey()); - it.remove(); - incomingPosition++; - return e.getValue(); - } - } - - @NotNullByDefault - private static class DelegatingMessageValidator - implements MessageValidator { - - private final QueueMessageValidator delegate; - - private DelegatingMessageValidator(QueueMessageValidator delegate) { - this.delegate = delegate; - } - - @Override - public MessageContext validateMessage(Message m, Group g) - throws InvalidMessageException { - byte[] raw = m.getRaw(); - if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) - throw new InvalidMessageException(); - long queuePosition = ByteUtils.readUint64(raw, - MESSAGE_HEADER_LENGTH); - if (queuePosition < 0) throw new InvalidMessageException(); - QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), - m.getTimestamp(), queuePosition, raw); - return delegate.validateMessage(q, g); - } - } - - @NotNullByDefault - private class DelegatingIncomingMessageHook implements IncomingMessageHook { - - private final IncomingQueueMessageHook delegate; - - private DelegatingIncomingMessageHook( - IncomingQueueMessageHook delegate) { - this.delegate = delegate; - } - - @Override - public boolean incomingMessage(Transaction txn, Message m, - Metadata meta) throws DbException, InvalidMessageException { - long queuePosition = ByteUtils.readUint64(m.getRaw(), - MESSAGE_HEADER_LENGTH); - QueueState queueState = loadQueueState(txn, m.getGroupId()); - if (LOG.isLoggable(INFO)) { - LOG.info("Received message with position " - + queuePosition + ", expecting " - + queueState.incomingPosition); - } - if (queuePosition < queueState.incomingPosition) { - // A message with this queue position has already been seen - LOG.warning("Deleting message with duplicate position"); - db.deleteMessage(txn, m.getId()); - db.deleteMessageMetadata(txn, m.getId()); - } else if (queuePosition > queueState.incomingPosition) { - // The message is out of order, add it to the pending list - LOG.info("Message is out of order, adding to pending list"); - queueState.pending.put(queuePosition, m.getId()); - saveQueueState(txn, m.getGroupId(), queueState); - } else { - // The message is in order - LOG.info("Message is in order, delivering"); - QueueMessage q = new QueueMessage(m.getId(), m.getGroupId(), - m.getTimestamp(), queuePosition, m.getRaw()); - queueState.incomingPosition++; - // Collect any consecutive messages - List<MessageId> consecutive = new ArrayList<>(); - MessageId next; - while ((next = queueState.popIncomingMessageId()) != null) - consecutive.add(next); - // Save the queue state before passing control to the delegate - saveQueueState(txn, m.getGroupId(), queueState); - // Deliver the messages to the delegate - delegate.incomingMessage(txn, q, meta); - for (MessageId id : consecutive) { - byte[] raw = db.getRawMessage(txn, id); - if (raw == null) throw new DbException(); - meta = db.getMessageMetadata(txn, id); - q = queueMessageFactory.createMessage(id, raw); - if (LOG.isLoggable(INFO)) { - LOG.info("Delivering pending message with position " - + q.getQueuePosition()); - } - delegate.incomingMessage(txn, q, meta); - } - } - // message queues are only useful for groups with two members - // so messages don't need to be shared - return false; - } - } -} diff --git a/briar-core/src/main/java/org/briarproject/briar/client/QueueMessageFactoryImpl.java b/briar-core/src/main/java/org/briarproject/briar/client/QueueMessageFactoryImpl.java deleted file mode 100644 index 480b7670b8a6d7b56d715a0ef144dea483488621..0000000000000000000000000000000000000000 --- a/briar-core/src/main/java/org/briarproject/briar/client/QueueMessageFactoryImpl.java +++ /dev/null @@ -1,60 +0,0 @@ -package org.briarproject.briar.client; - -import org.briarproject.bramble.api.UniqueId; -import org.briarproject.bramble.api.nullsafety.NotNullByDefault; -import org.briarproject.bramble.api.sync.GroupId; -import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageFactory; -import org.briarproject.bramble.api.sync.MessageId; -import org.briarproject.bramble.util.ByteUtils; -import org.briarproject.briar.api.client.QueueMessage; -import org.briarproject.briar.api.client.QueueMessageFactory; - -import javax.annotation.concurrent.Immutable; -import javax.inject.Inject; - -import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_LENGTH; -import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; -import static org.briarproject.bramble.util.ByteUtils.INT_64_BYTES; -import static org.briarproject.briar.api.client.QueueMessage.MAX_QUEUE_MESSAGE_BODY_LENGTH; -import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; - -@Immutable -@NotNullByDefault -class QueueMessageFactoryImpl implements QueueMessageFactory { - - private final MessageFactory messageFactory; - - @Inject - QueueMessageFactoryImpl(MessageFactory messageFactory) { - this.messageFactory = messageFactory; - } - - @Override - public QueueMessage createMessage(GroupId groupId, long timestamp, - long queuePosition, byte[] body) { - if (body.length > MAX_QUEUE_MESSAGE_BODY_LENGTH) - throw new IllegalArgumentException(); - byte[] messageBody = new byte[INT_64_BYTES + body.length]; - ByteUtils.writeUint64(queuePosition, messageBody, 0); - System.arraycopy(body, 0, messageBody, INT_64_BYTES, body.length); - Message m = messageFactory.createMessage(groupId, timestamp, - messageBody); - return new QueueMessage(m.getId(), groupId, timestamp, queuePosition, - m.getRaw()); - } - - @Override - public QueueMessage createMessage(MessageId id, byte[] raw) { - if (raw.length < QUEUE_MESSAGE_HEADER_LENGTH) - throw new IllegalArgumentException(); - if (raw.length > MAX_MESSAGE_LENGTH) - throw new IllegalArgumentException(); - byte[] groupId = new byte[UniqueId.LENGTH]; - System.arraycopy(raw, 0, groupId, 0, UniqueId.LENGTH); - long timestamp = ByteUtils.readUint64(raw, UniqueId.LENGTH); - long queuePosition = ByteUtils.readUint64(raw, MESSAGE_HEADER_LENGTH); - return new QueueMessage(id, new GroupId(groupId), timestamp, - queuePosition, raw); - } -} diff --git a/briar-core/src/test/java/org/briarproject/briar/client/MessageQueueManagerImplTest.java b/briar-core/src/test/java/org/briarproject/briar/client/MessageQueueManagerImplTest.java deleted file mode 100644 index 0a11b0fea447cbb09a53982ae98db25d70c047ba..0000000000000000000000000000000000000000 --- a/briar-core/src/test/java/org/briarproject/briar/client/MessageQueueManagerImplTest.java +++ /dev/null @@ -1,566 +0,0 @@ -package org.briarproject.briar.client; - -import org.briarproject.bramble.api.client.ClientHelper; -import org.briarproject.bramble.api.data.BdfDictionary; -import org.briarproject.bramble.api.data.BdfList; -import org.briarproject.bramble.api.db.DatabaseComponent; -import org.briarproject.bramble.api.db.Metadata; -import org.briarproject.bramble.api.db.Transaction; -import org.briarproject.bramble.api.sync.ClientId; -import org.briarproject.bramble.api.sync.Group; -import org.briarproject.bramble.api.sync.GroupId; -import org.briarproject.bramble.api.sync.InvalidMessageException; -import org.briarproject.bramble.api.sync.Message; -import org.briarproject.bramble.api.sync.MessageContext; -import org.briarproject.bramble.api.sync.MessageId; -import org.briarproject.bramble.api.sync.ValidationManager; -import org.briarproject.bramble.api.sync.ValidationManager.IncomingMessageHook; -import org.briarproject.bramble.api.sync.ValidationManager.MessageValidator; -import org.briarproject.bramble.test.CaptureArgumentAction; -import org.briarproject.bramble.test.TestUtils; -import org.briarproject.bramble.util.ByteUtils; -import org.briarproject.briar.api.client.MessageQueueManager.IncomingQueueMessageHook; -import org.briarproject.briar.api.client.MessageQueueManager.QueueMessageValidator; -import org.briarproject.briar.api.client.QueueMessage; -import org.briarproject.briar.api.client.QueueMessageFactory; -import org.briarproject.briar.test.BriarTestCase; -import org.hamcrest.Description; -import org.jmock.Expectations; -import org.jmock.Mockery; -import org.jmock.api.Action; -import org.jmock.api.Invocation; -import org.junit.Test; - -import java.util.concurrent.atomic.AtomicReference; - -import static org.briarproject.bramble.api.sync.SyncConstants.MESSAGE_HEADER_LENGTH; -import static org.briarproject.bramble.test.TestUtils.getClientId; -import static org.briarproject.bramble.test.TestUtils.getGroup; -import static org.briarproject.briar.api.client.MessageQueueManager.QUEUE_STATE_KEY; -import static org.briarproject.briar.api.client.QueueMessage.QUEUE_MESSAGE_HEADER_LENGTH; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - -public class MessageQueueManagerImplTest extends BriarTestCase { - - private final ClientId clientId = getClientId(); - private final Group group = getGroup(clientId); - private final GroupId groupId = group.getId(); - private final long timestamp = System.currentTimeMillis(); - - @Test - public void testSendingMessages() throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - - Transaction txn = new Transaction(null, false); - byte[] body = new byte[123]; - Metadata groupMetadata = new Metadata(); - Metadata messageMetadata = new Metadata(); - Metadata groupMetadata1 = new Metadata(); - byte[] queueState = new byte[123]; - groupMetadata1.put(QUEUE_STATE_KEY, queueState); - - context.checking(new Expectations() {{ - // First message: queue state does not exist - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata)); - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(1L, 0L, new BdfList())); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); - oneOf(queueMessageFactory).createMessage(groupId, timestamp, 0L, - body); - will(new CreateMessageAction()); - oneOf(db).addLocalMessage(with(txn), with(any(QueueMessage.class)), - with(messageMetadata), with(true)); - // Second message: queue state exists - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata1)); - oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); - will(new DecodeQueueStateAction(1L, 0L, new BdfList())); - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(2L, 0L, new BdfList())); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); - oneOf(queueMessageFactory).createMessage(groupId, timestamp, 1L, - body); - will(new CreateMessageAction()); - oneOf(db).addLocalMessage(with(txn), with(any(QueueMessage.class)), - with(messageMetadata), with(true)); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // First message - QueueMessage q = mqm.sendMessage(txn, group, timestamp, body, - messageMetadata); - assertEquals(groupId, q.getGroupId()); - assertEquals(timestamp, q.getTimestamp()); - assertEquals(0L, q.getQueuePosition()); - assertEquals(QUEUE_MESSAGE_HEADER_LENGTH + body.length, q.getLength()); - - // Second message - QueueMessage q1 = mqm.sendMessage(txn, group, timestamp, body, - messageMetadata); - assertEquals(groupId, q1.getGroupId()); - assertEquals(timestamp, q1.getTimestamp()); - assertEquals(1L, q1.getQueuePosition()); - assertEquals(QUEUE_MESSAGE_HEADER_LENGTH + body.length, q1.getLength()); - - context.assertIsSatisfied(); - } - - @Test - public void testValidatorRejectsShortMessage() throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - - AtomicReference<MessageValidator> captured = new AtomicReference<>(); - QueueMessageValidator queueMessageValidator = - context.mock(QueueMessageValidator.class); - // The message is too short to be a valid queue message - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH - 1]; - Message message = new Message(messageId, groupId, timestamp, raw); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerMessageValidator(with(clientId), - with(any(MessageValidator.class))); - will(new CaptureArgumentAction<>(captured, - MessageValidator.class, 1)); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating message validator - mqm.registerMessageValidator(clientId, queueMessageValidator); - MessageValidator delegate = captured.get(); - assertNotNull(delegate); - // The message should be invalid - try { - delegate.validateMessage(message, group); - fail(); - } catch (InvalidMessageException expected) { - // Expected - } - - context.assertIsSatisfied(); - } - - @Test - public void testValidatorRejectsNegativeQueuePosition() throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - - AtomicReference<MessageValidator> captured = new AtomicReference<>(); - QueueMessageValidator queueMessageValidator = - context.mock(QueueMessageValidator.class); - // The message has a negative queue position - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - for (int i = 0; i < 8; i++) - raw[MESSAGE_HEADER_LENGTH + i] = (byte) 0xFF; - Message message = new Message(messageId, groupId, timestamp, raw); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerMessageValidator(with(clientId), - with(any(MessageValidator.class))); - will(new CaptureArgumentAction<>(captured, - MessageValidator.class, 1)); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating message validator - mqm.registerMessageValidator(clientId, queueMessageValidator); - MessageValidator delegate = captured.get(); - assertNotNull(delegate); - // The message should be invalid - try { - delegate.validateMessage(message, group); - fail(); - } catch (InvalidMessageException expected) { - // Expected - } - - context.assertIsSatisfied(); - } - - @Test - public void testValidatorDelegatesValidMessage() throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - - AtomicReference<MessageValidator> captured = new AtomicReference<>(); - QueueMessageValidator queueMessageValidator = - context.mock(QueueMessageValidator.class); - Metadata metadata = new Metadata(); - MessageContext messageContext = - new MessageContext(metadata); - // The message is valid, with a queue position of zero - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - Message message = new Message(messageId, groupId, timestamp, raw); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerMessageValidator(with(clientId), - with(any(MessageValidator.class))); - will(new CaptureArgumentAction<>(captured, - MessageValidator.class, 1)); - // The message should be delegated - oneOf(queueMessageValidator).validateMessage( - with(any(QueueMessage.class)), with(group)); - will(returnValue(messageContext)); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating message validator - mqm.registerMessageValidator(clientId, queueMessageValidator); - MessageValidator delegate = captured.get(); - assertNotNull(delegate); - // The message should be valid and the metadata should be returned - assertSame(messageContext, delegate.validateMessage(message, group)); - assertSame(metadata, messageContext.getMetadata()); - - context.assertIsSatisfied(); - } - - @Test - public void testIncomingMessageHookDeletesDuplicateMessage() - throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - AtomicReference<IncomingMessageHook> captured = new AtomicReference<>(); - IncomingQueueMessageHook incomingQueueMessageHook = - context.mock(IncomingQueueMessageHook.class); - - Transaction txn = new Transaction(null, false); - Metadata groupMetadata = new Metadata(); - byte[] queueState = new byte[123]; - groupMetadata.put(QUEUE_STATE_KEY, queueState); - // The message has queue position 0 - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - Message message = new Message(messageId, groupId, timestamp, raw); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerIncomingMessageHook(with(clientId), - with(any(IncomingMessageHook.class))); - will(new CaptureArgumentAction<>(captured, - IncomingMessageHook.class, 1)); - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata)); - // Queue position 1 is expected - oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); - will(new DecodeQueueStateAction(0L, 1L, new BdfList())); - // The message and its metadata should be deleted - oneOf(db).deleteMessage(txn, messageId); - oneOf(db).deleteMessageMetadata(txn, messageId); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating incoming message hook - mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook); - IncomingMessageHook delegate = captured.get(); - assertNotNull(delegate); - // Pass the message to the hook - delegate.incomingMessage(txn, message, new Metadata()); - - context.assertIsSatisfied(); - } - - @Test - public void testIncomingMessageHookAddsOutOfOrderMessageToPendingList() - throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - AtomicReference<IncomingMessageHook> captured = new AtomicReference<>(); - IncomingQueueMessageHook incomingQueueMessageHook = - context.mock(IncomingQueueMessageHook.class); - - Transaction txn = new Transaction(null, false); - Metadata groupMetadata = new Metadata(); - byte[] queueState = new byte[123]; - groupMetadata.put(QUEUE_STATE_KEY, queueState); - // The message has queue position 1 - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - ByteUtils.writeUint64(1L, raw, MESSAGE_HEADER_LENGTH); - Message message = new Message(messageId, groupId, timestamp, raw); - BdfList pending = BdfList.of(BdfList.of(1L, messageId)); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerIncomingMessageHook(with(clientId), - with(any(IncomingMessageHook.class))); - will(new CaptureArgumentAction<>(captured, - IncomingMessageHook.class, 1)); - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata)); - // Queue position 0 is expected - oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); - will(new DecodeQueueStateAction(0L, 0L, new BdfList())); - // The message should be added to the pending list - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(0L, 0L, pending)); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating incoming message hook - mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook); - IncomingMessageHook delegate = captured.get(); - assertNotNull(delegate); - // Pass the message to the hook - delegate.incomingMessage(txn, message, new Metadata()); - - context.assertIsSatisfied(); - } - - @Test - public void testIncomingMessageHookDelegatesInOrderMessage() - throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - AtomicReference<IncomingMessageHook> captured = new AtomicReference<>(); - IncomingQueueMessageHook incomingQueueMessageHook = - context.mock(IncomingQueueMessageHook.class); - - Transaction txn = new Transaction(null, false); - Metadata groupMetadata = new Metadata(); - byte[] queueState = new byte[123]; - groupMetadata.put(QUEUE_STATE_KEY, queueState); - // The message has queue position 0 - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - Message message = new Message(messageId, groupId, timestamp, raw); - Metadata messageMetadata = new Metadata(); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerIncomingMessageHook(with(clientId), - with(any(IncomingMessageHook.class))); - will(new CaptureArgumentAction<>(captured, - IncomingMessageHook.class, 1)); - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata)); - // Queue position 0 is expected - oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); - will(new DecodeQueueStateAction(0L, 0L, new BdfList())); - // Queue position 1 should be expected next - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(0L, 1L, new BdfList())); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); - // The message should be delegated - oneOf(incomingQueueMessageHook).incomingMessage(with(txn), - with(any(QueueMessage.class)), with(messageMetadata)); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating incoming message hook - mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook); - IncomingMessageHook delegate = captured.get(); - assertNotNull(delegate); - // Pass the message to the hook - delegate.incomingMessage(txn, message, messageMetadata); - - context.assertIsSatisfied(); - } - - @Test - public void testIncomingMessageHookRetrievesPendingMessage() - throws Exception { - Mockery context = new Mockery(); - DatabaseComponent db = context.mock(DatabaseComponent.class); - ClientHelper clientHelper = context.mock(ClientHelper.class); - QueueMessageFactory queueMessageFactory = - context.mock(QueueMessageFactory.class); - ValidationManager validationManager = - context.mock(ValidationManager.class); - AtomicReference<IncomingMessageHook> captured = new AtomicReference<>(); - IncomingQueueMessageHook incomingQueueMessageHook = - context.mock(IncomingQueueMessageHook.class); - - Transaction txn = new Transaction(null, false); - Metadata groupMetadata = new Metadata(); - byte[] queueState = new byte[123]; - groupMetadata.put(QUEUE_STATE_KEY, queueState); - // The message has queue position 0 - MessageId messageId = new MessageId(TestUtils.getRandomId()); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - Message message = new Message(messageId, groupId, timestamp, raw); - Metadata messageMetadata = new Metadata(); - // Queue position 1 is pending - MessageId messageId1 = new MessageId(TestUtils.getRandomId()); - byte[] raw1 = new byte[QUEUE_MESSAGE_HEADER_LENGTH]; - QueueMessage message1 = new QueueMessage(messageId1, groupId, - timestamp, 1L, raw1); - Metadata messageMetadata1 = new Metadata(); - BdfList pending = BdfList.of(BdfList.of(1L, messageId1)); - - context.checking(new Expectations() {{ - oneOf(validationManager).registerIncomingMessageHook(with(clientId), - with(any(IncomingMessageHook.class))); - will(new CaptureArgumentAction<>(captured, - IncomingMessageHook.class, 1)); - oneOf(db).getGroupMetadata(txn, groupId); - will(returnValue(groupMetadata)); - // Queue position 0 is expected, position 1 is pending - oneOf(clientHelper).toDictionary(queueState, 0, queueState.length); - will(new DecodeQueueStateAction(0L, 0L, pending)); - // Queue position 2 should be expected next - oneOf(clientHelper).toByteArray(with(any(BdfDictionary.class))); - will(new EncodeQueueStateAction(0L, 2L, new BdfList())); - oneOf(db).mergeGroupMetadata(with(txn), with(groupId), - with(any(Metadata.class))); - // The new message should be delegated - oneOf(incomingQueueMessageHook).incomingMessage(with(txn), - with(any(QueueMessage.class)), with(messageMetadata)); - // The pending message should be retrieved - oneOf(db).getRawMessage(txn, messageId1); - will(returnValue(raw1)); - oneOf(db).getMessageMetadata(txn, messageId1); - will(returnValue(messageMetadata1)); - oneOf(queueMessageFactory).createMessage(messageId1, raw1); - will(returnValue(message1)); - // The pending message should be delegated - oneOf(incomingQueueMessageHook).incomingMessage(txn, message1, - messageMetadata1); - }}); - - MessageQueueManagerImpl mqm = new MessageQueueManagerImpl(db, - clientHelper, queueMessageFactory, validationManager); - - // Capture the delegating incoming message hook - mqm.registerIncomingMessageHook(clientId, incomingQueueMessageHook); - IncomingMessageHook delegate = captured.get(); - assertNotNull(delegate); - // Pass the message to the hook - delegate.incomingMessage(txn, message, messageMetadata); - - context.assertIsSatisfied(); - } - - private class EncodeQueueStateAction implements Action { - - private final long outgoingPosition, incomingPosition; - private final BdfList pending; - - private EncodeQueueStateAction(long outgoingPosition, - long incomingPosition, BdfList pending) { - this.outgoingPosition = outgoingPosition; - this.incomingPosition = incomingPosition; - this.pending = pending; - } - - @Override - public Object invoke(Invocation invocation) throws Throwable { - BdfDictionary d = (BdfDictionary) invocation.getParameter(0); - assertEquals(outgoingPosition, d.getLong("nextOut").longValue()); - assertEquals(incomingPosition, d.getLong("nextIn").longValue()); - assertEquals(pending, d.getList("pending")); - return new byte[123]; - } - - @Override - public void describeTo(Description description) { - description.appendText("encodes a queue state"); - } - } - - private class DecodeQueueStateAction implements Action { - - private final long outgoingPosition, incomingPosition; - private final BdfList pending; - - private DecodeQueueStateAction(long outgoingPosition, - long incomingPosition, BdfList pending) { - this.outgoingPosition = outgoingPosition; - this.incomingPosition = incomingPosition; - this.pending = pending; - } - - @Override - public Object invoke(Invocation invocation) throws Throwable { - BdfDictionary d = new BdfDictionary(); - d.put("nextOut", outgoingPosition); - d.put("nextIn", incomingPosition); - d.put("pending", pending); - return d; - } - - @Override - public void describeTo(Description description) { - description.appendText("decodes a queue state"); - } - } - - private class CreateMessageAction implements Action { - - @Override - public Object invoke(Invocation invocation) throws Throwable { - GroupId groupId = (GroupId) invocation.getParameter(0); - long timestamp = (Long) invocation.getParameter(1); - long queuePosition = (Long) invocation.getParameter(2); - byte[] body = (byte[]) invocation.getParameter(3); - byte[] raw = new byte[QUEUE_MESSAGE_HEADER_LENGTH + body.length]; - MessageId id = new MessageId(TestUtils.getRandomId()); - return new QueueMessage(id, groupId, timestamp, queuePosition, raw); - } - - @Override - public void describeTo(Description description) { - description.appendText("creates a message"); - } - } - -}