diff --git a/briar-api/src/net/sf/briar/api/db/AckAndRequest.java b/briar-api/src/net/sf/briar/api/db/AckAndRequest.java deleted file mode 100644 index 11959a17d90a0c4a7762b757b819109584d02f66..0000000000000000000000000000000000000000 --- a/briar-api/src/net/sf/briar/api/db/AckAndRequest.java +++ /dev/null @@ -1,27 +0,0 @@ -package net.sf.briar.api.db; - -import net.sf.briar.api.messaging.Ack; -import net.sf.briar.api.messaging.Request; - -/** - * A tuple of an {@link net.sf.briar.api.messaging.Ack} and a - * {@link net.sf.briar.api.messaging.Request}. - */ -public class AckAndRequest { - - private final Ack ack; - private final Request request; - - public AckAndRequest(Ack ack, Request request) { - this.ack = ack; - this.request = request; - } - - public Ack getAck() { - return ack; - } - - public Request getRequest() { - return request; - } -} diff --git a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java index 673b33430d2d39e4e72b4e91fe329bc1a79d74a1..4d3ab1808037f10a1e0c0efb6c8db2110ddf7084 100644 --- a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java +++ b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java @@ -20,6 +20,7 @@ import net.sf.briar.api.messaging.GroupStatus; import net.sf.briar.api.messaging.Message; import net.sf.briar.api.messaging.MessageId; import net.sf.briar.api.messaging.Offer; +import net.sf.briar.api.messaging.Request; import net.sf.briar.api.messaging.RetentionAck; import net.sf.briar.api.messaging.RetentionUpdate; import net.sf.briar.api.messaging.SubscriptionAck; @@ -80,9 +81,6 @@ public interface DatabaseComponent { */ boolean addTransport(TransportId t, long maxLatency) throws DbException; - /** Returns true if any messages are sendable to the given contact. */ - boolean containsSendableMessages(ContactId c) throws DbException; - /** * Returns an acknowledgement for the given contact, or null if there are * no messages to acknowledge. @@ -99,23 +97,28 @@ public interface DatabaseComponent { long maxLatency) throws DbException; /** - * Returns a batch of raw messages for the given contact from the given - * collection of requested messages, with a total length less than or equal - * to the given length, for transmission over a transport with the given - * maximum latency. Any messages that were either added to the batch, or - * were considered but are not sendable to the contact, are removed from - * the collection of requested messages before returning. Returns null if - * there are no sendable messages that fit in the given length. + * Returns an offer for the given contact for transmission over a + * transport with the given maximum latency, or null if there are no + * messages to offer. */ - Collection<byte[]> generateBatch(ContactId c, int maxLength, - long maxLatency, Collection<MessageId> requested) - throws DbException; + Offer generateOffer(ContactId c, int maxMessages, long maxLatency) + throws DbException; /** - * Returns an offer for the given contact, or null if there are no messages - * to offer. + * Returns a request for the given contact, or null if there are no + * messages to request. + */ + Request generateRequest(ContactId c, int maxMessages) throws DbException; + + /** + * Returns a batch of raw messages for the given contact, with a total + * length less than or equal to the given length, for transmission over a + * transport with the given maximum latency. Only messages that have been + * requested by the contact are returned. Returns null if there are no + * sendable messages that fit in the given length. */ - Offer generateOffer(ContactId c, int maxMessages) throws DbException; + Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength, + long maxLatency) throws DbException; /** * Returns a retention ack for the given contact, or null if no retention @@ -220,7 +223,7 @@ public interface DatabaseComponent { Collection<MessageHeader> getMessageHeaders(GroupId g) throws DbException; - /** Returns true if the given message has been read. */ + /** Returns true if the given message is marked as read. */ boolean getReadFlag(MessageId m) throws DbException; /** Returns all remote transport properties for the given transport. */ @@ -266,18 +269,11 @@ public interface DatabaseComponent { /** Processes a message from the given contact. */ void receiveMessage(ContactId c, Message m) throws DbException; - /** - * Processes an offer from the given contact and generates an ack for any - * messages in the offer that are present in the database, and a request - * for any messages that are not. The ack or the request may be null if no - * messages meet the respective criteria. - * <p> - * To prevent contacts from using offers to test for subscriptions that are - * not visible to them, any messages belonging to groups that are not - * visible to the contact are requested just as though they were not - * present in the database. - */ - AckAndRequest receiveOffer(ContactId c, Offer o) throws DbException; + /** Processes an offer from the given contact. */ + void receiveOffer(ContactId c, Offer o) throws DbException; + + /** Processes a request from the given contact. */ + void receiveRequest(ContactId c, Request r) throws DbException; /** Processes a retention ack from the given contact. */ void receiveRetentionAck(ContactId c, RetentionAck a) throws DbException; @@ -335,10 +331,9 @@ public interface DatabaseComponent { public void setInboxGroup(ContactId c, Group g) throws DbException; /** - * Marks a message read or unread and returns true if it was previously - * read. + * Marks a message as read or unread. */ - boolean setReadFlag(MessageId m, boolean read) throws DbException; + void setReadFlag(MessageId m, boolean read) throws DbException; /** * Sets the remote transport properties for the given contact, replacing @@ -347,9 +342,6 @@ public interface DatabaseComponent { void setRemoteProperties(ContactId c, Map<TransportId, TransportProperties> p) throws DbException; - /** Records the given messages as having been seen by the given contact. */ - void setSeen(ContactId c, Collection<MessageId> seen) throws DbException; - /** * Makes a group visible to the given set of contacts and invisible to any * other current or future contacts. diff --git a/briar-api/src/net/sf/briar/api/db/event/MessageReceivedEvent.java b/briar-api/src/net/sf/briar/api/db/event/MessageRequestedEvent.java similarity index 52% rename from briar-api/src/net/sf/briar/api/db/event/MessageReceivedEvent.java rename to briar-api/src/net/sf/briar/api/db/event/MessageRequestedEvent.java index 2d338abf962ff59badd5b88b17bd0b801769a72c..f735994036e86218c02f7e4888356be0a666988f 100644 --- a/briar-api/src/net/sf/briar/api/db/event/MessageReceivedEvent.java +++ b/briar-api/src/net/sf/briar/api/db/event/MessageRequestedEvent.java @@ -2,12 +2,12 @@ package net.sf.briar.api.db.event; import net.sf.briar.api.ContactId; -/** An event that is broadcast when a message is received. */ -public class MessageReceivedEvent extends DatabaseEvent { +/** An event that is broadcast when a message is requested by a contact. */ +public class MessageRequestedEvent extends DatabaseEvent { private final ContactId contactId; - public MessageReceivedEvent(ContactId contactId) { + public MessageRequestedEvent(ContactId contactId) { this.contactId = contactId; } diff --git a/briar-api/src/net/sf/briar/api/db/event/MessageToAckEvent.java b/briar-api/src/net/sf/briar/api/db/event/MessageToAckEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..6050acd881839602f2d0d99f6e11d89634350bb2 --- /dev/null +++ b/briar-api/src/net/sf/briar/api/db/event/MessageToAckEvent.java @@ -0,0 +1,20 @@ +package net.sf.briar.api.db.event; + +import net.sf.briar.api.ContactId; + +/** + * An event that is broadcast when a message is received or offered from a + * contact and needs to be acknowledged. + */ +public class MessageToAckEvent extends DatabaseEvent { + + private final ContactId contactId; + + public MessageToAckEvent(ContactId contactId) { + this.contactId = contactId; + } + + public ContactId getContactId() { + return contactId; + } +} diff --git a/briar-api/src/net/sf/briar/api/db/event/MessageToRequestEvent.java b/briar-api/src/net/sf/briar/api/db/event/MessageToRequestEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..196ff5b714b8f1082f0d853dbc7fd42ec0b6e3f6 --- /dev/null +++ b/briar-api/src/net/sf/briar/api/db/event/MessageToRequestEvent.java @@ -0,0 +1,20 @@ +package net.sf.briar.api.db.event; + +import net.sf.briar.api.ContactId; + +/** + * An event that is broadcast when a message is offered by a contact and needs + * to be requested. + */ +public class MessageToRequestEvent extends DatabaseEvent { + + private final ContactId contactId; + + public MessageToRequestEvent(ContactId contactId) { + this.contactId = contactId; + } + + public ContactId getContactId() { + return contactId; + } +} diff --git a/briar-api/src/net/sf/briar/api/messaging/MessagingConstants.java b/briar-api/src/net/sf/briar/api/messaging/MessagingConstants.java index 3b1347f91c9bb8f123c695b7efa84e7806d6bb86..011ef86eddb7d63423e2dba6f6d06e985a71c4a7 100644 --- a/briar-api/src/net/sf/briar/api/messaging/MessagingConstants.java +++ b/briar-api/src/net/sf/briar/api/messaging/MessagingConstants.java @@ -37,8 +37,9 @@ public interface MessagingConstants { int MESSAGE_SALT_LENGTH = 32; /** - * The timestamp of the oldest message in the database is rounded using - * this modulus to avoid revealing the presence of any particular message. + * When calculating the retention time of the database, the timestamp of + * the oldest message in the database is rounded down to a multiple of + * this value to avoid revealing the presence of any particular message. */ - int RETENTION_MODULUS = 60 * 60 * 1000; // 1 hour + int RETENTION_GRANULARITY = 60 * 1000; // 1 minute } diff --git a/briar-api/src/net/sf/briar/api/messaging/PacketWriter.java b/briar-api/src/net/sf/briar/api/messaging/PacketWriter.java index 94b6a44bb5e36089ef903361d486b67535f93b3c..f6a77762c0331c78a3ba18858c2b75d7f90b6760 100644 --- a/briar-api/src/net/sf/briar/api/messaging/PacketWriter.java +++ b/briar-api/src/net/sf/briar/api/messaging/PacketWriter.java @@ -4,7 +4,7 @@ import java.io.IOException; public interface PacketWriter { - int getMaxMessagesForAck(long capacity); + int getMaxMessagesForRequest(long capacity); int getMaxMessagesForOffer(long capacity); diff --git a/briar-core/src/net/sf/briar/db/Database.java b/briar-core/src/net/sf/briar/db/Database.java index 9fc08725435fb711fadbdd298e838ce390eba308..fa6226172873d9530ce8ecc73482390410cd0b27 100644 --- a/briar-core/src/net/sf/briar/db/Database.java +++ b/briar-core/src/net/sf/briar/db/Database.java @@ -118,11 +118,11 @@ interface Database<T> { void addMessage(T txn, Message m, boolean incoming) throws DbException; /** - * Records a received message as needing to be acknowledged. + * Records that a message has been offered by the given contact. * <p> * Locking: message write. */ - void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException; + void addOfferedMessage(T txn, ContactId c, MessageId m) throws DbException; /** * Stores the given temporary secrets and deletes any secrets that have @@ -134,12 +134,14 @@ interface Database<T> { throws DbException; /** - * Initialises the status (seen or unseen) of the given message with - * respect to the given contact. + * Initialises the status of the given message with respect to the given + * contact. + * @param ack whether the message needs to be acknowledged. + * @param seen whether the contact has seen the message. * <p> * Locking: message write. */ - void addStatus(T txn, ContactId c, MessageId m, boolean seen) + void addStatus(T txn, ContactId c, MessageId m, boolean ack, boolean seen) throws DbException; /** @@ -193,13 +195,6 @@ interface Database<T> { */ boolean containsMessage(T txn, MessageId m) throws DbException; - /** - * Returns true if any messages are sendable to the given contact. - * <p> - * Locking: message read, subscription read. - */ - boolean containsSendableMessages(T txn, ContactId c) throws DbException; - /** * Returns true if the database contains the given transport. * <p> @@ -216,6 +211,15 @@ interface Database<T> { boolean containsVisibleGroup(T txn, ContactId c, GroupId g) throws DbException; + /** + * Returns true if the database contains the given message and the message + * is visible to the given contact. + * <p> + * Locking: message read, subscription read. + */ + boolean containsVisibleMessage(T txn, ContactId c, MessageId m) + throws DbException; + /** * Returns the status of all groups to which the user subscribes or can * subscribe, excluding inbox groups. @@ -358,8 +362,8 @@ interface Database<T> { throws DbException; /** - * Returns the IDs of messages received from the given contact that need - * to be acknowledged, up to the given number of messages. + * Returns the IDs of some messages received from the given contact that + * need to be acknowledged, up to the given number of messages. * <p> * Locking: message read. */ @@ -367,7 +371,7 @@ interface Database<T> { throws DbException; /** - * Returns the IDs of some messages that are eligible to be sent to the + * Returns the IDs of some messages that are eligible to be offered to the * given contact, up to the given number of messages. * <p> * Locking: message read, subscription read. @@ -375,6 +379,24 @@ interface Database<T> { Collection<MessageId> getMessagesToOffer(T txn, ContactId c, int maxMessages) throws DbException; + /** + * Returns the IDs of some messages that are eligible to be sent to the + * given contact, up to the given total length. + * <p> + * Locking: message read, subscription read. + */ + Collection<MessageId> getMessagesToSend(T txn, ContactId c, int maxLength) + throws DbException; + + /** + * Returns the IDs of some messages that are eligible to be requested from + * the given contact, up to the given number of messages. + * <p> + * Locking: message read. + */ + Collection<MessageId> getMessagesToRequest(T txn, ContactId c, + int maxMessages) throws DbException; + /** * Returns the IDs of the oldest messages in the database, with a total * size less than or equal to the given size. @@ -400,17 +422,7 @@ interface Database<T> { byte[] getRawMessage(T txn, MessageId m) throws DbException; /** - * Returns the message identified by the given ID, in serialised form. - * Returns null if the message is not present in the database or is not - * sendable to the given contact. - * <p> - * Locking: message read, subscription read. - */ - byte[] getRawMessageIfSendable(T txn, ContactId c, MessageId m) - throws DbException; - - /** - * Returns true if the given message has been read. + * Returns true if the given message is marked as read. * <p> * Locking: message read. */ @@ -424,6 +436,16 @@ interface Database<T> { Map<ContactId, TransportProperties> getRemoteProperties(T txn, TransportId t) throws DbException; + /** + * Returns the IDs of some messages that are eligible to be sent to the + * given contact and have been requested by the contact, up to the given + * total length. + * <p> + * Locking: message read, subscription read. + */ + Collection<MessageId> getRequestedMessagesToSend(T txn, ContactId c, + int maxLength) throws DbException; + /** * Returns a retention ack for the given contact, or null if no ack is due. * <p> @@ -433,7 +455,7 @@ interface Database<T> { /** * Returns a retention update for the given contact and updates its expiry - * time using the given latency. Returns null if no update is due. + * time using the given latency, or returns null if no update is due. * <p> * Locking: message read, retention write. */ @@ -447,16 +469,6 @@ interface Database<T> { */ Collection<TemporarySecret> getSecrets(T txn) throws DbException; - /** - * Returns the IDs of some messages that are eligible to be sent to the - * given contact, with a total length less than or equal to the given - * length. - * <p> - * Locking: message read, subscription read. - */ - Collection<MessageId> getSendableMessages(T txn, ContactId c, int maxLength) - throws DbException; - /** * Returns a subscription ack for the given contact, or null if no ack is * due. @@ -467,22 +479,13 @@ interface Database<T> { /** * Returns a subscription update for the given contact and updates its - * expiry time using the given latency. Returns null if no update is due. + * expiry time using the given latency, or returns null if no update is due. * <p> * Locking: subscription write. */ SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c, long maxLatency) throws DbException; - /** - * Returns the transmission count of the given message with respect to the - * given contact. - * <p> - * Locking: message read. - */ - int getTransmissionCount(T txn, ContactId c, MessageId m) - throws DbException; - /** * Returns a collection of transport acks for the given contact, or null if * no acks are due. @@ -501,8 +504,8 @@ interface Database<T> { /** * Returns a collection of transport updates for the given contact and - * updates their expiry times using the given latency. Returns null if no - * updates are due. + * updates their expiry times using the given latency, or returns null if + * no updates are due. * <p> * Locking: transport write. */ @@ -541,6 +544,24 @@ interface Database<T> { */ void incrementRetentionVersions(T txn) throws DbException; + /** + * Marks the given messages as not needing to be acknowledged to the + * given contact. + * <p> + * Locking: message write. + */ + void lowerAckFlag(T txn, ContactId c, Collection<MessageId> acked) + throws DbException; + + /** + * Marks the given messages as not having been requested by the given + * contact. + * <p> + * Locking: message write. + */ + void lowerRequestedFlag(T txn, ContactId c, Collection<MessageId> requested) + throws DbException; + /** * Merges the given configuration with the existing configuration for the * given transport. @@ -559,6 +580,27 @@ interface Database<T> { void mergeLocalProperties(T txn, TransportId t, TransportProperties p) throws DbException; + /** + * Marks a message as needing to be acknowledged to the given contact. + * <p> + * Locking: message write. + */ + void raiseAckFlag(T txn, ContactId c, MessageId m) throws DbException; + + /** + * Marks a message as having been requested by the given contact. + * <p> + * Locking: message write. + */ + void raiseRequestedFlag(T txn, ContactId c, MessageId m) throws DbException; + + /** + * Marks a message as having been seen by the given contact. + * <p> + * Locking: message write. + */ + void raiseSeenFlag(T txn, ContactId c, MessageId m) throws DbException; + /** * Removes a contact from the database. * <p> @@ -592,14 +634,23 @@ interface Database<T> { void removeMessage(T txn, MessageId m) throws DbException; /** - * Marks the given messages received from the given contact as having been - * acknowledged. + * Removes an offered message ID that was offered by the given contact, or + * returns false if there is no such message ID. * <p> * Locking: message write. */ - void removeMessagesToAck(T txn, ContactId c, Collection<MessageId> acked) + boolean removeOfferedMessage(T txn, ContactId c, MessageId m) throws DbException; + /** + * Removes the given offered message IDs that were offered by the given + * contact. + * <p> + * Locking: message write. + */ + void removeOfferedMessages(T txn, ContactId c, + Collection<MessageId> requested) throws DbException; + /** * Removes a transport (and all associated state) from the database. * <p> @@ -614,6 +665,14 @@ interface Database<T> { */ void removeVisibility(T txn, ContactId c, GroupId g) throws DbException; + /** + * Resets the transmission count and expiry time of the given message with + * respect to the given contact. + * <p> + * Locking: message write. + */ + void resetExpiryTime(T txn, ContactId c, MessageId m) throws DbException; + /** * Sets the connection reordering window for the given endpoint in the * given rotation period. @@ -649,12 +708,11 @@ interface Database<T> { void setLastConnected(T txn, ContactId c, long now) throws DbException; /** - * Marks a message read or unread and returns true if it was previously - * read. + * Marks a message as read or unread. * <p> * Locking: message write. */ - boolean setReadFlag(T txn, MessageId m, boolean read) throws DbException; + void setReadFlag(T txn, MessageId m, boolean read) throws DbException; /** * Sets the remote transport properties for the given contact, replacing @@ -686,16 +744,6 @@ interface Database<T> { boolean setRetentionTime(T txn, ContactId c, long retention, long version) throws DbException; - /** - * If the database contains the given message and it belongs to a group - * that is visible to the given contact, marks the message as seen by the - * contact and returns true; otherwise returns false. - * <p> - * Locking: message write, subscription read. - */ - boolean setStatusSeenIfVisible(T txn, ContactId c, MessageId m) - throws DbException; - /** * Records a retention ack from the given contact for the given version, * unless the contact has already acked an equal or higher version. @@ -731,12 +779,12 @@ interface Database<T> { void setVisibleToAll(T txn, GroupId g, boolean all) throws DbException; /** - * Updates the expiry times of the given messages with respect to the given - * contact, using the given transmission counts and the latency of the - * transport over which they were sent. + * Updates the transmission count and expiry time of the given message + * with respect to the given contact, using the latency of the transport + * over which it was sent. * <p> * Locking: message write. */ - void updateExpiryTimes(T txn, ContactId c, Map<MessageId, Integer> sent, - long maxLatency) throws DbException; + void updateExpiryTime(T txn, ContactId c, MessageId m, long maxLatency) + throws DbException; } diff --git a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java index 039916dae237d3dc6380a5bd9a655bf329c1d0d0..66ad996d7ae13587dc9a92c372d363122e89583d 100644 --- a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java +++ b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java @@ -12,9 +12,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -32,7 +30,6 @@ import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportId; import net.sf.briar.api.TransportProperties; import net.sf.briar.api.clock.Clock; -import net.sf.briar.api.db.AckAndRequest; import net.sf.briar.api.db.ContactExistsException; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DbException; @@ -53,7 +50,9 @@ import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; import net.sf.briar.api.db.event.MessageAddedEvent; import net.sf.briar.api.db.event.MessageExpiredEvent; -import net.sf.briar.api.db.event.MessageReceivedEvent; +import net.sf.briar.api.db.event.MessageRequestedEvent; +import net.sf.briar.api.db.event.MessageToAckEvent; +import net.sf.briar.api.db.event.MessageToRequestEvent; import net.sf.briar.api.db.event.RemoteRetentionTimeUpdatedEvent; import net.sf.briar.api.db.event.RemoteSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.RemoteTransportsUpdatedEvent; @@ -380,9 +379,8 @@ DatabaseCleaner.Callback { } /** - * Stores the given message, marks it as read if it was locally generated, - * otherwise marks it as seen by the sender, and marks it as unseen by all - * other contacts. + * Stores a message, initialises its status with respect to each contact, + * and marks it as read if it was locally generated. * <p> * Locking: contact read, message write, subscription read. * @param sender null for a locally generated message. @@ -390,11 +388,20 @@ DatabaseCleaner.Callback { private void addMessage(T txn, Message m, ContactId sender) throws DbException { db.addMessage(txn, m, sender != null); - MessageId id = m.getId(); - if(sender == null) db.setReadFlag(txn, id, true); - else db.addStatus(txn, sender, id, true); - for(ContactId c : db.getContactIds(txn)) - if(!c.equals(sender)) db.addStatus(txn, c, id, false); + if(sender == null) db.setReadFlag(txn, m.getId(), true); + Group g = m.getGroup(); + Collection<ContactId> visibility = db.getVisibility(txn, g.getId()); + visibility = new HashSet<ContactId>(visibility); + for(ContactId c : db.getContactIds(txn)) { + if(visibility.contains(c)) { + boolean offered = db.removeOfferedMessage(txn, c, m.getId()); + boolean seen = offered || c.equals(sender); + db.addStatus(txn, c, m.getId(), offered, seen); + } else { + if(c.equals(sender)) throw new IllegalStateException(); + db.addStatus(txn, c, m.getId(), false, false); + } + } // Count the bytes stored synchronized(spaceLock) { bytesStoredSinceLastCheck += m.getSerialised().length; @@ -462,61 +469,18 @@ DatabaseCleaner.Callback { return added; } - public boolean containsSendableMessages(ContactId c) throws DbException { - contactLock.readLock().lock(); - try { - messageLock.readLock().lock(); - try { - subscriptionLock.readLock().lock(); - try { - T txn = db.startTransaction(); - try { - if(!db.containsContact(txn, c)) - throw new NoSuchContactException(); - boolean has = db.containsSendableMessages(txn, c); - db.commitTransaction(txn); - return has; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - public Ack generateAck(ContactId c, int maxMessages) throws DbException { - Collection<MessageId> acked; + Collection<MessageId> ids; contactLock.readLock().lock(); try { - messageLock.readLock().lock(); + messageLock.writeLock().lock(); try { T txn = db.startTransaction(); try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - acked = db.getMessagesToAck(txn, c, maxMessages); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageLock.readLock().unlock(); - } - if(acked.isEmpty()) return null; - // Record the contents of the ack - messageLock.writeLock().lock(); - try { - T txn = db.startTransaction(); - try { - db.removeMessagesToAck(txn, c, acked); + ids = db.getMessagesToAck(txn, c, maxMessages); + if(!ids.isEmpty()) db.lowerAckFlag(txn, c, ids); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -528,18 +492,17 @@ DatabaseCleaner.Callback { } finally { contactLock.readLock().unlock(); } - return new Ack(acked); + if(ids.isEmpty()) return null; + return new Ack(ids); } public Collection<byte[]> generateBatch(ContactId c, int maxLength, long maxLatency) throws DbException { Collection<MessageId> ids; - Map<MessageId, Integer> sent = new HashMap<MessageId, Integer>(); List<byte[]> messages = new ArrayList<byte[]>(); - // Get some sendable messages from the database contactLock.readLock().lock(); try { - messageLock.readLock().lock(); + messageLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { @@ -547,11 +510,12 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - ids = db.getSendableMessages(txn, c, maxLength); + ids = db.getMessagesToSend(txn, c, maxLength); for(MessageId m : ids) { messages.add(db.getRawMessage(txn, m)); - sent.put(m, db.getTransmissionCount(txn, c, m)); + db.updateExpiryTime(txn, c, m, maxLatency); } + if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -560,39 +524,22 @@ DatabaseCleaner.Callback { } finally { subscriptionLock.readLock().unlock(); } - } finally { - messageLock.readLock().unlock(); - } - if(messages.isEmpty()) return null; - // Record the messages as sent - messageLock.writeLock().lock(); - try { - T txn = db.startTransaction(); - try { - db.updateExpiryTimes(txn, c, sent, maxLatency); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } } finally { messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } + if(messages.isEmpty()) return null; return Collections.unmodifiableList(messages); } - public Collection<byte[]> generateBatch(ContactId c, int maxLength, - long maxLatency, Collection<MessageId> requested) - throws DbException { - Map<MessageId, Integer> sent = new HashMap<MessageId, Integer>(); - List<byte[]> messages = new ArrayList<byte[]>(); - // Get some sendable messages from the database + public Offer generateOffer(ContactId c, int maxMessages, long maxLatency) + throws DbException { + Collection<MessageId> ids; contactLock.readLock().lock(); try { - messageLock.readLock().lock(); + messageLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { @@ -600,18 +547,9 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - Iterator<MessageId> it = requested.iterator(); - while(it.hasNext()) { - MessageId m = it.next(); - byte[] raw = db.getRawMessageIfSendable(txn, c, m); - if(raw != null) { - if(raw.length > maxLength) break; - messages.add(raw); - sent.put(m, db.getTransmissionCount(txn, c, m)); - maxLength -= raw.length; - } - it.remove(); - } + ids = db.getMessagesToOffer(txn, c, maxMessages); + for(MessageId m : ids) + db.updateExpiryTime(txn, c, m, maxLatency); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -621,15 +559,28 @@ DatabaseCleaner.Callback { subscriptionLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + messageLock.writeLock().unlock(); } - if(messages.isEmpty()) return null; - // Record the messages as sent + } finally { + contactLock.readLock().unlock(); + } + if(ids.isEmpty()) return null; + return new Offer(ids); + } + + public Request generateRequest(ContactId c, int maxMessages) + throws DbException { + Collection<MessageId> ids; + contactLock.readLock().lock(); + try { messageLock.writeLock().lock(); try { T txn = db.startTransaction(); try { - db.updateExpiryTimes(txn, c, sent, maxLatency); + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + ids = db.getMessagesToRequest(txn, c, maxMessages); + if(!ids.isEmpty()) db.removeOfferedMessages(txn, c, ids); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -641,15 +592,17 @@ DatabaseCleaner.Callback { } finally { contactLock.readLock().unlock(); } - return Collections.unmodifiableList(messages); + if(ids.isEmpty()) return null; + return new Request(ids); } - public Offer generateOffer(ContactId c, int maxMessages) - throws DbException { - Collection<MessageId> offered; + public Collection<byte[]> generateRequestedBatch(ContactId c, int maxLength, + long maxLatency) throws DbException { + Collection<MessageId> ids; + List<byte[]> messages = new ArrayList<byte[]>(); contactLock.readLock().lock(); try { - messageLock.readLock().lock(); + messageLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { @@ -657,7 +610,12 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - offered = db.getMessagesToOffer(txn, c, maxMessages); + ids = db.getRequestedMessagesToSend(txn, c, maxLength); + for(MessageId m : ids) { + messages.add(db.getRawMessage(txn, m)); + db.updateExpiryTime(txn, c, m, maxLatency); + } + if(!ids.isEmpty()) db.lowerRequestedFlag(txn, c, ids); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -667,13 +625,13 @@ DatabaseCleaner.Callback { subscriptionLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + messageLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } - if(offered.isEmpty()) return null; - return new Offer(offered); + if(messages.isEmpty()) return null; + return Collections.unmodifiableList(messages); } public RetentionAck generateRetentionAck(ContactId c) throws DbException { @@ -1341,8 +1299,10 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - for(MessageId m : a.getMessageIds()) - db.setStatusSeenIfVisible(txn, c, m); + for(MessageId m : a.getMessageIds()) { + if(db.containsVisibleMessage(txn, c, m)) + db.raiseSeenFlag(txn, c, m); + } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -1372,7 +1332,7 @@ DatabaseCleaner.Callback { GroupId g = m.getGroup().getId(); visible = db.containsVisibleGroup(txn, c, g); if(!duplicate && visible) addMessage(txn, m, c); - if(visible) db.addMessageToAck(txn, c, m.getId()); + if(visible) db.raiseAckFlag(txn, c, m.getId()); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -1387,13 +1347,12 @@ DatabaseCleaner.Callback { } finally { contactLock.readLock().unlock(); } - if(visible) callListeners(new MessageReceivedEvent(c)); + if(visible) callListeners(new MessageToAckEvent(c)); if(!duplicate) callListeners(new MessageAddedEvent(m.getGroup(), c)); } - public AckAndRequest receiveOffer(ContactId c, Offer o) throws DbException { - List<MessageId> ack = new ArrayList<MessageId>(); - List<MessageId> request = new ArrayList<MessageId>(); + public void receiveOffer(ContactId c, Offer o) throws DbException { + boolean ack = false, request = false; contactLock.readLock().lock(); try { messageLock.writeLock().lock(); @@ -1405,9 +1364,14 @@ DatabaseCleaner.Callback { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); for(MessageId m : o.getMessageIds()) { - // If the message is present and visible, ack it - if(db.setStatusSeenIfVisible(txn, c, m)) ack.add(m); - else request.add(m); + if(db.containsVisibleMessage(txn, c, m)) { + db.raiseSeenFlag(txn, c, m); + db.raiseAckFlag(txn, c, m); + ack = true; + } else { + db.addOfferedMessage(txn, c, m); + request = true; + } } db.commitTransaction(txn); } catch(DbException e) { @@ -1423,9 +1387,39 @@ DatabaseCleaner.Callback { } finally { contactLock.readLock().unlock(); } - Ack a = ack.isEmpty() ? null : new Ack(ack); - Request r = request.isEmpty() ? null : new Request(request); - return new AckAndRequest(a, r); + if(ack) callListeners(new MessageToAckEvent(c)); + if(request) callListeners(new MessageToRequestEvent(c)); + } + + public void receiveRequest(ContactId c, Request r) throws DbException { + boolean requested = false; + contactLock.readLock().lock(); + try { + messageLock.writeLock().lock(); + try { + T txn = db.startTransaction(); + try { + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + for(MessageId m : r.getMessageIds()) { + if(db.containsVisibleMessage(txn, c, m)) { + db.raiseRequestedFlag(txn, c, m); + db.resetExpiryTime(txn, c, m); + requested = true; + } + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + if(requested) callListeners(new MessageRequestedEvent(c)); } public void receiveRetentionAck(ContactId c, RetentionAck a) @@ -1605,6 +1599,8 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); + GroupId g = db.getInboxGroupId(txn, c); + if(g != null) db.removeGroup(txn, g); db.removeContact(txn, c); db.commitTransaction(txn); } catch(DbException e) { @@ -1681,6 +1677,10 @@ DatabaseCleaner.Callback { if(!db.containsLocalAuthor(txn, a)) throw new NoSuchLocalAuthorException(); affected = db.getContacts(txn, a); + for(ContactId c : affected) { + GroupId g = db.getInboxGroupId(txn, c); + if(g != null) db.removeGroup(txn, g); + } db.removeLocalAuthor(txn, a); db.commitTransaction(txn); } catch(DbException e) { @@ -1797,16 +1797,15 @@ DatabaseCleaner.Callback { } } - public boolean setReadFlag(MessageId m, boolean read) throws DbException { + public void setReadFlag(MessageId m, boolean read) throws DbException { messageLock.writeLock().lock(); try { T txn = db.startTransaction(); try { if(!db.containsMessage(txn, m)) throw new NoSuchMessageException(); - boolean wasRead = db.setReadFlag(txn, m, read); + db.setReadFlag(txn, m, read); db.commitTransaction(txn); - return wasRead; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -1840,36 +1839,6 @@ DatabaseCleaner.Callback { } } - public void setSeen(ContactId c, Collection<MessageId> seen) - throws DbException { - contactLock.readLock().lock(); - try { - messageLock.writeLock().lock(); - try { - subscriptionLock.readLock().lock(); - try { - T txn = db.startTransaction(); - try { - if(!db.containsContact(txn, c)) - throw new NoSuchContactException(); - for(MessageId m : seen) - db.setStatusSeenIfVisible(txn, c, m); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } finally { - messageLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - public void setVisibility(GroupId g, Collection<ContactId> visible) throws DbException { Collection<ContactId> affected = new ArrayList<ContactId>(); diff --git a/briar-core/src/net/sf/briar/db/H2Database.java b/briar-core/src/net/sf/briar/db/H2Database.java index a9de1ad248b45e3b2b6ea82d7744bb4fac41b999..c3dcc64ffe794104f504002bea73e737990bfbf6 100644 --- a/briar-core/src/net/sf/briar/db/H2Database.java +++ b/briar-core/src/net/sf/briar/db/H2Database.java @@ -34,6 +34,7 @@ class H2Database extends JdbcDatabase { this.config = config; this.fileUtils = fileUtils; String path = new File(config.getDatabaseDirectory(), "db").getPath(); + // FIXME: Remove WRITE_DELAY=0 after implementing BTPv2? url = "jdbc:h2:split:" + path + ";CIPHER=AES;MULTI_THREADED=1" + ";WRITE_DELAY=0;DB_CLOSE_ON_EXIT=false"; } @@ -105,6 +106,7 @@ class H2Database extends JdbcDatabase { } protected void flushBuffersToDisk(Statement s) throws SQLException { + // FIXME: Remove this after implementing BTPv2? s.execute("CHECKPOINT SYNC"); } } diff --git a/briar-core/src/net/sf/briar/db/JdbcDatabase.java b/briar-core/src/net/sf/briar/db/JdbcDatabase.java index 4ef02f9fd4d6cb93991a92ae0af707c95281bc15..a962f87ffbcc1981cd91d512a0b86befe28d0898 100644 --- a/briar-core/src/net/sf/briar/db/JdbcDatabase.java +++ b/briar-core/src/net/sf/briar/db/JdbcDatabase.java @@ -5,7 +5,7 @@ import static java.sql.Types.VARCHAR; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static net.sf.briar.api.messaging.MessagingConstants.MAX_SUBSCRIPTIONS; -import static net.sf.briar.api.messaging.MessagingConstants.RETENTION_MODULUS; +import static net.sf.briar.api.messaging.MessagingConstants.RETENTION_GRANULARITY; import static net.sf.briar.db.ExponentialBackoff.calculateExpiry; import java.io.IOException; @@ -157,6 +157,15 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String INDEX_MESSAGES_BY_TIMESTAMP = "CREATE INDEX messagesByTimestamp ON messages (timestamp)"; + private static final String CREATE_OFFERS = + "CREATE TABLE offers" + + " (messageId HASH NOT NULL," // Not a foreign key + + " contactId INT NOT NULL," + + " PRIMARY KEY (messageId, contactId)," + + " FOREIGN KEY (contactId)" + + " REFERENCES contacts (contactId)" + + " ON DELETE CASCADE)"; + // Locking: message private static final String CREATE_STATUSES = "CREATE TABLE statuses" @@ -164,6 +173,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " contactId INT NOT NULL," + " ack BOOLEAN NOT NULL," + " seen BOOLEAN NOT NULL," + + " requested BOOLEAN NOT NULL," + " expiry BIGINT NOT NULL," + " txCount INT NOT NULL," + " PRIMARY KEY (messageId, contactId)," @@ -366,6 +376,7 @@ abstract class JdbcDatabase implements Database<Connection> { s.executeUpdate(insertTypeNames(CREATE_GROUP_VERSIONS)); s.executeUpdate(insertTypeNames(CREATE_MESSAGES)); s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP); + s.executeUpdate(insertTypeNames(CREATE_OFFERS)); s.executeUpdate(insertTypeNames(CREATE_STATUSES)); s.executeUpdate(INDEX_STATUSES_BY_MESSAGE); s.executeUpdate(INDEX_STATUSES_BY_CONTACT); @@ -522,11 +533,37 @@ abstract class JdbcDatabase implements Database<Connection> { if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); + // Create a status row for each message + sql = "SELECT messageID FROM messages"; + ps = txn.prepareStatement(sql); + rs = ps.executeQuery(); + Collection<byte[]> ids = new ArrayList<byte[]>(); + while(rs.next()) ids.add(rs.getBytes(1)); + rs.close(); + ps.close(); + if(!ids.isEmpty()) { + sql = "INSERT INTO statuses (messageId, contactId, ack," + + " seen, requested, expiry, txCount)" + + " VALUES (?, ?, FALSE, FALSE, FALSE, 0, 0)"; + ps = txn.prepareStatement(sql); + ps.setInt(2, c.getInt()); + for(byte[] id : ids) { + ps.setBytes(1, id); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if(batchAffected.length != ids.size()) + throw new DbStateException(); + for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] != 1) throw new DbStateException(); + } + ps.close(); + } // Make groups that are visible to everyone visible to this contact sql = "SELECT groupId FROM groups WHERE visibleToAll = TRUE"; ps = txn.prepareStatement(sql); rs = ps.executeQuery(); - Collection<byte[]> ids = new ArrayList<byte[]>(); + ids = new ArrayList<byte[]>(); while(rs.next()) ids.add(rs.getBytes(1)); rs.close(); ps.close(); @@ -656,7 +693,7 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT COUNT (groupId) FROM groups"; + String sql = "SELECT COUNT (NULL) FROM groups"; ps = txn.prepareStatement(sql); rs = ps.executeQuery(); if(!rs.next()) throw new DbStateException(); @@ -735,18 +772,29 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void addMessageToAck(Connection txn, ContactId c, MessageId m) + // FIXME: Limit the number of offers per contact + public void addOfferedMessage(Connection txn, ContactId c, MessageId m) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "UPDATE statuses SET ack = TRUE" + String sql = "SELECT NULL FROM offers" + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); ps.setInt(2, c.getInt()); + rs = ps.executeQuery(); + boolean found = rs.next(); + if(rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + if(found) return; + sql = "INSERT INTO offers (messageId, contactId) VALUES (?, ?)"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected != 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(rs); @@ -801,17 +849,18 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void addStatus(Connection txn, ContactId c, MessageId m, + public void addStatus(Connection txn, ContactId c, MessageId m, boolean ack, boolean seen) throws DbException { PreparedStatement ps = null; try { - String sql = "INSERT INTO statuses" - + " (messageId, contactId, ack, seen, expiry, txCount)" - + " VALUES (?, ?, FALSE, ?, 0, 0)"; + String sql = "INSERT INTO statuses (messageId, contactId, ack," + + " seen, requested, expiry, txCount)" + + " VALUES (?, ?, ?, ?, FALSE, 0, 0)"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); ps.setInt(2, c.getInt()); - ps.setBoolean(3, seen); + ps.setBoolean(3, ack); + ps.setBoolean(4, seen); int affected = ps.executeUpdate(); if(affected != 1) throw new DbStateException(); ps.close(); @@ -1011,29 +1060,14 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public boolean containsSendableMessages(Connection txn, ContactId c) + public boolean containsTransport(Connection txn, TransportId t) throws DbException { - long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT NULL FROM messages AS m" - + " JOIN contactGroups AS cg" - + " ON m.groupId = cg.groupId" - + " JOIN groupVisibilities AS gv" - + " ON m.groupId = gv.groupId" - + " AND cg.contactId = gv.contactId" - + " JOIN retentionVersions AS rv" - + " ON cg.contactId = rv.contactId" - + " JOIN statuses AS s" - + " ON m.messageId = s.messageId" - + " AND cg.contactId = s.contactId" - + " WHERE cg.contactId = ?" - + " AND timestamp >= retention" - + " AND seen = FALSE AND s.expiry < ?"; + String sql = "SELECT NULL FROM transports WHERE transportId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setLong(2, now); + ps.setBytes(1, t.getBytes()); rs = ps.executeQuery(); boolean found = rs.next(); if(rs.next()) throw new DbStateException(); @@ -1047,14 +1081,16 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public boolean containsTransport(Connection txn, TransportId t) - throws DbException { + public boolean containsVisibleGroup(Connection txn, ContactId c, + GroupId g) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT NULL FROM transports WHERE transportId = ?"; + String sql = "SELECT NULL FROM groupVisibilities" + + " WHERE contactId = ? AND groupId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, t.getBytes()); + ps.setInt(1, c.getInt()); + ps.setBytes(2, g.getBytes()); rs = ps.executeQuery(); boolean found = rs.next(); if(rs.next()) throw new DbStateException(); @@ -1068,16 +1104,19 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public boolean containsVisibleGroup(Connection txn, ContactId c, - GroupId g) throws DbException { + public boolean containsVisibleMessage(Connection txn, ContactId c, + MessageId m) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT NULL FROM groupVisibilities" - + " WHERE contactId = ? AND groupId = ?"; + String sql = "SELECT NULL FROM messages AS m" + + " JOIN groupVisibilities AS gv" + + " ON m.groupId = gv.groupId" + + " WHERE messageId = ?" + + " AND contactId = ?"; ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setBytes(2, g.getBytes()); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); rs = ps.executeQuery(); boolean found = rs.next(); if(rs.next()) throw new DbStateException(); @@ -1662,7 +1701,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " AND cg.contactId = s.contactId" + " WHERE cg.contactId = ?" + " AND timestamp >= retention" - + " AND seen = FALSE AND s.expiry < ?" + + " AND seen = FALSE AND requested = FALSE" + + " AND s.expiry < ?" + " ORDER BY timestamp DESC LIMIT ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); @@ -1681,6 +1721,74 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public Collection<MessageId> getMessagesToRequest(Connection txn, + ContactId c, int maxMessages) throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT messageId FROM offers" + + " WHERE contactId = ?" + + " LIMIT ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, maxMessages); + rs = ps.executeQuery(); + List<MessageId> ids = new ArrayList<MessageId>(); + while(rs.next()) ids.add(new MessageId(rs.getBytes(1))); + rs.close(); + ps.close(); + return Collections.unmodifiableList(ids); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + + public Collection<MessageId> getMessagesToSend(Connection txn, ContactId c, + int maxLength) throws DbException { + long now = clock.currentTimeMillis(); + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, m.messageId FROM messages AS m" + + " JOIN contactGroups AS cg" + + " ON m.groupId = cg.groupId" + + " JOIN groupVisibilities AS gv" + + " ON m.groupId = gv.groupId" + + " AND cg.contactId = gv.contactId" + + " JOIN retentionVersions AS rv" + + " ON cg.contactId = rv.contactId" + + " JOIN statuses AS s" + + " ON m.messageId = s.messageId" + + " AND cg.contactId = s.contactId" + + " WHERE cg.contactId = ?" + + " AND timestamp >= retention" + + " AND seen = FALSE" + + " AND s.expiry < ?" + + " ORDER BY timestamp DESC"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setLong(2, now); + rs = ps.executeQuery(); + List<MessageId> ids = new ArrayList<MessageId>(); + int total = 0; + while(rs.next()) { + int length = rs.getInt(1); + if(total + length > maxLength) break; + ids.add(new MessageId(rs.getBytes(2))); + total += length; + } + rs.close(); + ps.close(); + return Collections.unmodifiableList(ids); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + public Collection<MessageId> getOldMessages(Connection txn, int capacity) throws DbException { PreparedStatement ps = null; @@ -1759,49 +1867,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public byte[] getRawMessageIfSendable(Connection txn, ContactId c, - MessageId m) throws DbException { - long now = clock.currentTimeMillis(); - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT length, raw FROM messages AS m" - + " JOIN contactGroups AS cg" - + " ON m.groupId = cg.groupId" - + " JOIN groupVisibilities AS gv" - + " ON m.groupId = gv.groupId" - + " AND cg.contactId = gv.contactId" - + " JOIN retentionVersions AS rv" - + " ON cg.contactId = rv.contactId" - + " JOIN statuses AS s" - + " ON m.messageId = s.messageId" - + " AND cg.contactId = s.contactId" - + " WHERE m.messageId = ?" - + " AND cg.contactId = ?" - + " AND timestamp >= retention" - + " AND seen = FALSE AND s.expiry < ?"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); - ps.setInt(2, c.getInt()); - ps.setLong(3, now); - rs = ps.executeQuery(); - byte[] raw = null; - if(rs.next()) { - int length = rs.getInt(1); - raw = rs.getBlob(2).getBytes(1, length); - if(raw.length != length) throw new DbStateException(); - } - if(rs.next()) throw new DbStateException(); - rs.close(); - ps.close(); - return raw; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - throw new DbException(e); - } - } - public boolean getReadFlag(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -1810,8 +1875,8 @@ abstract class JdbcDatabase implements Database<Connection> { ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); rs = ps.executeQuery(); - boolean read = false; - if(rs.next()) read = rs.getBoolean(1); + if(!rs.next()) throw new DbStateException(); + boolean read = rs.getBoolean(1); if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); @@ -1859,6 +1924,50 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public Collection<MessageId> getRequestedMessagesToSend(Connection txn, + ContactId c, int maxLength) throws DbException { + long now = clock.currentTimeMillis(); + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT length, m.messageId FROM messages AS m" + + " JOIN contactGroups AS cg" + + " ON m.groupId = cg.groupId" + + " JOIN groupVisibilities AS gv" + + " ON m.groupId = gv.groupId" + + " AND cg.contactId = gv.contactId" + + " JOIN retentionVersions AS rv" + + " ON cg.contactId = rv.contactId" + + " JOIN statuses AS s" + + " ON m.messageId = s.messageId" + + " AND cg.contactId = s.contactId" + + " WHERE cg.contactId = ?" + + " AND timestamp >= retention" + + " AND seen = FALSE AND requested = TRUE" + + " AND s.expiry < ?" + + " ORDER BY timestamp DESC"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setLong(2, now); + rs = ps.executeQuery(); + List<MessageId> ids = new ArrayList<MessageId>(); + int total = 0; + while(rs.next()) { + int length = rs.getInt(1); + if(total + length > maxLength) break; + ids.add(new MessageId(rs.getBytes(2))); + total += length; + } + rs.close(); + ps.close(); + return Collections.unmodifiableList(ids); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + public RetentionAck getRetentionAck(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; @@ -1899,13 +2008,11 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT timestamp, localVersion, txCount" - + " FROM messages AS m" - + " JOIN retentionVersions AS rv" - + " WHERE rv.contactId = ?" + String sql = "SELECT localVersion, txCount" + + " FROM retentionVersions" + + " WHERE contactId = ?" + " AND localVersion > localAcked" - + " AND expiry < ?" - + " ORDER BY timestamp LIMIT 1"; + + " AND expiry < ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setLong(2, now); @@ -1915,10 +2022,20 @@ abstract class JdbcDatabase implements Database<Connection> { ps.close(); return null; } - long retention = rs.getLong(1); - retention -= retention % RETENTION_MODULUS; - long version = rs.getLong(2); - int txCount = rs.getInt(3); + long version = rs.getLong(1); + int txCount = rs.getInt(2); + if(rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + sql = "SELECT timestamp FROM messages AS m" + + " ORDER BY timestamp LIMIT 1"; + ps = txn.prepareStatement(sql); + rs = ps.executeQuery(); + long retention = 0; + if(rs.next()) { + retention = rs.getLong(1); + retention -= retention % RETENTION_GRANULARITY; + } if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); @@ -1976,56 +2093,13 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Collection<MessageId> getSendableMessages(Connection txn, - ContactId c, int maxLength) throws DbException { - long now = clock.currentTimeMillis(); + public SubscriptionAck getSubscriptionAck(Connection txn, ContactId c) + throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - String sql = "SELECT length, m.messageId FROM messages AS m" - + " JOIN contactGroups AS cg" - + " ON m.groupId = cg.groupId" - + " JOIN groupVisibilities AS gv" - + " ON m.groupId = gv.groupId" - + " AND cg.contactId = gv.contactId" - + " JOIN retentionVersions AS rv" - + " ON cg.contactId = rv.contactId" - + " JOIN statuses AS s" - + " ON m.messageId = s.messageId" - + " AND cg.contactId = s.contactId" - + " WHERE cg.contactId = ?" - + " AND timestamp >= retention" - + " AND seen = FALSE AND s.expiry < ?" - + " ORDER BY timestamp DESC"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setLong(2, now); - rs = ps.executeQuery(); - List<MessageId> ids = new ArrayList<MessageId>(); - int total = 0; - while(rs.next()) { - int length = rs.getInt(1); - if(total + length > maxLength) break; - ids.add(new MessageId(rs.getBytes(2))); - total += length; - } - rs.close(); - ps.close(); - return Collections.unmodifiableList(ids); - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - throw new DbException(e); - } - } - - public SubscriptionAck getSubscriptionAck(Connection txn, ContactId c) - throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT remoteVersion FROM groupVersions" - + " WHERE contactId = ? AND remoteAcked = FALSE"; + String sql = "SELECT remoteVersion FROM groupVersions" + + " WHERE contactId = ? AND remoteAcked = FALSE"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); rs = ps.executeQuery(); @@ -2104,30 +2178,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public int getTransmissionCount(Connection txn, ContactId c, MessageId m) - throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT txCount FROM statuses" - + " WHERE messageId = ? AND contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); - ps.setInt(2, c.getInt()); - rs = ps.executeQuery(); - if(!rs.next()) throw new DbStateException(); - int txCount = rs.getInt(1); - if(rs.next()) throw new DbStateException(); - rs.close(); - ps.close(); - return txCount; - } catch(SQLException e) { - tryToClose(ps); - tryToClose(rs); - throw new DbException(e); - } - } - public Collection<TransportAck> getTransportAcks(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; @@ -2159,7 +2209,7 @@ abstract class JdbcDatabase implements Database<Connection> { if(batchAffected.length != acks.size()) throw new DbStateException(); for(int i = 0; i < batchAffected.length; i++) { - if(batchAffected[i] < 1) throw new DbStateException(); + if(batchAffected[i] != 1) throw new DbStateException(); } ps.close(); return Collections.unmodifiableList(acks); @@ -2360,6 +2410,58 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void lowerAckFlag(Connection txn, ContactId c, + Collection<MessageId> acked) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE statuses SET ack = FALSE" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(2, c.getInt()); + for(MessageId m : acked) { + ps.setBytes(1, m.getBytes()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if(batchAffected.length != acked.size()) + throw new DbStateException(); + for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] < 0) throw new DbStateException(); + if(batchAffected[i] > 1) throw new DbStateException(); + } + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void lowerRequestedFlag(Connection txn, ContactId c, + Collection<MessageId> requested) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE statuses SET requested = FALSE" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(2, c.getInt()); + for(MessageId m : requested) { + ps.setBytes(1, m.getBytes()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if(batchAffected.length != requested.size()) + throw new DbStateException(); + for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] < 0) throw new DbStateException(); + if(batchAffected[i] > 1) throw new DbStateException(); + } + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + public void mergeConfig(Connection txn, TransportId t, TransportConfig c) throws DbException { // Merge the new configuration with the existing one @@ -2403,6 +2505,7 @@ abstract class JdbcDatabase implements Database<Connection> { int[] batchAffected = ps.executeBatch(); if(batchAffected.length != m.size()) throw new DbStateException(); for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] < 0) throw new DbStateException(); if(batchAffected[i] > 1) throw new DbStateException(); } // Insert any properties that don't already exist @@ -2432,6 +2535,64 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void raiseAckFlag(Connection txn, ContactId c, MessageId m) + throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE statuses SET ack = TRUE" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void raiseRequestedFlag(Connection txn, ContactId c, MessageId m) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "UPDATE statuses SET requested = TRUE" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + + public void raiseSeenFlag(Connection txn, ContactId c, MessageId m) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "UPDATE statuses SET seen = TRUE" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + throw new DbException(e); + } + } + public void removeContact(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; @@ -2524,20 +2685,39 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void removeMessagesToAck(Connection txn, ContactId c, - Collection<MessageId> acked) throws DbException { + public boolean removeOfferedMessage(Connection txn, ContactId c, + MessageId m) throws DbException { PreparedStatement ps = null; try { - String sql = "UPDATE statuses SET ack = FALSE" + String sql = "DELETE FROM offers" + " WHERE contactId = ? AND messageId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - for(MessageId m : acked) { + ps.setBytes(2, m.getBytes()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + return affected == 1; + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void removeOfferedMessages(Connection txn, ContactId c, + Collection<MessageId> requested) throws DbException { + PreparedStatement ps = null; + try { + String sql = "DELETE FROM offers" + + " WHERE contactId = ? AND messageId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + for(MessageId m : requested) { ps.setBytes(2, m.getBytes()); ps.addBatch(); } int[] batchAffected = ps.executeBatch(); - if(batchAffected.length != acked.size()) + if(batchAffected.length != requested.size()) throw new DbStateException(); for(int i = 0; i < batchAffected.length; i++) { if(batchAffected[i] != 1) throw new DbStateException(); @@ -2592,6 +2772,24 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void resetExpiryTime(Connection txn, ContactId c, MessageId m) + throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE statuses" + + " SET expiry = 0, txCount = 0" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } public void setConnectionWindow(Connection txn, ContactId c, TransportId t, long period, long centre, byte[] bitmap) throws DbException { PreparedStatement ps = null; @@ -2605,7 +2803,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setBytes(4, t.getBytes()); ps.setLong(5, period); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -2627,7 +2825,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(2, c.getInt()); ps.setLong(3, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); // Return false if the update is obsolete if(affected == 0) return false; @@ -2664,24 +2862,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setLastConnected(Connection txn, ContactId c, long now) - throws DbException { - PreparedStatement ps = null; - try { - String sql = "UPDATE connectionTimes SET lastConnected = ?" - + " WHERE contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setLong(1, now); - ps.setInt(2, c.getInt()); - int affected = ps.executeUpdate(); - if(affected < 1) throw new DbStateException(); - ps.close(); - } catch(SQLException e) { - tryToClose(ps); - throw new DbException(e); - } - } - public void setInboxGroup(Connection txn, ContactId c, Group g) throws DbException { PreparedStatement ps = null; @@ -2694,7 +2874,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(1, c.getInt()); ps.executeUpdate(); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); // Make the group visible to the contact and set it as the inbox sql = "INSERT INTO groupVisibilities" @@ -2724,31 +2904,36 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public boolean setReadFlag(Connection txn, MessageId m, boolean read) + public void setLastConnected(Connection txn, ContactId c, long now) throws DbException { PreparedStatement ps = null; - ResultSet rs = null; try { - String sql = "SELECT read FROM messages WHERE messageId = ?"; + String sql = "UPDATE connectionTimes SET lastConnected = ?" + + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); - rs = ps.executeQuery(); - if(!rs.next()) throw new DbStateException(); - boolean wasRead = rs.getBoolean(1); - if(rs.next()) throw new DbStateException(); - rs.close(); + ps.setLong(1, now); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); - if(wasRead == read) return read; - sql = "UPDATE messages SET read = ? WHERE messageId = ?"; + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void setReadFlag(Connection txn, MessageId m, boolean read) + throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE messages SET read = ? WHERE messageId = ?"; ps = txn.prepareStatement(sql); ps.setBoolean(1, read); ps.setBytes(2, m.getBytes()); int affected = ps.executeUpdate(); - if(affected != 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); - return !read; } catch(SQLException e) { - tryToClose(rs); tryToClose(ps); throw new DbException(e); } @@ -2806,12 +2991,12 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(1, c.getInt()); ps.setBytes(2, t.getBytes()); rs = ps.executeQuery(); - boolean exists = rs.next(); + boolean found = rs.next(); if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); // Mark the update as needing to be acked - if(exists) { + if(found) { // The row exists - update it sql = "UPDATE contactTransportVersions" + " SET remoteVersion = ?, remoteAcked = FALSE" @@ -2823,7 +3008,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setBytes(3, t.getBytes()); ps.setLong(4, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); // Return false if the update is obsolete if(affected == 0) return false; @@ -2888,7 +3073,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(3, c.getInt()); ps.setLong(4, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); return affected == 1; } catch(SQLException e) { @@ -2910,7 +3095,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setLong(3, version); ps.setLong(4, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -2918,45 +3103,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public boolean setStatusSeenIfVisible(Connection txn, ContactId c, - MessageId m) throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT NULL FROM messages AS m" - + " JOIN contactGroups AS cg" - + " ON m.groupId = cg.groupId" - + " JOIN groupVisibilities AS gv" - + " ON m.groupId = gv.groupId" - + " AND cg.contactId = gv.contactId" - + " WHERE messageId = ?" - + " AND cg.contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); - ps.setInt(2, c.getInt()); - rs = ps.executeQuery(); - boolean found = rs.next(); - if(rs.next()) throw new DbStateException(); - rs.close(); - ps.close(); - if(!found) return false; - sql = "UPDATE statuses SET seen = ?" - + " WHERE messageId = ? AND contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setBoolean(1, true); - ps.setBytes(2, m.getBytes()); - ps.setInt(3, c.getInt()); - int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); - ps.close(); - return true; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - throw new DbException(e); - } - } - public void setSubscriptionUpdateAcked(Connection txn, ContactId c, long version) throws DbException { PreparedStatement ps = null; @@ -2970,7 +3116,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setLong(3, version); ps.setLong(4, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -2992,7 +3138,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setLong(4, version); ps.setLong(5, version); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -3009,7 +3155,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setBoolean(1, all); ps.setBytes(2, g.getBytes()); int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + if(affected < 0 || affected > 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -3017,29 +3163,34 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void updateExpiryTimes(Connection txn, ContactId c, - Map<MessageId, Integer> sent, long maxLatency) throws DbException { - long now = clock.currentTimeMillis(); + public void updateExpiryTime(Connection txn, ContactId c, MessageId m, + long maxLatency) throws DbException { PreparedStatement ps = null; + ResultSet rs = null; try { - String sql = "UPDATE statuses" - + " SET expiry = ?, txCount = txCount + 1" + String sql = "SELECT txCount FROM statuses" + + " WHERE messageId = ? AND contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setBytes(1, m.getBytes()); + ps.setInt(2, c.getInt()); + rs = ps.executeQuery(); + if(!rs.next()) throw new DbStateException(); + int txCount = rs.getInt(1); + if(rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + sql = "UPDATE statuses SET expiry = ?, txCount = txCount + 1" + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); + long now = clock.currentTimeMillis(); + ps.setLong(1, calculateExpiry(now, maxLatency, txCount)); + ps.setBytes(2, m.getBytes()); ps.setInt(3, c.getInt()); - for(Entry<MessageId, Integer> e : sent.entrySet()) { - ps.setLong(1, calculateExpiry(now, maxLatency, e.getValue())); - ps.setBytes(2, e.getKey().getBytes()); - ps.addBatch(); - } - int[] batchAffected = ps.executeBatch(); - if(batchAffected.length != sent.size()) - throw new DbStateException(); - for(int i = 0; i < batchAffected.length; i++) { - if(batchAffected[i] > 1) throw new DbStateException(); - } + int affected = ps.executeUpdate(); + if(affected != 1) throw new DbStateException(); ps.close(); } catch(SQLException e) { + tryToClose(rs); tryToClose(ps); throw new DbException(e); } diff --git a/briar-core/src/net/sf/briar/messaging/PacketWriterImpl.java b/briar-core/src/net/sf/briar/messaging/PacketWriterImpl.java index 281d5a36b927fdf6daeaf976bea75b6c42fe3b0b..ebf93ea3658c408ae5f765dc9f15d8112be921f7 100644 --- a/briar-core/src/net/sf/briar/messaging/PacketWriterImpl.java +++ b/briar-core/src/net/sf/briar/messaging/PacketWriterImpl.java @@ -47,7 +47,7 @@ class PacketWriterImpl implements PacketWriter { w = writerFactory.createWriter(out); } - public int getMaxMessagesForAck(long capacity) { + public int getMaxMessagesForRequest(long capacity) { int packet = (int) Math.min(capacity, MAX_PACKET_LENGTH); int overhead = serial.getSerialisedStructStartLength(ACK) + serial.getSerialisedListStartLength() diff --git a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java index 8d7058235879967d7de8eb46d3f828e1c9fac0d3..be616e9652308075efdec080efef2a23e4b09a8b 100644 --- a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java @@ -8,7 +8,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.security.GeneralSecurityException; -import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; @@ -19,7 +18,6 @@ import java.util.logging.Logger; import net.sf.briar.api.ContactId; import net.sf.briar.api.FormatException; import net.sf.briar.api.TransportId; -import net.sf.briar.api.db.AckAndRequest; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.event.ContactRemovedEvent; @@ -29,13 +27,14 @@ import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; import net.sf.briar.api.db.event.MessageAddedEvent; import net.sf.briar.api.db.event.MessageExpiredEvent; -import net.sf.briar.api.db.event.MessageReceivedEvent; +import net.sf.briar.api.db.event.MessageRequestedEvent; +import net.sf.briar.api.db.event.MessageToAckEvent; +import net.sf.briar.api.db.event.MessageToRequestEvent; import net.sf.briar.api.db.event.RemoteRetentionTimeUpdatedEvent; import net.sf.briar.api.db.event.RemoteSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.RemoteTransportsUpdatedEvent; import net.sf.briar.api.messaging.Ack; import net.sf.briar.api.messaging.Message; -import net.sf.briar.api.messaging.MessageId; import net.sf.briar.api.messaging.MessageVerifier; import net.sf.briar.api.messaging.Offer; import net.sf.briar.api.messaging.PacketReader; @@ -86,7 +85,7 @@ abstract class DuplexConnection implements DatabaseListener { private final Executor dbExecutor, cryptoExecutor; private final MessageVerifier messageVerifier; private final long maxLatency; - private final AtomicBoolean canSendOffer, disposed; + private final AtomicBoolean disposed; private final BlockingQueue<Runnable> writerTasks; private volatile PacketWriter writer = null; @@ -113,7 +112,6 @@ abstract class DuplexConnection implements DatabaseListener { contactId = ctx.getContactId(); transportId = ctx.getTransportId(); maxLatency = transport.getMaxLatency(); - canSendOffer = new AtomicBoolean(true); disposed = new AtomicBoolean(false); writerTasks = new LinkedBlockingQueue<Runnable>(); } @@ -129,8 +127,7 @@ abstract class DuplexConnection implements DatabaseListener { ContactRemovedEvent c = (ContactRemovedEvent) e; if(contactId.equals(c.getContactId())) writerTasks.add(CLOSE); } else if(e instanceof MessageAddedEvent) { - if(canSendOffer.getAndSet(false)) - dbExecutor.execute(new GenerateOffer()); + dbExecutor.execute(new GenerateOffer()); } else if(e instanceof MessageExpiredEvent) { dbExecutor.execute(new GenerateRetentionUpdate()); } else if(e instanceof LocalSubscriptionsUpdatedEvent) { @@ -138,20 +135,24 @@ abstract class DuplexConnection implements DatabaseListener { (LocalSubscriptionsUpdatedEvent) e; if(l.getAffectedContacts().contains(contactId)) { dbExecutor.execute(new GenerateSubscriptionUpdate()); - if(canSendOffer.getAndSet(false)) - dbExecutor.execute(new GenerateOffer()); + dbExecutor.execute(new GenerateOffer()); } } else if(e instanceof LocalTransportsUpdatedEvent) { dbExecutor.execute(new GenerateTransportUpdates()); - } else if(e instanceof MessageReceivedEvent) { - if(((MessageReceivedEvent) e).getContactId().equals(contactId)) - dbExecutor.execute(new GenerateAcks()); + } else if(e instanceof MessageRequestedEvent) { + if(((MessageRequestedEvent) e).getContactId().equals(contactId)) + dbExecutor.execute(new GenerateBatch()); + } else if(e instanceof MessageToAckEvent) { + if(((MessageToAckEvent) e).getContactId().equals(contactId)) + dbExecutor.execute(new GenerateAck()); + } else if(e instanceof MessageToRequestEvent) { + if(((MessageToRequestEvent) e).getContactId().equals(contactId)) + dbExecutor.execute(new GenerateRequest()); } else if(e instanceof RemoteRetentionTimeUpdatedEvent) { dbExecutor.execute(new GenerateRetentionAck()); } else if(e instanceof RemoteSubscriptionsUpdatedEvent) { dbExecutor.execute(new GenerateSubscriptionAck()); - if(canSendOffer.getAndSet(false)) - dbExecutor.execute(new GenerateOffer()); + dbExecutor.execute(new GenerateOffer()); } else if(e instanceof RemoteTransportsUpdatedEvent) { dbExecutor.execute(new GenerateTransportAcks()); } @@ -178,10 +179,7 @@ abstract class DuplexConnection implements DatabaseListener { } else if(reader.hasRequest()) { Request r = reader.readRequest(); if(LOG.isLoggable(INFO)) LOG.info("Received request"); - // Make a mutable copy of the requested IDs - Collection<MessageId> requested = r.getMessageIds(); - requested = new ArrayList<MessageId>(requested); - dbExecutor.execute(new GenerateBatches(requested)); + dbExecutor.execute(new ReceiveRequest(r)); } else if(reader.hasRetentionAck()) { RetentionAck a = reader.readRetentionAck(); if(LOG.isLoggable(INFO)) LOG.info("Received retention ack"); @@ -231,16 +229,17 @@ abstract class DuplexConnection implements DatabaseListener { writer = packetWriterFactory.createPacketWriter(out, transport.shouldFlush()); if(LOG.isLoggable(INFO)) LOG.info("Starting to write"); - // Send the initial packets: updates, acks, offer + // Send the initial packets dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportUpdates()); dbExecutor.execute(new GenerateSubscriptionAck()); dbExecutor.execute(new GenerateSubscriptionUpdate()); dbExecutor.execute(new GenerateRetentionAck()); dbExecutor.execute(new GenerateRetentionUpdate()); - dbExecutor.execute(new GenerateAcks()); - if(canSendOffer.getAndSet(false)) - dbExecutor.execute(new GenerateOffer()); + dbExecutor.execute(new GenerateAck()); + dbExecutor.execute(new GenerateBatch()); + dbExecutor.execute(new GenerateOffer()); + dbExecutor.execute(new GenerateRequest()); // Main loop Runnable task = null; while(true) { @@ -301,7 +300,7 @@ abstract class DuplexConnection implements DatabaseListener { } } - // This task runs on a verification thread + // This task runs on a crypto thread private class VerifyMessage implements Runnable { private final UnverifiedMessage message; @@ -351,38 +350,29 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { - AckAndRequest ar = db.receiveOffer(contactId, offer); - Ack a = ar.getAck(); - Request r = ar.getRequest(); - if(LOG.isLoggable(INFO)) { - LOG.info("DB received offer: " + (a != null) - + " " + (r != null)); - } - if(a != null) writerTasks.add(new WriteAck(a)); - if(r != null) writerTasks.add(new WriteRequest(r)); + db.receiveOffer(contactId, offer); + if(LOG.isLoggable(INFO)) LOG.info("DB received offer"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } } } - // This task runs on the writer thread - private class WriteRequest implements Runnable { + // This task runs on a database thread + private class ReceiveRequest implements Runnable { private final Request request; - private WriteRequest(Request request) { + private ReceiveRequest(Request request) { this.request = request; } public void run() { - assert writer != null; try { - writer.writeRequest(request); - if(LOG.isLoggable(INFO)) LOG.info("Sent request"); - } catch(IOException e) { + db.receiveRequest(contactId, request); + if(LOG.isLoggable(INFO)) LOG.info("DB received request"); + } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); - dispose(true, true); } } } @@ -506,11 +496,11 @@ abstract class DuplexConnection implements DatabaseListener { } // This task runs on a database thread - private class GenerateAcks implements Runnable { + private class GenerateAck implements Runnable { public void run() { assert writer != null; - int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE); + int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE); try { Ack a = db.generateAck(contactId, maxMessages); if(LOG.isLoggable(INFO)) @@ -536,7 +526,7 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeAck(ack); if(LOG.isLoggable(INFO)) LOG.info("Sent ack"); - dbExecutor.execute(new GenerateAcks()); + dbExecutor.execute(new GenerateAck()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -544,24 +534,17 @@ abstract class DuplexConnection implements DatabaseListener { } } - // This task runs on a database thred - private class GenerateBatches implements Runnable { - - private final Collection<MessageId> requested; - - private GenerateBatches(Collection<MessageId> requested) { - this.requested = requested; - } + // This task runs on a database thread + private class GenerateBatch implements Runnable { public void run() { assert writer != null; try { - Collection<byte[]> batch = db.generateBatch(contactId, - MAX_PACKET_LENGTH, maxLatency, requested); + Collection<byte[]> b = db.generateRequestedBatch(contactId, + MAX_PACKET_LENGTH, maxLatency); if(LOG.isLoggable(INFO)) - LOG.info("Generated batch: " + (batch != null)); - if(batch == null) new GenerateOffer().run(); - else writerTasks.add(new WriteBatch(batch, requested)); + LOG.info("Generated batch: " + (b != null)); + if(b != null) writerTasks.add(new WriteBatch(b)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -572,12 +555,9 @@ abstract class DuplexConnection implements DatabaseListener { private class WriteBatch implements Runnable { private final Collection<byte[]> batch; - private final Collection<MessageId> requested; - private WriteBatch(Collection<byte[]> batch, - Collection<MessageId> requested) { + private WriteBatch(Collection<byte[]> batch) { this.batch = batch; - this.requested = requested; } public void run() { @@ -585,8 +565,7 @@ abstract class DuplexConnection implements DatabaseListener { try { for(byte[] raw : batch) writer.writeMessage(raw); if(LOG.isLoggable(INFO)) LOG.info("Sent batch"); - if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer()); - else dbExecutor.execute(new GenerateBatches(requested)); + dbExecutor.execute(new GenerateBatch()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -601,11 +580,10 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE); try { - Offer o = db.generateOffer(contactId, maxMessages); + Offer o = db.generateOffer(contactId, maxMessages, maxLatency); if(LOG.isLoggable(INFO)) LOG.info("Generated offer: " + (o != null)); - if(o == null) canSendOffer.set(true); - else writerTasks.add(new WriteOffer(o)); + if(o != null) writerTasks.add(new WriteOffer(o)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -626,6 +604,46 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeOffer(offer); if(LOG.isLoggable(INFO)) LOG.info("Sent offer"); + dbExecutor.execute(new GenerateOffer()); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + dispose(true, true); + } + } + } + + // This task runs on a database thread + private class GenerateRequest implements Runnable { + + public void run() { + assert writer != null; + int maxMessages = writer.getMaxMessagesForRequest(Long.MAX_VALUE); + try { + Request r = db.generateRequest(contactId, maxMessages); + if(LOG.isLoggable(INFO)) + LOG.info("Generated request: " + (r != null)); + if(r != null) writerTasks.add(new WriteRequest(r)); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on the writer thread + private class WriteRequest implements Runnable { + + private final Request request; + + private WriteRequest(Request request) { + this.request = request; + } + + public void run() { + assert writer != null; + try { + writer.writeRequest(request); + if(LOG.isLoggable(INFO)) LOG.info("Sent request"); + dbExecutor.execute(new GenerateRequest()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -662,6 +680,7 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeRetentionAck(ack); if(LOG.isLoggable(INFO)) LOG.info("Sent retention ack"); + dbExecutor.execute(new GenerateRetentionAck()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -699,6 +718,7 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeRetentionUpdate(update); if(LOG.isLoggable(INFO)) LOG.info("Sent retention update"); + dbExecutor.execute(new GenerateRetentionUpdate()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -735,6 +755,7 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeSubscriptionAck(ack); if(LOG.isLoggable(INFO)) LOG.info("Sent subscription ack"); + dbExecutor.execute(new GenerateSubscriptionAck()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -772,6 +793,7 @@ abstract class DuplexConnection implements DatabaseListener { try { writer.writeSubscriptionUpdate(update); if(LOG.isLoggable(INFO)) LOG.info("Sent subscription update"); + dbExecutor.execute(new GenerateSubscriptionUpdate()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -809,6 +831,7 @@ abstract class DuplexConnection implements DatabaseListener { try { for(TransportAck a : acks) writer.writeTransportAck(a); if(LOG.isLoggable(INFO)) LOG.info("Sent transport acks"); + dbExecutor.execute(new GenerateTransportAcks()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -846,6 +869,7 @@ abstract class DuplexConnection implements DatabaseListener { try { for(TransportUpdate u : updates) writer.writeTransportUpdate(u); if(LOG.isLoggable(INFO)) LOG.info("Sent transport updates"); + dbExecutor.execute(new GenerateTransportUpdates()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); diff --git a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java index 8b259c844f1454216a88001ec7224300935bc623..8fc9c2ceddad75e28db43e19698bf48b7c8dfea8 100644 --- a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java @@ -82,12 +82,12 @@ class OutgoingSimplexConnection { if(hasSpace) hasSpace = writeRetentionUpdate(conn, writer); // Write acks until you can't write acks no more capacity = conn.getRemainingCapacity(); - int maxMessages = writer.getMaxMessagesForAck(capacity); + int maxMessages = writer.getMaxMessagesForRequest(capacity); Ack a = db.generateAck(contactId, maxMessages); while(a != null) { writer.writeAck(a); capacity = conn.getRemainingCapacity(); - maxMessages = writer.getMaxMessagesForAck(capacity); + maxMessages = writer.getMaxMessagesForRequest(capacity); a = db.generateAck(contactId, maxMessages); } // Write messages until you can't write messages no more diff --git a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java index 47a9fc7a11b68f35f9167e94fb7c77212cdd5360..ff8a65a2bc0f7f316f8b326d304112e1ec75dfe8 100644 --- a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java +++ b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java @@ -3,13 +3,10 @@ package net.sf.briar.db; import static net.sf.briar.api.AuthorConstants.MAX_PUBLIC_KEY_LENGTH; import static net.sf.briar.api.messaging.MessagingConstants.GROUP_SALT_LENGTH; -import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.Map; import net.sf.briar.BriarTestCase; import net.sf.briar.TestMessage; @@ -22,7 +19,6 @@ import net.sf.briar.api.LocalAuthor; import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportId; import net.sf.briar.api.TransportProperties; -import net.sf.briar.api.db.AckAndRequest; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.NoSuchLocalAuthorException; @@ -34,8 +30,10 @@ import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.db.event.LocalAuthorAddedEvent; import net.sf.briar.api.db.event.LocalAuthorRemovedEvent; import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent; +import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; import net.sf.briar.api.db.event.MessageAddedEvent; -import net.sf.briar.api.db.event.MessageReceivedEvent; +import net.sf.briar.api.db.event.MessageToAckEvent; +import net.sf.briar.api.db.event.MessageToRequestEvent; import net.sf.briar.api.db.event.SubscriptionAddedEvent; import net.sf.briar.api.db.event.SubscriptionRemovedEvent; import net.sf.briar.api.lifecycle.ShutdownManager; @@ -186,6 +184,8 @@ public abstract class DatabaseComponentTest extends BriarTestCase { // removeContact() oneOf(database).containsContact(txn, contactId); will(returnValue(true)); + oneOf(database).getInboxGroupId(txn, contactId); + will(returnValue(null)); oneOf(database).removeContact(txn, contactId); oneOf(listener).eventOccurred(with(any(ContactRemovedEvent.class))); // removeLocalAuthor() @@ -288,9 +288,13 @@ public abstract class DatabaseComponentTest extends BriarTestCase { will(returnValue(true)); oneOf(database).addMessage(txn, message, false); oneOf(database).setReadFlag(txn, messageId, true); + oneOf(database).getVisibility(txn, groupId); + will(returnValue(Arrays.asList(contactId))); oneOf(database).getContactIds(txn); will(returnValue(Arrays.asList(contactId))); - oneOf(database).addStatus(txn, contactId, messageId, false); + oneOf(database).removeOfferedMessage(txn, contactId, messageId); + will(returnValue(false)); + oneOf(database).addStatus(txn, contactId, messageId, false, false); oneOf(database).commitTransaction(txn); // The message was added, so the listener should be called oneOf(listener).eventOccurred(with(any( @@ -315,11 +319,11 @@ public abstract class DatabaseComponentTest extends BriarTestCase { final ShutdownManager shutdown = context.mock(ShutdownManager.class); context.checking(new Expectations() {{ // Check whether the contact is in the DB (which it's not) - exactly(28).of(database).startTransaction(); + exactly(25).of(database).startTransaction(); will(returnValue(txn)); - exactly(28).of(database).containsContact(txn, contactId); + exactly(25).of(database).containsContact(txn, contactId); will(returnValue(false)); - exactly(28).of(database).abortTransaction(txn); + exactly(25).of(database).abortTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); @@ -329,11 +333,6 @@ public abstract class DatabaseComponentTest extends BriarTestCase { fail(); } catch(NoSuchContactException expected) {} - try { - db.containsSendableMessages(contactId); - fail(); - } catch(NoSuchContactException expected) {} - try { db.generateAck(contactId, 123); fail(); @@ -345,12 +344,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { } catch(NoSuchContactException expected) {} try { - db.generateBatch(contactId, 123, 456, Arrays.asList(messageId)); - fail(); - } catch(NoSuchContactException expected) {} - - try { - db.generateOffer(contactId, 123); + db.generateOffer(contactId, 123, 456); fail(); } catch(NoSuchContactException expected) {} @@ -469,11 +463,6 @@ public abstract class DatabaseComponentTest extends BriarTestCase { fail(); } catch(NoSuchContactException expected) {} - try { - db.setSeen(contactId, Arrays.asList(messageId)); - fail(); - } catch(NoSuchContactException expected) {} - context.assertIsSatisfied(); } @@ -657,17 +646,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase { final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final ShutdownManager shutdown = context.mock(ShutdownManager.class); context.checking(new Expectations() {{ - // Two transactions: read and write - exactly(2).of(database).startTransaction(); + oneOf(database).startTransaction(); will(returnValue(txn)); - exactly(2).of(database).commitTransaction(txn); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - // Get the messages to ack oneOf(database).getMessagesToAck(txn, contactId, 123); will(returnValue(messagesToAck)); - // Record the messages that were acked - oneOf(database).removeMessagesToAck(txn, contactId, messagesToAck); + oneOf(database).lowerAckFlag(txn, contactId, messagesToAck); + oneOf(database).commitTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); @@ -681,38 +667,30 @@ public abstract class DatabaseComponentTest extends BriarTestCase { @Test public void testGenerateBatch() throws Exception { final byte[] raw1 = new byte[size]; - final Collection<MessageId> sendable = Arrays.asList(messageId, - messageId1); + final Collection<MessageId> ids = Arrays.asList(messageId, messageId1); final Collection<byte[]> messages = Arrays.asList(raw, raw1); - final Map<MessageId, Integer> sent = new HashMap<MessageId, Integer>(); - sent.put(messageId, 1); - sent.put(messageId1, 2); Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final ShutdownManager shutdown = context.mock(ShutdownManager.class); context.checking(new Expectations() {{ - // Two transactions: read and write - exactly(2).of(database).startTransaction(); + oneOf(database).startTransaction(); will(returnValue(txn)); - exactly(2).of(database).commitTransaction(txn); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - // Get the sendable messages and their transmission counts - oneOf(database).getSendableMessages(txn, contactId, size * 2); - will(returnValue(sendable)); + oneOf(database).getMessagesToSend(txn, contactId, size * 2); + will(returnValue(ids)); oneOf(database).getRawMessage(txn, messageId); will(returnValue(raw)); - oneOf(database).getTransmissionCount(txn, contactId, messageId); - will(returnValue(1)); + oneOf(database).updateExpiryTime(txn, contactId, messageId, + Long.MAX_VALUE); oneOf(database).getRawMessage(txn, messageId1); will(returnValue(raw1)); - oneOf(database).getTransmissionCount(txn, contactId, messageId1); - will(returnValue(2)); - // Record the outstanding messages - oneOf(database).updateExpiryTimes(txn, contactId, sent, + oneOf(database).updateExpiryTime(txn, contactId, messageId1, Long.MAX_VALUE); + oneOf(database).lowerRequestedFlag(txn, contactId, ids); + oneOf(database).commitTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); @@ -724,51 +702,40 @@ public abstract class DatabaseComponentTest extends BriarTestCase { } @Test - public void testGenerateBatchFromRequest() throws Exception { - final MessageId messageId2 = new MessageId(TestUtils.getRandomId()); - final byte[] raw1 = new byte[size]; - final Collection<MessageId> requested = new ArrayList<MessageId>( - Arrays.asList(messageId, messageId1, messageId2)); - final Collection<byte[]> messages = Arrays.asList(raw1); + public void testGenerateOffer() throws Exception { + final MessageId messageId1 = new MessageId(TestUtils.getRandomId()); + final Collection<MessageId> ids = Arrays.asList(messageId, messageId1); Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final ShutdownManager shutdown = context.mock(ShutdownManager.class); context.checking(new Expectations() {{ - // Two transactions: read and write - exactly(2).of(database).startTransaction(); + oneOf(database).startTransaction(); will(returnValue(txn)); - exactly(2).of(database).commitTransaction(txn); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - // Try to get the requested messages - oneOf(database).getRawMessageIfSendable(txn, contactId, messageId); - will(returnValue(null)); // Message is not sendable - oneOf(database).getRawMessageIfSendable(txn, contactId, messageId1); - will(returnValue(raw1)); // Message is sendable - oneOf(database).getTransmissionCount(txn, contactId, messageId1); - will(returnValue(2)); - oneOf(database).getRawMessageIfSendable(txn, contactId, messageId2); - will(returnValue(null)); // Message is not sendable - // Mark the message as sent - oneOf(database).updateExpiryTimes(txn, contactId, - Collections.singletonMap(messageId1, 2), Long.MAX_VALUE); + oneOf(database).getMessagesToOffer(txn, contactId, 123); + will(returnValue(ids)); + oneOf(database).updateExpiryTime(txn, contactId, messageId, + Long.MAX_VALUE); + oneOf(database).updateExpiryTime(txn, contactId, messageId1, + Long.MAX_VALUE); + oneOf(database).commitTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - assertEquals(messages, db.generateBatch(contactId, size * 3, - Long.MAX_VALUE, requested)); + Offer o = db.generateOffer(contactId, 123, Long.MAX_VALUE); + assertEquals(ids, o.getMessageIds()); context.assertIsSatisfied(); } @Test - public void testGenerateOffer() throws Exception { + public void testGenerateRequest() throws Exception { final MessageId messageId1 = new MessageId(TestUtils.getRandomId()); - final Collection<MessageId> messagesToOffer = Arrays.asList(messageId, - messageId1); + final Collection<MessageId> ids = Arrays.asList(messageId, messageId1); Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); @@ -779,16 +746,54 @@ public abstract class DatabaseComponentTest extends BriarTestCase { will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - // Get the sendable message IDs - oneOf(database).getMessagesToOffer(txn, contactId, 123); - will(returnValue(messagesToOffer)); + oneOf(database).getMessagesToRequest(txn, contactId, 123); + will(returnValue(ids)); + oneOf(database).removeOfferedMessages(txn, contactId, ids); oneOf(database).commitTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - Offer o = db.generateOffer(contactId, 123); - assertEquals(messagesToOffer, o.getMessageIds()); + Request r = db.generateRequest(contactId, 123); + assertEquals(ids, r.getMessageIds()); + + context.assertIsSatisfied(); + } + + @Test + public void testGenerateRequestedBatch() throws Exception { + final byte[] raw1 = new byte[size]; + final Collection<MessageId> ids = Arrays.asList(messageId, messageId1); + final Collection<byte[]> messages = Arrays.asList(raw, raw1); + Mockery context = new Mockery(); + @SuppressWarnings("unchecked") + final Database<Object> database = context.mock(Database.class); + final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); + final ShutdownManager shutdown = context.mock(ShutdownManager.class); + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).getRequestedMessagesToSend(txn, contactId, + size * 2); + will(returnValue(ids)); + oneOf(database).getRawMessage(txn, messageId); + will(returnValue(raw)); + oneOf(database).updateExpiryTime(txn, contactId, messageId, + Long.MAX_VALUE); + oneOf(database).getRawMessage(txn, messageId1); + will(returnValue(raw1)); + oneOf(database).updateExpiryTime(txn, contactId, messageId1, + Long.MAX_VALUE); + oneOf(database).lowerRequestedFlag(txn, contactId, ids); + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, cleaner, + shutdown); + + assertEquals(messages, db.generateRequestedBatch(contactId, size * 2, + Long.MAX_VALUE)); context.assertIsSatisfied(); } @@ -965,7 +970,9 @@ public abstract class DatabaseComponentTest extends BriarTestCase { will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).setStatusSeenIfVisible(txn, contactId, messageId); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).raiseSeenFlag(txn, contactId, messageId); oneOf(database).commitTransaction(txn); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, @@ -994,14 +1001,18 @@ public abstract class DatabaseComponentTest extends BriarTestCase { oneOf(database).containsVisibleGroup(txn, contactId, groupId); will(returnValue(true)); oneOf(database).addMessage(txn, message, true); - oneOf(database).addStatus(txn, contactId, messageId, true); + oneOf(database).getVisibility(txn, groupId); + will(returnValue(Arrays.asList(contactId))); oneOf(database).getContactIds(txn); will(returnValue(Arrays.asList(contactId))); - oneOf(database).addMessageToAck(txn, contactId, messageId); + oneOf(database).removeOfferedMessage(txn, contactId, messageId); + will(returnValue(false)); + oneOf(database).addStatus(txn, contactId, messageId, false, true); + oneOf(database).raiseAckFlag(txn, contactId, messageId); oneOf(database).commitTransaction(txn); // The message was received and added oneOf(listener).eventOccurred(with(any( - MessageReceivedEvent.class))); + MessageToAckEvent.class))); oneOf(listener).eventOccurred(with(any(MessageAddedEvent.class))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, @@ -1031,11 +1042,11 @@ public abstract class DatabaseComponentTest extends BriarTestCase { oneOf(database).containsVisibleGroup(txn, contactId, groupId); will(returnValue(true)); // The message wasn't stored but it must still be acked - oneOf(database).addMessageToAck(txn, contactId, messageId); + oneOf(database).raiseAckFlag(txn, contactId, messageId); oneOf(database).commitTransaction(txn); // The message was received but not added oneOf(listener).eventOccurred(with(any( - MessageReceivedEvent.class))); + MessageToAckEvent.class))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); @@ -1084,31 +1095,58 @@ public abstract class DatabaseComponentTest extends BriarTestCase { final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final ShutdownManager shutdown = context.mock(ShutdownManager.class); + final DatabaseListener listener = context.mock(DatabaseListener.class); context.checking(new Expectations() {{ oneOf(database).startTransaction(); will(returnValue(txn)); oneOf(database).containsContact(txn, contactId); will(returnValue(true)); - // Get the offered messages - oneOf(database).setStatusSeenIfVisible(txn, contactId, messageId); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); will(returnValue(false)); // Not visible - request message # 0 - oneOf(database).setStatusSeenIfVisible(txn, contactId, messageId1); + oneOf(database).addOfferedMessage(txn, contactId, messageId); + oneOf(database).containsVisibleMessage(txn, contactId, messageId1); will(returnValue(true)); // Visible - ack message # 1 - oneOf(database).setStatusSeenIfVisible(txn, contactId, messageId2); + oneOf(database).raiseSeenFlag(txn, contactId, messageId1); + oneOf(database).raiseAckFlag(txn, contactId, messageId1); + oneOf(database).containsVisibleMessage(txn, contactId, messageId2); will(returnValue(false)); // Not visible - request message # 2 + oneOf(database).addOfferedMessage(txn, contactId, messageId2); oneOf(database).commitTransaction(txn); + oneOf(listener).eventOccurred(with(any(MessageToAckEvent.class))); + oneOf(listener).eventOccurred(with(any( + MessageToRequestEvent.class))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); + db.addListener(listener); Offer o = new Offer(Arrays.asList(messageId, messageId1, messageId2)); - AckAndRequest ar = db.receiveOffer(contactId, o); - Ack a = ar.getAck(); - assertNotNull(a); - assertEquals(Arrays.asList(messageId1), a.getMessageIds()); - Request r = ar.getRequest(); - assertNotNull(r); - assertEquals(Arrays.asList(messageId, messageId2), r.getMessageIds()); + db.receiveOffer(contactId, o); + context.assertIsSatisfied(); + } + + @Test + public void testReceiveRequest() throws Exception { + Mockery context = new Mockery(); + @SuppressWarnings("unchecked") + final Database<Object> database = context.mock(Database.class); + final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); + final ShutdownManager shutdown = context.mock(ShutdownManager.class); + context.checking(new Expectations() {{ + oneOf(database).startTransaction(); + will(returnValue(txn)); + oneOf(database).containsContact(txn, contactId); + will(returnValue(true)); + oneOf(database).containsVisibleMessage(txn, contactId, messageId); + will(returnValue(true)); + oneOf(database).raiseRequestedFlag(txn, contactId, messageId); + oneOf(database).resetExpiryTime(txn, contactId, messageId); + oneOf(database).commitTransaction(txn); + }}); + DatabaseComponent db = createDatabaseComponent(database, cleaner, + shutdown); + + db.receiveRequest(contactId, new Request(Arrays.asList(messageId))); context.assertIsSatisfied(); } @@ -1248,6 +1286,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final ShutdownManager shutdown = context.mock(ShutdownManager.class); + final DatabaseListener listener = context.mock(DatabaseListener.class); context.checking(new Expectations() {{ oneOf(database).startTransaction(); will(returnValue(txn)); @@ -1257,10 +1296,13 @@ public abstract class DatabaseComponentTest extends BriarTestCase { will(returnValue(new TransportProperties())); oneOf(database).mergeLocalProperties(txn, transportId, properties); oneOf(database).commitTransaction(txn); + oneOf(listener).eventOccurred(with(any( + LocalTransportsUpdatedEvent.class))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); + db.addListener(listener); db.mergeLocalProperties(transportId, properties); context.assertIsSatisfied(); @@ -1295,29 +1337,6 @@ public abstract class DatabaseComponentTest extends BriarTestCase { context.assertIsSatisfied(); } - @Test - public void testSetSeen() throws Exception { - Mockery context = new Mockery(); - @SuppressWarnings("unchecked") - final Database<Object> database = context.mock(Database.class); - final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); - final ShutdownManager shutdown = context.mock(ShutdownManager.class); - context.checking(new Expectations() {{ - oneOf(database).startTransaction(); - will(returnValue(txn)); - oneOf(database).containsContact(txn, contactId); - will(returnValue(true)); - oneOf(database).setStatusSeenIfVisible(txn, contactId, messageId); - oneOf(database).commitTransaction(txn); - }}); - DatabaseComponent db = createDatabaseComponent(database, cleaner, - shutdown); - - db.setSeen(contactId, Arrays.asList(messageId)); - - context.assertIsSatisfied(); - } - @Test public void testChangingVisibilityCallsListeners() throws Exception { final ContactId contactId1 = new ContactId(123); diff --git a/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java b/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java index 99c74b2ade651189463057a1b138a47542eb9545..b7b6b5c29c722049491112ab94dc9558d99892e4 100644 --- a/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java +++ b/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java @@ -171,24 +171,23 @@ public class H2DatabaseTest extends BriarTestCase { db.addMessage(txn, message, false); // The message has no status yet, so it should not be sendable - assertFalse(db.containsSendableMessages(txn, contactId)); - Iterator<MessageId> it = - db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); - assertFalse(it.hasNext()); + Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, + ONE_MEGABYTE); + assertTrue(ids.isEmpty()); // Adding a status with seen = false should make the message sendable - db.addStatus(txn, contactId, messageId, false); - assertTrue(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + db.addStatus(txn, contactId, messageId, false, false); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + assertFalse(ids.isEmpty()); + Iterator<MessageId> it = ids.iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); // Changing the status to seen = true should make the message unsendable - db.setStatusSeenIfVisible(txn, contactId, messageId); - assertFalse(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); - assertFalse(it.hasNext()); + db.raiseSeenFlag(txn, contactId, messageId); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + assertTrue(ids.isEmpty()); db.commitTransaction(txn); db.close(); @@ -205,27 +204,26 @@ public class H2DatabaseTest extends BriarTestCase { db.addGroup(txn, group); db.addVisibility(txn, contactId, groupId); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); + db.addStatus(txn, contactId, messageId, false, false); // The contact is not subscribed, so the message should not be sendable - assertFalse(db.containsSendableMessages(txn, contactId)); - Iterator<MessageId> it = - db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); - assertFalse(it.hasNext()); + Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, + ONE_MEGABYTE); + assertTrue(ids.isEmpty()); // The contact subscribing should make the message sendable db.setGroups(txn, contactId, Arrays.asList(group), 1); - assertTrue(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + assertFalse(ids.isEmpty()); + Iterator<MessageId> it = ids.iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); // The contact unsubscribing should make the message unsendable db.setGroups(txn, contactId, Collections.<Group>emptyList(), 2); - assertFalse(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); - assertFalse(it.hasNext()); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + assertTrue(ids.isEmpty()); db.commitTransaction(txn); db.close(); @@ -243,17 +241,17 @@ public class H2DatabaseTest extends BriarTestCase { db.addVisibility(txn, contactId, groupId); db.setGroups(txn, contactId, Arrays.asList(group), 1); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); + db.addStatus(txn, contactId, messageId, false, false); // The message is sendable, but too large to send - assertTrue(db.containsSendableMessages(txn, contactId)); - Iterator<MessageId> it = - db.getSendableMessages(txn, contactId, size - 1).iterator(); - assertFalse(it.hasNext()); + Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, + size - 1); + assertTrue(ids.isEmpty()); // The message is just the right size to send - assertTrue(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, size).iterator(); + ids = db.getMessagesToSend(txn, contactId, size); + assertFalse(ids.isEmpty()); + Iterator<MessageId> it = ids.iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); @@ -273,19 +271,19 @@ public class H2DatabaseTest extends BriarTestCase { db.addGroup(txn, group); db.setGroups(txn, contactId, Arrays.asList(group), 1); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); + db.addStatus(txn, contactId, messageId, false, false); // The subscription is not visible to the contact, so the message // should not be sendable - assertFalse(db.containsSendableMessages(txn, contactId)); - Iterator<MessageId> it = - db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); - assertFalse(it.hasNext()); + Collection<MessageId> ids = db.getMessagesToSend(txn, contactId, + ONE_MEGABYTE); + assertTrue(ids.isEmpty()); // Making the subscription visible should make the message sendable db.addVisibility(txn, contactId, groupId); - assertTrue(db.containsSendableMessages(txn, contactId)); - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + ids = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE); + assertFalse(ids.isEmpty()); + Iterator<MessageId> it = ids.iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); @@ -310,18 +308,18 @@ public class H2DatabaseTest extends BriarTestCase { Message message1 = new TestMessage(messageId1, null, group, author, contentType, subject, timestamp, raw); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, true); - db.addMessageToAck(txn, contactId, messageId); + db.addStatus(txn, contactId, messageId, false, true); + db.raiseAckFlag(txn, contactId, messageId); db.addMessage(txn, message1, false); - db.addStatus(txn, contactId, messageId1, true); - db.addMessageToAck(txn, contactId, messageId1); + db.addStatus(txn, contactId, messageId1, false, true); + db.raiseAckFlag(txn, contactId, messageId1); // Both message IDs should be returned Collection<MessageId> ids = Arrays.asList(messageId, messageId1); assertEquals(ids, db.getMessagesToAck(txn, contactId, 1234)); // Remove both message IDs - db.removeMessagesToAck(txn, contactId, ids); + db.lowerAckFlag(txn, contactId, Arrays.asList(messageId, messageId1)); // Both message IDs should have been removed assertEquals(Collections.emptyList(), db.getMessagesToAck(txn, @@ -344,16 +342,16 @@ public class H2DatabaseTest extends BriarTestCase { // Receive the same message twice db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, true); - db.addMessageToAck(txn, contactId, messageId); - db.addMessageToAck(txn, contactId, messageId); + db.addStatus(txn, contactId, messageId, false, true); + db.raiseAckFlag(txn, contactId, messageId); + db.raiseAckFlag(txn, contactId, messageId); // The message ID should only be returned once Collection<MessageId> ids = db.getMessagesToAck(txn, contactId, 1234); assertEquals(Arrays.asList(messageId), ids); // Remove the message ID - db.removeMessagesToAck(txn, contactId, Arrays.asList(messageId)); + db.lowerAckFlag(txn, contactId, Arrays.asList(messageId)); // The message ID should have been removed assertEquals(Collections.emptyList(), db.getMessagesToAck(txn, @@ -375,26 +373,25 @@ public class H2DatabaseTest extends BriarTestCase { db.addVisibility(txn, contactId, groupId); db.setGroups(txn, contactId, Arrays.asList(group), 1); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); + db.addStatus(txn, contactId, messageId, false, false); // Retrieve the message from the database and mark it as sent Iterator<MessageId> it = - db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + db.getMessagesToSend(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); - db.updateExpiryTimes(txn, contactId, - Collections.singletonMap(messageId, 0), Long.MAX_VALUE); + db.updateExpiryTime(txn, contactId, messageId, Long.MAX_VALUE); // The message should no longer be sendable - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + it = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); // Pretend that the message was acked - db.setStatusSeenIfVisible(txn, contactId, messageId); + db.raiseSeenFlag(txn, contactId, messageId); // The message still should not be sendable - it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); + it = db.getMessagesToSend(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); db.commitTransaction(txn); @@ -661,7 +658,7 @@ public class H2DatabaseTest extends BriarTestCase { } @Test - public void testGetMessageIfSendableReturnsNullIfNotInDatabase() + public void testContainsVisibleMessageRequiresMessageInDatabase() throws Exception { Database<Connection> db = open(false); Connection txn = db.startTransaction(); @@ -674,106 +671,14 @@ public class H2DatabaseTest extends BriarTestCase { db.setGroups(txn, contactId, Arrays.asList(group), 1); // The message is not in the database - assertNull(db.getRawMessageIfSendable(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testGetMessageIfSendableReturnsNullIfSeen() throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - db.addMessage(txn, message, false); - - // Set the status to seen = true - db.addStatus(txn, contactId, messageId, true); - - // The message is not sendable because its status is seen = true - assertNull(db.getRawMessageIfSendable(txn, contactId, messageId)); + assertFalse(db.containsVisibleMessage(txn, contactId, messageId)); db.commitTransaction(txn); db.close(); } @Test - public void testGetMessageIfSendableReturnsNullIfOld() throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - - // the message is older than the contact's retention time - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - db.setRetentionTime(txn, contactId, timestamp + 1, 1); - db.addMessage(txn, message, false); - - // Set the status to seen = false - db.addStatus(txn, contactId, messageId, false); - - // The message is not sendable because it's too old - assertNull(db.getRawMessageIfSendable(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testGetMessageIfSendableReturnsMessage() throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - db.addMessage(txn, message, false); - - // Set the status to seen = false - db.addStatus(txn, contactId, messageId, false); - - // The message is sendable so it should be returned - byte[] b = db.getRawMessageIfSendable(txn, contactId, messageId); - assertArrayEquals(raw, b); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testSetStatusSeenIfVisibleRequiresMessageInDatabase() - throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact and subscribe to a group - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - - // The message is not in the database - assertFalse(db.setStatusSeenIfVisible(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testSetStatusSeenIfVisibleRequiresLocalSubscription() + public void testContainsVisibleMessageRequiresLocalSubscription() throws Exception { Database<Connection> db = open(false); Connection txn = db.startTransaction(); @@ -784,35 +689,14 @@ public class H2DatabaseTest extends BriarTestCase { db.setGroups(txn, contactId, Arrays.asList(group), 1); // There's no local subscription for the group - assertFalse(db.setStatusSeenIfVisible(txn, contactId, messageId)); + assertFalse(db.containsVisibleMessage(txn, contactId, messageId)); db.commitTransaction(txn); db.close(); } @Test - public void testSetStatusSeenIfVisibleRequiresContactSubscription() - throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); - - // There's no contact subscription for the group - assertFalse(db.setStatusSeenIfVisible(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testSetStatusSeenIfVisibleRequiresVisibility() + public void testContainsVisibleMessageRequiresVisibileSubscription() throws Exception { Database<Connection> db = open(false); Connection txn = db.startTransaction(); @@ -823,56 +707,10 @@ public class H2DatabaseTest extends BriarTestCase { db.addGroup(txn, group); db.setGroups(txn, contactId, Arrays.asList(group), 1); db.addMessage(txn, message, false); - db.addStatus(txn, contactId, messageId, false); + db.addStatus(txn, contactId, messageId, false, false); // The subscription is not visible - assertFalse(db.setStatusSeenIfVisible(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testSetStatusSeenIfVisibleReturnsTrueIfAlreadySeen() - throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - db.addMessage(txn, message, false); - - // The message has already been seen by the contact - db.addStatus(txn, contactId, messageId, true); - - assertTrue(db.setStatusSeenIfVisible(txn, contactId, messageId)); - - db.commitTransaction(txn); - db.close(); - } - - @Test - public void testSetStatusSeenIfVisibleReturnsTrueIfNew() - throws Exception { - Database<Connection> db = open(false); - Connection txn = db.startTransaction(); - - // Add a contact, subscribe to a group and store a message - db.addLocalAuthor(txn, localAuthor); - assertEquals(contactId, db.addContact(txn, author, localAuthorId)); - db.addGroup(txn, group); - db.addVisibility(txn, contactId, groupId); - db.setGroups(txn, contactId, Arrays.asList(group), 1); - db.addMessage(txn, message, false); - - // The message has not been seen by the contact - db.addStatus(txn, contactId, messageId, false); - - assertTrue(db.setStatusSeenIfVisible(txn, contactId, messageId)); + assertFalse(db.containsVisibleMessage(txn, contactId, messageId)); db.commitTransaction(txn); db.close(); @@ -946,8 +784,7 @@ public class H2DatabaseTest extends BriarTestCase { } @Test - public void testGetParentWithParentInAnotherGroup() - throws Exception { + public void testGetParentWithParentInAnotherGroup() throws Exception { GroupId groupId1 = new GroupId(TestUtils.getRandomId()); Group group1 = new Group(groupId1, "Another group", new byte[GROUP_SALT_LENGTH]); @@ -976,8 +813,7 @@ public class H2DatabaseTest extends BriarTestCase { } @Test - public void testGetParentWithParentInSameGroup() - throws Exception { + public void testGetParentWithParentInSameGroup() throws Exception { Database<Connection> db = open(false); Connection txn = db.startTransaction(); @@ -1060,11 +896,10 @@ public class H2DatabaseTest extends BriarTestCase { contentType, subject, timestamp1, raw); db.addMessage(txn, message1, false); // Mark one of the messages read - assertFalse(db.setReadFlag(txn, messageId, true)); + db.setReadFlag(txn, messageId, true); // Retrieve the message headers - Collection<MessageHeader> headers = - db.getMessageHeaders(txn, groupId); + Collection<MessageHeader> headers = db.getMessageHeaders(txn, groupId); Iterator<MessageHeader> it = headers.iterator(); boolean messageFound = false, message1Found = false; // First header (order is undefined) @@ -1126,16 +961,14 @@ public class H2DatabaseTest extends BriarTestCase { // The message should be unread by default assertFalse(db.getReadFlag(txn, messageId)); - // Marking the message read should return the old value - assertFalse(db.setReadFlag(txn, messageId, true)); - assertTrue(db.setReadFlag(txn, messageId, true)); + // Mark the message read + db.setReadFlag(txn, messageId, true); // The message should be read assertTrue(db.getReadFlag(txn, messageId)); - // Marking the message unread should return the old value - assertTrue(db.setReadFlag(txn, messageId, false)); - assertFalse(db.setReadFlag(txn, messageId, false)); - // Unsubscribe from the group - db.removeGroup(txn, groupId); + // Mark the message unread + db.setReadFlag(txn, messageId, false); + // The message should be unread + assertFalse(db.getReadFlag(txn, messageId)); db.commitTransaction(txn); db.close(); @@ -1167,7 +1000,7 @@ public class H2DatabaseTest extends BriarTestCase { db.addMessage(txn, message2, false); // Mark one of the messages in the first group read - assertFalse(db.setReadFlag(txn, messageId, true)); + db.setReadFlag(txn, messageId, true); // There should be one unread message in each group Map<GroupId, Integer> counts = db.getUnreadMessageCounts(txn); @@ -1179,11 +1012,11 @@ public class H2DatabaseTest extends BriarTestCase { assertNotNull(count); assertEquals(1, count.intValue()); - // Mark the read message unread (it will now be false rather than null) - assertTrue(db.setReadFlag(txn, messageId, false)); + // Mark the read message unread + db.setReadFlag(txn, messageId, false); // Mark the message in the second group read - assertFalse(db.setReadFlag(txn, messageId2, true)); + db.setReadFlag(txn, messageId2, true); // There should be two unread messages in the first group, none in // the second group diff --git a/briar-tests/src/net/sf/briar/messaging/ConstantsTest.java b/briar-tests/src/net/sf/briar/messaging/ConstantsTest.java index f94873d7f35d5c06d55f4c61d413929d436681d7..9968be2855742bb7ef0e7809acf9ed3b38c65259 100644 --- a/briar-tests/src/net/sf/briar/messaging/ConstantsTest.java +++ b/briar-tests/src/net/sf/briar/messaging/ConstantsTest.java @@ -196,7 +196,7 @@ public class ConstantsTest extends BriarTestCase { // Create an ack with as many message IDs as possible ByteArrayOutputStream out = new ByteArrayOutputStream(length); PacketWriter writer = packetWriterFactory.createPacketWriter(out, true); - int maxMessages = writer.getMaxMessagesForAck(length); + int maxMessages = writer.getMaxMessagesForRequest(length); Collection<MessageId> acked = new ArrayList<MessageId>(); for(int i = 0; i < maxMessages; i++) acked.add(new MessageId(TestUtils.getRandomId()));