diff --git a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java index 993dea7785070cf677fd44f8c80d249d8552a30f..31449e3c611df39e71dd85cbd87c118bf175083e 100644 --- a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java +++ b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java @@ -113,10 +113,12 @@ public interface DatabaseComponent { RetentionAck generateRetentionAck(ContactId c) throws DbException; /** - * Generates a retention update for the given contact. Returns null if no - * update is due. + * Generates a retention update for the given contact, for transmission + * over a transport with the given latency. Returns null if no update is + * due. */ - RetentionUpdate generateRetentionUpdate(ContactId c) throws DbException; + RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency) + throws DbException; /** * Generates a subscription ack for the given contact. Returns null if no @@ -125,10 +127,11 @@ public interface DatabaseComponent { SubscriptionAck generateSubscriptionAck(ContactId c) throws DbException; /** - * Generates a subscription update for the given contact. Returns null if - * no update is due. + * Generates a subscription update for the given contact, for transmission + * over a transport with the given latency. Returns null if no update is + * due. */ - SubscriptionUpdate generateSubscriptionUpdate(ContactId c) + SubscriptionUpdate generateSubscriptionUpdate(ContactId c, long maxLatency) throws DbException; /** @@ -139,11 +142,12 @@ public interface DatabaseComponent { throws DbException; /** - * Generates a batch of transport updates for the given contact. Returns - * null if no updates are due. + * Generates a batch of transport updates for the given contact, for + * transmission over a transport with the given latency. Returns null if no + * updates are due. */ - Collection<TransportUpdate> generateTransportUpdates(ContactId c) - throws DbException; + Collection<TransportUpdate> generateTransportUpdates(ContactId c, + long maxLatency) throws DbException; /** Returns the configuration for the given transport. */ TransportConfig getConfig(TransportId t) throws DbException; diff --git a/briar-core/src/net/sf/briar/db/Database.java b/briar-core/src/net/sf/briar/db/Database.java index ca5c732a9beb089047fcb8084e2cf0621961a0df..0333dcfa60faf670862d55b13d065d1518c9d6e1 100644 --- a/briar-core/src/net/sf/briar/db/Database.java +++ b/briar-core/src/net/sf/briar/db/Database.java @@ -25,6 +25,8 @@ import net.sf.briar.api.messaging.TransportUpdate; import net.sf.briar.api.transport.Endpoint; import net.sf.briar.api.transport.TemporarySecret; +// FIXME: Document the preconditions for calling each method + /** * A low-level interface to the database (DatabaseComponent provides a * high-level interface). Most operations take a transaction argument, which is @@ -103,15 +105,6 @@ interface Database<T> { */ void addMessageToAck(T txn, ContactId c, MessageId m) throws DbException; - /** - * Records the given messages as needing to be acknowledged by the given - * expiry time. - * <p> - * Locking: contact read, message write. - */ - void addOutstandingMessages(T txn, ContactId c, Collection<MessageId> sent, - long expiry) throws DbException; - /** * Stores the given message, or returns false if the message is already in * the database. @@ -308,9 +301,9 @@ interface Database<T> { byte[] getRawMessage(T txn, MessageId m) throws DbException; /** - * Returns the message identified by the given ID, in serialised form, or - * null if the message is not present in the database or is not sendable to - * the given contact. + * Returns the message identified by the given ID, in serialised form. + * Returns null if the message is not present in the database or is not + * sendable to the given contact. * <p> * Locking: contact read, message read, subscription read. */ @@ -355,12 +348,13 @@ interface Database<T> { RetentionAck getRetentionAck(T txn, ContactId c) throws DbException; /** - * Returns a retention update for the given contact, or null if no update - * is due. + * Returns a retention update for the given contact and updates its expiry + * time using the given latency. Returns null if no update is due. * <p> * Locking: contact read, retention write. */ - RetentionUpdate getRetentionUpdate(T txn, ContactId c) throws DbException; + RetentionUpdate getRetentionUpdate(T txn, ContactId c, long maxLatency) + throws DbException; /** * Returns all temporary secrets. @@ -416,13 +410,13 @@ interface Database<T> { SubscriptionAck getSubscriptionAck(T txn, ContactId c) throws DbException; /** - * Returns a subscription update for the given contact, or null if no - * update is due. + * Returns a subscription update for the given contact and updates its + * expiry time using the given latency. Returns null if no update is due. * <p> * Locking: contact read, subscription write. */ - SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c) - throws DbException; + SubscriptionUpdate getSubscriptionUpdate(T txn, ContactId c, + long maxLatency) throws DbException; /** * Returns a collection of transport acks for the given contact, or null if @@ -434,13 +428,14 @@ interface Database<T> { throws DbException; /** - * Returns a collection of transport updates for the given contact, or - * null if no updates are due. + * Returns a collection of transport updates for the given contact and + * updates their expiry times using the given latency. Returns null if no + * updates are due. * <p> * Locking: contact read, transport write. */ - Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c) - throws DbException; + Collection<TransportUpdate> getTransportUpdates(T txn, ContactId c, + long maxLatency) throws DbException; /** * Returns the version number of the @@ -572,6 +567,15 @@ interface Database<T> { void setConnectionWindow(T txn, ContactId c, TransportId t, long period, long centre, byte[] bitmap) throws DbException; + /** + * Updates the expiry times of the given messages with respect to the given + * contact, using the latency of the transport over which they were sent. + * <p> + * Locking: contact read, message write. + */ + void setMessageExpiry(T txn, ContactId c, Collection<MessageId> sent, + long maxLatency) throws DbException; + /** * Sets the user's rating for the given author. * <p> diff --git a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java index eeddfb96b4c594af83ab71b735f34c4a4c1f95c8..1338c8c33a79891152ee08e36f23524aa969cb72 100644 --- a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java +++ b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java @@ -495,7 +495,7 @@ DatabaseCleaner.Callback { // Get some sendable messages from the database contactLock.readLock().lock(); try { - messageLock.readLock().lock(); + messageLock.writeLock().lock(); try { subscriptionLock.readLock().lock(); try { @@ -515,17 +515,15 @@ DatabaseCleaner.Callback { subscriptionLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + messageLock.writeLock().unlock(); } if(messages.isEmpty()) return null; - // Calculate the expiry time of the messages - long expiry = calculateExpiryTime(maxLatency); // Record the messages as sent messageLock.writeLock().lock(); try { T txn = db.startTransaction(); try { - db.addOutstandingMessages(txn, c, ids, expiry); + db.setMessageExpiry(txn, c, ids, maxLatency); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -580,14 +578,12 @@ DatabaseCleaner.Callback { messageLock.readLock().unlock(); } if(messages.isEmpty()) return null; - // Calculate the expiry times of the messages - long expiry = calculateExpiryTime(maxLatency); // Record the messages as sent messageLock.writeLock().lock(); try { T txn = db.startTransaction(); try { - db.addOutstandingMessages(txn, c, ids, expiry); + db.setMessageExpiry(txn, c, ids, maxLatency); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -602,14 +598,6 @@ DatabaseCleaner.Callback { return Collections.unmodifiableList(messages); } - private long calculateExpiryTime(long maxLatency) { - long roundTrip = maxLatency * 2; - if(roundTrip < 0) roundTrip = Long.MAX_VALUE; // Overflow - long expiry = clock.currentTimeMillis() + roundTrip; - if(expiry < 0) expiry = Long.MAX_VALUE; // Overflow - return expiry; - } - public Offer generateOffer(ContactId c, int maxMessages) throws DbException { Collection<MessageId> offered; @@ -660,7 +648,7 @@ DatabaseCleaner.Callback { } } - public RetentionUpdate generateRetentionUpdate(ContactId c) + public RetentionUpdate generateRetentionUpdate(ContactId c, long maxLatency) throws DbException { contactLock.readLock().lock(); try { @@ -670,7 +658,8 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - RetentionUpdate u = db.getRetentionUpdate(txn, c); + RetentionUpdate u = + db.getRetentionUpdate(txn, c, maxLatency); db.commitTransaction(txn); return u; } catch(DbException e) { @@ -710,8 +699,8 @@ DatabaseCleaner.Callback { } } - public SubscriptionUpdate generateSubscriptionUpdate(ContactId c) - throws DbException { + public SubscriptionUpdate generateSubscriptionUpdate(ContactId c, + long maxLatency) throws DbException { contactLock.readLock().lock(); try { subscriptionLock.writeLock().lock(); @@ -720,7 +709,8 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - SubscriptionUpdate u = db.getSubscriptionUpdate(txn, c); + SubscriptionUpdate u = + db.getSubscriptionUpdate(txn, c, maxLatency); db.commitTransaction(txn); return u; } catch(DbException e) { @@ -760,8 +750,8 @@ DatabaseCleaner.Callback { } } - public Collection<TransportUpdate> generateTransportUpdates(ContactId c) - throws DbException { + public Collection<TransportUpdate> generateTransportUpdates(ContactId c, + long maxLatency) throws DbException { contactLock.readLock().lock(); try { transportLock.writeLock().lock(); @@ -771,7 +761,7 @@ DatabaseCleaner.Callback { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); Collection<TransportUpdate> updates = - db.getTransportUpdates(txn, c); + db.getTransportUpdates(txn, c, maxLatency); db.commitTransaction(txn); return updates; } catch(DbException e) { diff --git a/briar-core/src/net/sf/briar/db/JdbcDatabase.java b/briar-core/src/net/sf/briar/db/JdbcDatabase.java index 99970f9ab9d5f763de93a008537c79286987929b..2a3d215cf97db2b824412166099fd2735d495913 100644 --- a/briar-core/src/net/sf/briar/db/JdbcDatabase.java +++ b/briar-core/src/net/sf/briar/db/JdbcDatabase.java @@ -69,7 +69,7 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, subscription private static final String CREATE_GROUP_VISIBILITIES = "CREATE TABLE groupVisibilities" - + " (contactId INT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " groupId HASH NOT NULL," + " FOREIGN KEY (contactId)" + " REFERENCES contacts (contactId)" @@ -81,7 +81,7 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, subscription private static final String CREATE_CONTACT_GROUPS = "CREATE TABLE contactGroups" - + " (contactId INT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " groupId HASH NOT NULL," // Not a foreign key + " name VARCHAR NOT NULL," + " key BINARY," // Null for unrestricted groups @@ -93,11 +93,12 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, subscription private static final String CREATE_GROUP_VERSIONS = "CREATE TABLE groupVersions" - + " (contactId INT NOT NULL," - + " localVersion BIGINT NOT NULL," - + " localAcked BIGINT NOT NULL," - + " remoteVersion BIGINT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + + " localVersion BIGINT UNSIGNED NOT NULL," + + " localAcked BIGINT UNSIGNED NOT NULL," + + " remoteVersion BIGINT UNSIGNED NOT NULL," + " remoteAcked BOOLEAN NOT NULL," + + " expiry BIGINT UNSIGNED NOT NULL," + " PRIMARY KEY (contactId)," + " FOREIGN KEY (contactid)" + " REFERENCES contacts (contactId)" @@ -111,13 +112,13 @@ abstract class JdbcDatabase implements Database<Connection> { + " groupId HASH," // Null for private messages + " authorId HASH," // Null for private or anonymous msgs + " subject VARCHAR NOT NULL," - + " timestamp BIGINT NOT NULL," - + " length INT NOT NULL," - + " bodyStart INT NOT NULL," - + " bodyLength INT NOT NULL," + + " timestamp BIGINT UNSIGNED NOT NULL," + + " length INT UNSIGNED NOT NULL," + + " bodyStart INT UNSIGNED NOT NULL," + + " bodyLength INT UNSIGNED NOT NULL," + " raw BLOB NOT NULL," - + " sendability INT," // Null for private messages - + " contactId INT," // Null for group messages + + " sendability INT UNSIGNED," // Null for private messages + + " contactId INT UNSIGNED," // Null for group messages + " read BOOLEAN NOT NULL," + " starred BOOLEAN NOT NULL," + " PRIMARY KEY (messageId)," @@ -144,7 +145,7 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_MESSAGES_TO_ACK = "CREATE TABLE messagesToAck" + " (messageId HASH NOT NULL," - + " contactId INT NOT NULL," + + " contactId INT UNSIGNED NOT NULL," + " PRIMARY KEY (messageId, contactId)," + " FOREIGN KEY (contactId)" + " REFERENCES contacts (contactId)" @@ -154,9 +155,9 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_STATUSES = "CREATE TABLE statuses" + " (messageId HASH NOT NULL," - + " contactId INT NOT NULL," + + " contactId INT UNSIGNED NOT NULL," + " seen BOOLEAN NOT NULL," - + " expiry BIGINT NOT NULL," + + " expiry BIGINT UNSIGNED NOT NULL," + " PRIMARY KEY (messageId, contactId)," + " FOREIGN KEY (messageId)" + " REFERENCES messages (messageId)" @@ -181,12 +182,13 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, retention private static final String CREATE_RETENTION_VERSIONS = "CREATE TABLE retentionVersions" - + " (contactId INT NOT NULL," - + " retention BIGINT NOT NULL," - + " localVersion BIGINT NOT NULL," - + " localAcked BIGINT NOT NULL," - + " remoteVersion BIGINT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + + " retention BIGINT UNSIGNED NOT NULL," + + " localVersion BIGINT UNSIGNED NOT NULL," + + " localAcked BIGINT UNSIGNED NOT NULL," + + " remoteVersion BIGINT UNSIGNED NOT NULL," + " remoteAcked BOOLEAN NOT NULL," + + " expiry BIGINT UNSIGNED NOT NULL," + " PRIMARY KEY (contactId)," + " FOREIGN KEY (contactId)" + " REFERENCES contacts (contactId)" @@ -221,10 +223,11 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, transport private static final String CREATE_TRANSPORT_VERSIONS = "CREATE TABLE transportVersions" - + " (contactId HASH NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " transportId HASH NOT NULL," - + " localVersion BIGINT NOT NULL," - + " localAcked BIGINT NOT NULL," + + " localVersion BIGINT UNSIGNED NOT NULL," + + " localAcked BIGINT UNSIGNED NOT NULL," + + " expiry BIGINT UNSIGNED NOT NULL," + " PRIMARY KEY (contactId, transportId)," + " FOREIGN KEY (contactId)" + " REFERENCES contacts (contactId)" @@ -236,7 +239,7 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, transport private static final String CREATE_CONTACT_TRANSPORT_PROPS = "CREATE TABLE contactTransportProperties" - + " (contactId INT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " transportId HASH NOT NULL," // Not a foreign key + " key VARCHAR NOT NULL," + " value VARCHAR NOT NULL," @@ -248,9 +251,9 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, transport private static final String CREATE_CONTACT_TRANSPORT_VERSIONS = "CREATE TABLE contactTransportVersions" - + " (contactId HASH NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " transportId HASH NOT NULL," // Not a foreign key - + " remoteVersion BIGINT NOT NULL," + + " remoteVersion BIGINT UNSIGNED NOT NULL," + " remoteAcked BOOLEAN NOT NULL," + " PRIMARY KEY (contactId, transportId)," + " FOREIGN KEY (contactId)" @@ -260,11 +263,11 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, transport read, window private static final String CREATE_ENDPOINTS = "CREATE TABLE endpoints" - + " (contactId INT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " transportId HASH NOT NULL," - + " epoch BIGINT NOT NULL," - + " clockDiff BIGINT NOT NULL," - + " latency BIGINT NOT NULL," + + " epoch BIGINT UNSIGNED NOT NULL," + + " clockDiff BIGINT UNSIGNED NOT NULL," + + " latency BIGINT UNSIGNED NOT NULL," + " alice BOOLEAN NOT NULL," + " PRIMARY KEY (contactId, transportId)," + " FOREIGN KEY (contactId)" @@ -277,12 +280,12 @@ abstract class JdbcDatabase implements Database<Connection> { // Locking: contact read, transport read, window private static final String CREATE_SECRETS = "CREATE TABLE secrets" - + " (contactId INT NOT NULL," + + " (contactId INT UNSIGNED NOT NULL," + " transportId HASH NOT NULL," - + " period BIGINT NOT NULL," + + " period BIGINT UNSIGNED NOT NULL," + " secret SECRET NOT NULL," - + " outgoing BIGINT NOT NULL," - + " centre BIGINT NOT NULL," + + " outgoing BIGINT UNSIGNED NOT NULL," + + " centre BIGINT UNSIGNED NOT NULL," + " bitmap BINARY NOT NULL," + " PRIMARY KEY (contactId, transportId, period)," + " FOREIGN KEY (contactId)" @@ -505,9 +508,10 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); // Create a retention version row - sql = "INSERT INTO retentionVersions (contactId, retention," - + " localVersion, localAcked, remoteVersion, remoteAcked)" - + " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)"; + sql = "INSERT INTO retentionVersions" + + " (contactId, retention, localVersion, localAcked," + + " remoteVersion, remoteAcked, expiry)" + + " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE, ZERO())"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(2, 1); @@ -516,8 +520,8 @@ abstract class JdbcDatabase implements Database<Connection> { ps.close(); // Create a group version row sql = "INSERT INTO groupVersions (contactId, localVersion," - + " localAcked, remoteVersion, remoteAcked)" - + " VALUES (?, ?, ZERO(), ZERO(), TRUE)"; + + " localAcked, remoteVersion, remoteAcked, expiry)" + + " VALUES (?, ?, ZERO(), ZERO(), TRUE, ZERO())"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(2, 1); @@ -533,9 +537,9 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); if(transports.isEmpty()) return c; - sql = "INSERT INTO transportVersions" - + " (contactId, transportId, localVersion, localAcked)" - + " VALUES (?, ?, ?, ZERO())"; + sql = "INSERT INTO transportVersions (contactId, transportId," + + " localVersion, localAcked, expiry)" + + " VALUES (?, ?, ?, ZERO(), ZERO())"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.setInt(3, 1); @@ -646,33 +650,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void addOutstandingMessages(Connection txn, ContactId c, - Collection<MessageId> sent, long expiry) throws DbException { - PreparedStatement ps = null; - try { - // Update the expiry time of each message - String sql = "UPDATE statuses SET expiry = ?" - + " WHERE messageId = ? AND contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setLong(1, expiry); - ps.setInt(3, c.getInt()); - for(MessageId m : sent) { - ps.setBytes(2, m.getBytes()); - ps.addBatch(); - } - int[] batchAffected = ps.executeBatch(); - if(batchAffected.length != sent.size()) - throw new DbStateException(); - for(int i = 0; i < batchAffected.length; i++) { - if(batchAffected[i] > 1) throw new DbStateException(); - } - ps.close(); - } catch(SQLException e) { - tryToClose(ps); - throw new DbException(e); - } - } - public boolean addPrivateMessage(Connection txn, Message m, ContactId c) throws DbException { assert m.getGroup() == null; @@ -812,9 +789,9 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); if(contacts.isEmpty()) return; - sql = "INSERT INTO transportVersions" - + " (contactId, transportId, localVersion, localAcked)" - + " VALUES (?, ?, ?, ZERO())"; + sql = "INSERT INTO transportVersions (contactId, transportId," + + " localVersion, localAcked, expiry)" + + " VALUES (?, ?, ?, ZERO(), ZERO())"; ps = txn.prepareStatement(sql); ps.setBytes(2, t.getBytes()); ps.setInt(3, 1); @@ -848,7 +825,8 @@ abstract class JdbcDatabase implements Database<Connection> { if(affected != 1) throw new DbStateException(); ps.close(); // Bump the subscription version - sql = "UPDATE groupVersions SET localVersion = localVersion + ?" + sql = "UPDATE groupVersions" + + " SET localVersion = localVersion + ?, expiry = ZERO()" + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, 1); @@ -1444,7 +1422,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " WHERE m.messageId = ?" + " AND cg.contactId = ?" + " AND timestamp >= retention" - + " AND seen = FALSE AND expiry < ?" + + " AND seen = FALSE AND s.expiry < ?" + " AND sendability > ZERO()"; ps = txn.prepareStatement(sql); ps.setBytes(1, m.getBytes()); @@ -1557,19 +1535,23 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c) - throws DbException { + public RetentionUpdate getRetentionUpdate(Connection txn, ContactId c, + long maxLatency) throws DbException { + long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT timestamp, localVersion" + " FROM messages AS m" + " JOIN retentionVersions AS rv" - + " WHERE rv.contactId = ? AND localVersion > localAcked" + + " WHERE rv.contactId = ?" + + " AND localVersion > localAcked" + + " AND expiry < ?" + " ORDER BY timestamp LIMIT ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setInt(2, 1); + ps.setLong(2, now); + ps.setInt(3, 1); rs = ps.executeQuery(); if(!rs.next()) { rs.close(); @@ -1582,6 +1564,13 @@ abstract class JdbcDatabase implements Database<Connection> { if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); + sql = "UPDATE retentionVersions SET expiry = ? WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, calculateExpiry(now, maxLatency)); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected != 1) throw new DbStateException(); + ps.close(); return new RetentionUpdate(retention, version); } catch(SQLException e) { tryToClose(ps); @@ -1692,7 +1681,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " AND cg.contactId = s.contactId" + " WHERE cg.contactId = ?" + " AND timestamp >= retention" - + " AND seen = FALSE AND expiry < ?" + + " AND seen = FALSE AND s.expiry < ?" + " AND sendability > ZERO()" + " ORDER BY timestamp DESC"; ps = txn.prepareStatement(sql); @@ -1822,8 +1811,9 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c) - throws DbException { + public SubscriptionUpdate getSubscriptionUpdate(Connection txn, ContactId c, + long maxLatency) throws DbException { + long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; try { @@ -1834,9 +1824,11 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVersions AS ver" + " ON vis.contactId = ver.contactId" + " WHERE vis.contactId = ?" - + " AND localVersion > localAcked"; + + " AND localVersion > localAcked" + + " AND expiry < ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); + ps.setLong(2, now); rs = ps.executeQuery(); List<Group> subs = new ArrayList<Group>(); long version = 0; @@ -1850,6 +1842,13 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); if(subs.isEmpty()) return null; + sql = "UPDATE groupVersions SET expiry = ? WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, calculateExpiry(now, maxLatency)); + ps.setInt(2, c.getInt()); + int affected = ps.executeUpdate(); + if(affected != 1) throw new DbStateException(); + ps.close(); subs = Collections.unmodifiableList(subs); return new SubscriptionUpdate(subs, version); } catch(SQLException e) { @@ -1902,7 +1901,8 @@ abstract class JdbcDatabase implements Database<Connection> { } public Collection<TransportUpdate> getTransportUpdates(Connection txn, - ContactId c) throws DbException { + ContactId c, long maxLatency) throws DbException { + long now = clock.currentTimeMillis(); PreparedStatement ps = null; ResultSet rs = null; try { @@ -1911,9 +1911,11 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN transportVersions AS tv" + " ON tp.transportId = tv.transportId" + " WHERE tv.contactId = ?" - + " AND localVersion > localAcked"; + + " AND localVersion > localAcked" + + " AND expiry < ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); + ps.setLong(2, now); rs = ps.executeQuery(); List<TransportUpdate> updates = new ArrayList<TransportUpdate>(); TransportId lastId = null; @@ -1931,6 +1933,22 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); if(updates.isEmpty()) return null; + sql = "UPDATE transportVersions SET expiry = ?" + + " WHERE contactId = ? AND transportId = ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, calculateExpiry(now, maxLatency)); + ps.setInt(2, c.getInt()); + for(TransportUpdate u : updates) { + ps.setBytes(3, u.getId().getBytes()); + ps.addBatch(); + } + int [] batchAffected = ps.executeBatch(); + if(batchAffected.length != updates.size()) + throw new DbStateException(); + for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] != 1) throw new DbStateException(); + } + ps.close(); return Collections.unmodifiableList(updates); } catch(SQLException e) { tryToClose(ps); @@ -2045,7 +2063,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " AND cg.contactId = s.contactId" + " WHERE cg.contactId = ?" + " AND timestamp >= retention" - + " AND seen = FALSE AND expiry < ?" + + " AND seen = FALSE AND s.expiry < ?" + " AND sendability > ZERO()" + " LIMIT ?"; ps = txn.prepareStatement(sql); @@ -2110,7 +2128,7 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; try { String sql = "UPDATE retentionVersions" - + " SET localVersion = localVersion + ?"; + + " SET localVersion = localVersion + ?, expiry = ZERO()"; ps = txn.prepareStatement(sql); ps.setInt(1, 1); ps.executeUpdate(); @@ -2226,7 +2244,8 @@ abstract class JdbcDatabase implements Database<Connection> { ps.close(); if(visible.isEmpty()) return; // Bump the subscription version for the affected contacts - sql = "UPDATE groupVersions SET localVersion = localVersion + ?" + sql = "UPDATE groupVersions" + + " SET localVersion = localVersion + ?, expiry = ZERO()" + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, 1); @@ -2277,7 +2296,8 @@ abstract class JdbcDatabase implements Database<Connection> { if(affected != 1) throw new DbStateException(); ps.close(); // Bump the subscription version - sql = "UPDATE groupVersions SET localVersion = localVersion + ?" + sql = "UPDATE groupVersions" + + " SET localVersion = localVersion + ?, expiry = ZERO()" + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, 1); @@ -2305,7 +2325,7 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; try { String sql = "UPDATE transportVersions" - + " SET localVersion = localVersion + ?" + + " SET localVersion = localVersion + ?, expiry = ZERO()" + " WHERE transportId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, 1); @@ -2385,20 +2405,26 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setRetentionTime(Connection txn, ContactId c, long retention, - long version) throws DbException { + public void setMessageExpiry(Connection txn, ContactId c, + Collection<MessageId> sent, long maxLatency) throws DbException { + long now = clock.currentTimeMillis(); PreparedStatement ps = null; try { - String sql = "UPDATE retentionVersions SET retention = ?," - + " remoteVersion = ?, remoteAcked = FALSE" - + " WHERE contactId = ? AND remoteVersion < ?"; + String sql = "UPDATE statuses SET expiry = ?" + + " WHERE messageId = ? AND contactId = ?"; ps = txn.prepareStatement(sql); - ps.setLong(1, retention); - ps.setLong(2, version); + ps.setLong(1, calculateExpiry(now, maxLatency)); ps.setInt(3, c.getInt()); - ps.setLong(4, version); - int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); + for(MessageId m : sent) { + ps.setBytes(2, m.getBytes()); + ps.addBatch(); + } + int[] batchAffected = ps.executeBatch(); + if(batchAffected.length != sent.size()) + throw new DbStateException(); + for(int i = 0; i < batchAffected.length; i++) { + if(batchAffected[i] > 1) throw new DbStateException(); + } ps.close(); } catch(SQLException e) { tryToClose(ps); @@ -2406,25 +2432,12 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setRetentionUpdateAcked(Connection txn, ContactId c, - long version) throws DbException { - PreparedStatement ps = null; - try { - String sql = "UPDATE retentionVersions SET localAcked = ?" - + " WHERE contactId = ?" - + " AND localAcked < ? AND localVersion >= ?"; - ps = txn.prepareStatement(sql); - ps.setLong(1, version); - ps.setInt(2, c.getInt()); - ps.setLong(3, version); - ps.setLong(4, version); - int affected = ps.executeUpdate(); - if(affected > 1) throw new DbStateException(); - ps.close(); - } catch(SQLException e) { - tryToClose(ps); - throw new DbException(e); - } + private long calculateExpiry(long now, long maxLatency) { + long roundTrip = maxLatency * 2; + if(roundTrip < 0) return Long.MAX_VALUE; // Overflow; + long expiry = now + roundTrip; + if(expiry < 0) return Long.MAX_VALUE; // Overflow + return expiry; } public Rating setRating(Connection txn, AuthorId a, Rating r) @@ -2586,6 +2599,48 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void setRetentionTime(Connection txn, ContactId c, long retention, + long version) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE retentionVersions SET retention = ?," + + " remoteVersion = ?, remoteAcked = FALSE" + + " WHERE contactId = ? AND remoteVersion < ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, retention); + ps.setLong(2, version); + ps.setInt(3, c.getInt()); + ps.setLong(4, version); + int affected = ps.executeUpdate(); + if(affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void setRetentionUpdateAcked(Connection txn, ContactId c, + long version) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE retentionVersions SET localAcked = ?" + + " WHERE contactId = ?" + + " AND localAcked < ? AND localVersion >= ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, version); + ps.setInt(2, c.getInt()); + ps.setLong(3, version); + ps.setLong(4, version); + int affected = ps.executeUpdate(); + if(affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + public void setSendability(Connection txn, MessageId m, int sendability) throws DbException { PreparedStatement ps = null; diff --git a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java index 1f56449e743a7e1713d7cdaa40cfda0e1da7e93a..42d0acb290b4152d7fb439ff2f7265c9187239a0 100644 --- a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java @@ -3,6 +3,7 @@ package net.sf.briar.messaging.duplex; import static java.util.logging.Level.INFO; import static java.util.logging.Level.WARNING; import static net.sf.briar.api.Rating.GOOD; +import static net.sf.briar.api.messaging.MessagingConstants.MAX_PACKET_LENGTH; import java.io.IOException; import java.io.InputStream; @@ -87,6 +88,7 @@ abstract class DuplexConnection implements DatabaseListener { private final Executor dbExecutor, verificationExecutor; private final MessageVerifier messageVerifier; + private final long maxLatency; private final AtomicBoolean canSendOffer, disposed; private final BlockingQueue<Runnable> writerTasks; @@ -116,6 +118,7 @@ abstract class DuplexConnection implements DatabaseListener { this.transport = transport; contactId = ctx.getContactId(); transportId = ctx.getTransportId(); + maxLatency = transport.getMaxLatency(); canSendOffer = new AtomicBoolean(false); disposed = new AtomicBoolean(false); writerTasks = new LinkedBlockingQueue<Runnable>(); @@ -467,8 +470,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { Collection<byte[]> batch = db.generateBatch(contactId, - Integer.MAX_VALUE, transport.getMaxLatency(), - requested); + MAX_PACKET_LENGTH, maxLatency, requested); if(batch == null) new GenerateOffer().run(); else writerTasks.add(new WriteBatch(batch, requested)); } catch(DbException e) { @@ -583,7 +585,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { - RetentionUpdate u = db.generateRetentionUpdate(contactId); + RetentionUpdate u = + db.generateRetentionUpdate(contactId, maxLatency); if(u != null) writerTasks.add(new WriteRetentionUpdate(u)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -649,7 +652,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { - SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); + SubscriptionUpdate u = + db.generateSubscriptionUpdate(contactId, maxLatency); if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -717,7 +721,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { Collection<TransportUpdate> t = - db.generateTransportUpdates(contactId); + db.generateTransportUpdates(contactId, maxLatency); if(t != null) writerTasks.add(new WriteTransportUpdates(t)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); diff --git a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java index e2990f547a2c46f1b49642a08fd12f8651a19f8b..920a7724427cae909edafb250cad20db7361e710 100644 --- a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java @@ -42,6 +42,7 @@ class OutgoingSimplexConnection { private final SimplexTransportWriter transport; private final ContactId contactId; private final TransportId transportId; + private final long maxLatency; OutgoingSimplexConnection(DatabaseComponent db, ConnectionRegistry connRegistry, @@ -56,6 +57,7 @@ class OutgoingSimplexConnection { this.transport = transport; contactId = ctx.getContactId(); transportId = ctx.getTransportId(); + maxLatency = transport.getMaxLatency(); } void write() { @@ -69,7 +71,6 @@ class OutgoingSimplexConnection { throw new EOFException(); PacketWriter writer = packetWriterFactory.createPacketWriter(out, transport.shouldFlush()); - long maxLatency = transport.getMaxLatency(); // Send the initial packets: updates and acks boolean hasSpace = writeTransportAcks(conn, writer); if(hasSpace) hasSpace = writeTransportUpdates(conn, writer); @@ -128,7 +129,7 @@ class OutgoingSimplexConnection { PacketWriter writer) throws DbException, IOException { assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH; Collection<TransportUpdate> updates = - db.generateTransportUpdates(contactId); + db.generateTransportUpdates(contactId, maxLatency); if(updates == null) return true; for(TransportUpdate u : updates) { writer.writeTransportUpdate(u); @@ -149,7 +150,8 @@ class OutgoingSimplexConnection { private boolean writeSubscriptionUpdate(ConnectionWriter conn, PacketWriter writer) throws DbException, IOException { assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH; - SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); + SubscriptionUpdate u = + db.generateSubscriptionUpdate(contactId, maxLatency); if(u == null) return true; writer.writeSubscriptionUpdate(u); return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH; @@ -167,7 +169,7 @@ class OutgoingSimplexConnection { private boolean writeRetentionUpdate(ConnectionWriter conn, PacketWriter writer) throws DbException, IOException { assert conn.getRemainingCapacity() >= MAX_PACKET_LENGTH; - RetentionUpdate u = db.generateRetentionUpdate(contactId); + RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency); if(u == null) return true; writer.writeRetentionUpdate(u); return conn.getRemainingCapacity() >= MAX_PACKET_LENGTH; diff --git a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java index 09c76be1adb97624d02af8712cceb2006a9dbe90..83b0cbf621a54af0abc5af6503b96f9c0bcc95f9 100644 --- a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java +++ b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java @@ -523,7 +523,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { } catch(NoSuchContactException expected) {} try { - db.generateRetentionUpdate(contactId); + db.generateRetentionUpdate(contactId, 123); fail(); } catch(NoSuchContactException expected) {} @@ -533,7 +533,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { } catch(NoSuchContactException expected) {} try { - db.generateSubscriptionUpdate(contactId); + db.generateSubscriptionUpdate(contactId, 123); fail(); } catch(NoSuchContactException expected) {} @@ -543,7 +543,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { } catch(NoSuchContactException expected) {} try { - db.generateTransportUpdates(contactId); + db.generateTransportUpdates(contactId, 123); fail(); } catch(NoSuchContactException expected) {} @@ -696,7 +696,7 @@ public abstract class DatabaseComponentTest extends BriarTestCase { oneOf(database).getRawMessage(txn, messageId1); will(returnValue(raw1)); // Record the outstanding messages - oneOf(database).addOutstandingMessages(txn, contactId, sendable, + oneOf(database).setMessageExpiry(txn, contactId, sendable, Long.MAX_VALUE); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, @@ -733,8 +733,8 @@ public abstract class DatabaseComponentTest extends BriarTestCase { will(returnValue(raw1)); // Message is sendable oneOf(database).getRawMessageIfSendable(txn, contactId, messageId2); will(returnValue(null)); // Message is not sendable - // Record the outstanding message - oneOf(database).addOutstandingMessages(txn, contactId, + // Mark the message as sent + oneOf(database).setMessageExpiry(txn, contactId, Arrays.asList(messageId1), Long.MAX_VALUE); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, @@ -788,13 +788,14 @@ public abstract class DatabaseComponentTest extends BriarTestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getSubscriptionUpdate(txn, contactId); + oneOf(database).getSubscriptionUpdate(txn, contactId, + Long.MAX_VALUE); will(returnValue(null)); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - assertNull(db.generateSubscriptionUpdate(contactId)); + assertNull(db.generateSubscriptionUpdate(contactId, Long.MAX_VALUE)); context.assertIsSatisfied(); } @@ -812,13 +813,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getSubscriptionUpdate(txn, contactId); + oneOf(database).getSubscriptionUpdate(txn, contactId, + Long.MAX_VALUE); will(returnValue(new SubscriptionUpdate(Arrays.asList(group), 1))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); + SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId, + Long.MAX_VALUE); assertEquals(Arrays.asList(group), u.getGroups()); assertEquals(1, u.getVersion()); @@ -838,13 +841,13 @@ public abstract class DatabaseComponentTest extends BriarTestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getTransportUpdates(txn, contactId); + oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE); will(returnValue(null)); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - assertNull(db.generateTransportUpdates(contactId)); + assertNull(db.generateTransportUpdates(contactId, Long.MAX_VALUE)); context.assertIsSatisfied(); } @@ -862,15 +865,15 @@ public abstract class DatabaseComponentTest extends BriarTestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - oneOf(database).getTransportUpdates(txn, contactId); + oneOf(database).getTransportUpdates(txn, contactId, Long.MAX_VALUE); will(returnValue(Arrays.asList(new TransportUpdate(transportId, transportProperties, 1)))); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner, shutdown); - Collection<TransportUpdate> updates = db.generateTransportUpdates( - contactId); + Collection<TransportUpdate> updates = + db.generateTransportUpdates(contactId, Long.MAX_VALUE); assertNotNull(updates); assertEquals(1, updates.size()); TransportUpdate u = updates.iterator().next(); diff --git a/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java b/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java index a4ebd5158f2d01a9c7f1be1c60c451a0f7e63d4a..74bc6b4a9b6ea1a33ef9d37a916aa85b4895cac1 100644 --- a/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java +++ b/briar-tests/src/net/sf/briar/db/H2DatabaseTest.java @@ -523,7 +523,7 @@ public class H2DatabaseTest extends BriarTestCase { assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); - db.addOutstandingMessages(txn, contactId, Arrays.asList(messageId), + db.setMessageExpiry(txn, contactId, Arrays.asList(messageId), Long.MAX_VALUE); // The message should no longer be sendable diff --git a/briar-tests/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnectionTest.java b/briar-tests/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnectionTest.java index 31e5270ace230f265c4a30dd7f4530ae53ad1091..b13cae6016957c77e90008fbeb5fbddb41ffaf42 100644 --- a/briar-tests/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnectionTest.java +++ b/briar-tests/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnectionTest.java @@ -113,19 +113,22 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase { oneOf(db).generateTransportAcks(contactId); will(returnValue(null)); // No transport updates to send - oneOf(db).generateTransportUpdates(contactId); + oneOf(db).generateTransportUpdates(with(contactId), + with(any(long.class))); will(returnValue(null)); // No subscription ack to send oneOf(db).generateSubscriptionAck(contactId); will(returnValue(null)); // No subscription update to send - oneOf(db).generateSubscriptionUpdate(contactId); + oneOf(db).generateSubscriptionUpdate(with(contactId), + with(any(long.class))); will(returnValue(null)); // No retention ack to send oneOf(db).generateRetentionAck(contactId); will(returnValue(null)); // No retention update to send - oneOf(db).generateRetentionUpdate(contactId); + oneOf(db).generateRetentionUpdate(with(contactId), + with(any(long.class))); will(returnValue(null)); // No acks to send oneOf(db).generateAck(with(contactId), with(any(int.class))); @@ -160,19 +163,22 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase { oneOf(db).generateTransportAcks(contactId); will(returnValue(null)); // No transport updates to send - oneOf(db).generateTransportUpdates(contactId); + oneOf(db).generateTransportUpdates(with(contactId), + with(any(long.class))); will(returnValue(null)); // No subscription ack to send oneOf(db).generateSubscriptionAck(contactId); will(returnValue(null)); // No subscription update to send - oneOf(db).generateSubscriptionUpdate(contactId); + oneOf(db).generateSubscriptionUpdate(with(contactId), + with(any(long.class))); will(returnValue(null)); // No retention ack to send oneOf(db).generateRetentionAck(contactId); will(returnValue(null)); // No retention update to send - oneOf(db).generateRetentionUpdate(contactId); + oneOf(db).generateRetentionUpdate(with(contactId), + with(any(long.class))); will(returnValue(null)); // One ack to send oneOf(db).generateAck(with(contactId), with(any(int.class)));