diff --git a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java index c908d3e4710ea8e76f2d664d1cb0d8252107ca6c..6399ad328a2b2ae755e22f09be95c781c07bde79 100644 --- a/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java +++ b/briar-api/src/net/sf/briar/api/db/DatabaseComponent.java @@ -11,6 +11,8 @@ import net.sf.briar.api.TransportProperties; import net.sf.briar.api.db.event.DatabaseListener; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.AuthorId; +import net.sf.briar.api.protocol.ExpiryAck; +import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; @@ -96,6 +98,18 @@ public interface DatabaseComponent { Collection<byte[]> generateBatch(ContactId c, int maxLength, Collection<MessageId> requested) throws DbException; + /** + * Generates an expiry ack for the given contact. Returns null if no ack + * is due. + */ + ExpiryAck generateExpiryAck(ContactId c) throws DbException; + + /** + * Generates an expiry update for the given contact. Returns null if no + * update is due. + */ + ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException; + /** * Generates an offer for the given contact. Returns null if there are no * messages to offer. @@ -186,6 +200,12 @@ public interface DatabaseComponent { /** Processes an ack from the given contact. */ void receiveAck(ContactId c, Ack a) throws DbException; + /** Processes an expiry ack from the given contact. */ + void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException; + + /** Processes an expiry update from the given contact. */ + void receiveExpiryUpdate(ContactId c, ExpiryUpdate u) throws DbException; + /** Processes a message from the given contact. */ void receiveMessage(ContactId c, Message m) throws DbException; @@ -204,14 +224,14 @@ public interface DatabaseComponent { throws DbException; /** Processes a subscription update from the given contact. */ - void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s) + void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate u) throws DbException; /** Processes a transport ack from the given contact. */ void receiveTransportAck(ContactId c, TransportAck a) throws DbException; /** Processes a transport update from the given contact. */ - void receiveTransportUpdate(ContactId c, TransportUpdate t) + void receiveTransportUpdate(ContactId c, TransportUpdate u) throws DbException; /** Removes a contact (and all associated state) from the database. */ diff --git a/briar-api/src/net/sf/briar/api/protocol/ProtocolWriter.java b/briar-api/src/net/sf/briar/api/protocol/ProtocolWriter.java index 58785a290c944bb7c8a198b47bd89b2245dd1a3d..718e8be37cb94748b4d5807dd677fda7c7ce12d9 100644 --- a/briar-api/src/net/sf/briar/api/protocol/ProtocolWriter.java +++ b/briar-api/src/net/sf/briar/api/protocol/ProtocolWriter.java @@ -22,11 +22,11 @@ public interface ProtocolWriter { void writeSubscriptionAck(SubscriptionAck a) throws IOException; - void writeSubscriptionUpdate(SubscriptionUpdate s) throws IOException; + void writeSubscriptionUpdate(SubscriptionUpdate u) throws IOException; void writeTransportAck(TransportAck a) throws IOException; - void writeTransportUpdate(TransportUpdate t) throws IOException; + void writeTransportUpdate(TransportUpdate u) throws IOException; void flush() throws IOException; diff --git a/briar-core/src/net/sf/briar/db/Database.java b/briar-core/src/net/sf/briar/db/Database.java index 98f4b0adb45d575c7514f1511021072e2c103867..3e1e06ec2276c0a31e67f488cb26fc25dace6713 100644 --- a/briar-core/src/net/sf/briar/db/Database.java +++ b/briar-core/src/net/sf/briar/db/Database.java @@ -11,6 +11,8 @@ import net.sf.briar.api.TransportProperties; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.protocol.AuthorId; +import net.sf.briar.api.protocol.ExpiryAck; +import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; @@ -34,6 +36,7 @@ import net.sf.briar.api.transport.TemporarySecret; * deadlock, locks must be acquired in the following (alphabetical) order: * <ul> * <li> contact + * <li> expiry * <li> message * <li> rating * <li> subscription @@ -206,11 +209,19 @@ interface Database<T> { Collection<ContactTransport> getContactTransports(T txn) throws DbException; /** - * Returns the approximate expiry time of the database. + * Returns an expiry ack for the given contact, or null if no ack is due. * <p> - * Locking: message read. + * Locking: contact read, expiry write. + */ + ExpiryAck getExpiryAck(T txn, ContactId c) throws DbException; + + /** + * Returns an expiry update for the given contact, or null if no update is + * due. + * <p> + * Locking: contact read, expiry write. */ - long getExpiryTime(T txn) throws DbException; + ExpiryUpdate getExpiryUpdate(T txn, ContactId c) throws DbException; /** * Returns the amount of free storage space available to the database, in @@ -447,6 +458,14 @@ interface Database<T> { long incrementConnectionCounter(T txn, ContactId c, TransportId t, long period) throws DbException; + /** + * Increments the expiry versions for all contacts to indicate that the + * database's expiry time has changed and expiry updates should be sent. + * <p> + * Locking: contact read, expiry write. + */ + void incrementExpiryVersions(T txn) throws DbException; + /** * Merges the given configuration with the existing configuration for the * given transport. @@ -531,11 +550,14 @@ interface Database<T> { long centre, byte[] bitmap) throws DbException; /** - * Sets the given contact's database expiry time. + * Sets the expiry time of the given contact's database, unless an update + * with an equal or higher version number has already been received from + * the contact. * <p> - * Locking: contact write. + * Locking: contact read, expiry write. */ - void setExpiryTime(T txn, ContactId c, long expiry) throws DbException; + void setExpiryTime(T txn, ContactId c, long expiry, long version) + throws DbException; /** * Sets the user's rating for the given author. @@ -560,7 +582,7 @@ interface Database<T> { * <p> * Locking: contact read, transport write. */ - void setRemoteProperties(T txn, ContactId c, TransportUpdate t) + void setRemoteProperties(T txn, ContactId c, TransportUpdate u) throws DbException; /** @@ -604,7 +626,16 @@ interface Database<T> { * <p> * Locking: contact read, subscription write. */ - void setSubscriptions(T txn, ContactId c, SubscriptionUpdate s) + void setSubscriptions(T txn, ContactId c, SubscriptionUpdate u) + throws DbException; + + /** + * Records an expiry ack from the given contact for the given version + * unless the contact has already acked an equal or higher version. + * <p> + * Locking: contact read, expiry write. + */ + void setExpiryUpdateAcked(T txn, ContactId c, long version) throws DbException; /** diff --git a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java index 0ab4bfc60cddb3a94ce1dbabb0343b325d8a80aa..6517e181746c43e7f0a1669c43b1768b37be5509 100644 --- a/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java +++ b/briar-core/src/net/sf/briar/db/DatabaseComponentImpl.java @@ -46,6 +46,8 @@ import net.sf.briar.api.db.event.TransportRemovedEvent; import net.sf.briar.api.lifecycle.ShutdownManager; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.AuthorId; +import net.sf.briar.api.protocol.ExpiryAck; +import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; @@ -81,6 +83,8 @@ DatabaseCleaner.Callback { private final ReentrantReadWriteLock contactLock = new ReentrantReadWriteLock(true); + private final ReentrantReadWriteLock expiryLock = + new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock messageLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock ratingLock = @@ -590,6 +594,54 @@ DatabaseCleaner.Callback { return Collections.unmodifiableList(messages); } + public ExpiryAck generateExpiryAck(ContactId c) throws DbException { + contactLock.readLock().lock(); + try { + expiryLock.writeLock().lock(); + try { + T txn = db.startTransaction(); + try { + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + ExpiryAck a = db.getExpiryAck(txn, c); + db.commitTransaction(txn); + return a; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + expiryLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public ExpiryUpdate generateExpiryUpdate(ContactId c) throws DbException { + contactLock.readLock().lock(); + try { + expiryLock.writeLock().lock(); + try { + T txn = db.startTransaction(); + try { + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + ExpiryUpdate e = db.getExpiryUpdate(txn, c); + db.commitTransaction(txn); + return e; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + expiryLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public Offer generateOffer(ContactId c, int maxMessages) throws DbException { Collection<MessageId> offered; @@ -651,9 +703,9 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - SubscriptionUpdate s = db.getSubscriptionUpdate(txn, c); + SubscriptionUpdate u = db.getSubscriptionUpdate(txn, c); db.commitTransaction(txn); - return s; + return u; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -1038,6 +1090,54 @@ DatabaseCleaner.Callback { } } + public void receiveExpiryAck(ContactId c, ExpiryAck a) throws DbException { + contactLock.readLock().lock(); + try { + expiryLock.writeLock().lock(); + try { + T txn = db.startTransaction(); + try { + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + db.setExpiryUpdateAcked(txn, c, a.getVersionNumber()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + expiryLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public void receiveExpiryUpdate(ContactId c, ExpiryUpdate u) + throws DbException { + contactLock.readLock().lock(); + try { + expiryLock.writeLock().lock(); + try { + T txn = db.startTransaction(); + try { + if(!db.containsContact(txn, c)) + throw new NoSuchContactException(); + db.setExpiryTime(txn, c, u.getExpiryTime(), + u.getVersionNumber()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + expiryLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public void receiveMessage(ContactId c, Message m) throws DbException { boolean added = false; contactLock.readLock().lock(); @@ -1149,7 +1249,7 @@ DatabaseCleaner.Callback { } } - public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate s) + public void receiveSubscriptionUpdate(ContactId c, SubscriptionUpdate u) throws DbException { contactLock.readLock().lock(); try { @@ -1159,7 +1259,7 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - db.setSubscriptions(txn, c, s); + db.setSubscriptions(txn, c, u); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -1200,7 +1300,7 @@ DatabaseCleaner.Callback { } } - public void receiveTransportUpdate(ContactId c, TransportUpdate t) + public void receiveTransportUpdate(ContactId c, TransportUpdate u) throws DbException { contactLock.readLock().lock(); try { @@ -1210,7 +1310,7 @@ DatabaseCleaner.Callback { try { if(!db.containsContact(txn, c)) throw new NoSuchContactException(); - db.setRemoteProperties(txn, c, t); + db.setRemoteProperties(txn, c, u); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -1223,7 +1323,7 @@ DatabaseCleaner.Callback { contactLock.readLock().unlock(); } // Call the listeners outside the lock - callListeners(new RemoteTransportsUpdatedEvent(c, t.getId())); + callListeners(new RemoteTransportsUpdatedEvent(c, u.getId())); } public void removeContact(ContactId c) throws DbException { @@ -1502,27 +1602,37 @@ DatabaseCleaner.Callback { * removed. */ private boolean expireMessages(int size) throws DbException { - Collection<MessageId> old; + boolean removed = false; contactLock.readLock().lock(); try { - messageLock.writeLock().lock(); + expiryLock.writeLock().lock(); try { - T txn = db.startTransaction(); + messageLock.writeLock().lock(); try { - old = db.getOldMessages(txn, size); - for(MessageId m : old) removeMessage(txn, m); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + T txn = db.startTransaction(); + try { + Collection<MessageId> old = + db.getOldMessages(txn, size); + if(!old.isEmpty()) { + for(MessageId m : old) removeMessage(txn, m); + db.incrementExpiryVersions(txn); + removed = true; + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + expiryLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } - return old.isEmpty(); + return removed; } /** diff --git a/briar-core/src/net/sf/briar/db/JdbcDatabase.java b/briar-core/src/net/sf/briar/db/JdbcDatabase.java index a4263a62a012b1a6bd084912b5e015ae5c2f3ce4..96db7bd8243bed7f7a2e76e9870c3526dd37d64e 100644 --- a/briar-core/src/net/sf/briar/db/JdbcDatabase.java +++ b/briar-core/src/net/sf/briar/db/JdbcDatabase.java @@ -31,6 +31,8 @@ import net.sf.briar.api.db.DbClosedException; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.MessageHeader; import net.sf.briar.api.protocol.AuthorId; +import net.sf.briar.api.protocol.ExpiryAck; +import net.sf.briar.api.protocol.ExpiryUpdate; import net.sf.briar.api.protocol.Group; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; @@ -54,9 +56,22 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_CONTACTS = "CREATE TABLE contacts" + " (contactId COUNTER," - + " expiry BIGINT NOT NULL DEFAULT 0," // FIXME: Move this + " PRIMARY KEY (contactId))"; + // Locking: expiry + private static final String CREATE_EXPIRY_VERSIONS = + "CREATE TABLE expiryVersions" + + " (contactId INT NOT NULL," + + " expiry BIGINT NOT NULL," + + " localVersion BIGINT NOT NULL," + + " localAcked BIGINT NOT NULL," + + " remoteVersion BIGINT NOT NULL," + + " remoteAcked BOOLEAN NOT NULL," + + " PRIMARY KEY (contactId)," + + " FOREIGN KEY (contactId)" + + " REFERENCES contacts (contactId)" + + " ON DELETE CASCADE)"; + // Locking: message private static final String CREATE_MESSAGES = "CREATE TABLE messages" @@ -340,6 +355,7 @@ abstract class JdbcDatabase implements Database<Connection> { try { s = txn.createStatement(); s.executeUpdate(insertTypeNames(CREATE_CONTACTS)); + s.executeUpdate(insertTypeNames(CREATE_EXPIRY_VERSIONS)); s.executeUpdate(insertTypeNames(CREATE_MESSAGES)); s.executeUpdate(INDEX_MESSAGES_BY_PARENT); s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR); @@ -497,6 +513,16 @@ abstract class JdbcDatabase implements Database<Connection> { if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); + // Create an expiry version row + sql = "INSERT INTO expiryVersions (contactId, expiry," + + " localVersion, localAcked, remoteVersion, remoteAcked)" + + " VALUES (?, ZERO(), ?, ZERO(), ZERO(), TRUE)"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, 1); + affected = ps.executeUpdate(); + if(affected != 1) throw new DbStateException(); + ps.close(); // Create a group version row sql = "INSERT INTO groupVersions (contactId, localVersion," + " localAcked, remoteVersion, remoteAcked)" @@ -1016,27 +1042,68 @@ abstract class JdbcDatabase implements Database<Connection> { } else return f.length(); } - public long getExpiryTime(Connection txn) throws DbException { + public ExpiryAck getExpiryAck(Connection txn, ContactId c) + throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { - long timestamp = 0L; - String sql = "SELECT timestamp FROM messages" - + " ORDER BY timestamp LIMIT ?"; + String sql = "SELECT remoteVersion FROM expiryVersions" + + " WHERE contactId = ? AND remoteAcked = FALSE"; ps = txn.prepareStatement(sql); - ps.setInt(1, 1); + ps.setInt(1, c.getInt()); rs = ps.executeQuery(); - if(rs.next()) { - timestamp = rs.getLong(1); - timestamp -= timestamp % EXPIRY_MODULUS; + if(!rs.next()) { + rs.close(); + ps.close(); + return null; } + long version = rs.getLong(1); if(rs.next()) throw new DbStateException(); rs.close(); ps.close(); - return timestamp; + sql = "UPDATE expiryVersions SET remoteAcked = TRUE" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + int affected = ps.executeUpdate(); + if(affected != 1) throw new DbStateException(); + ps.close(); + return new ExpiryAck(version); } catch(SQLException e) { + tryToClose(ps); tryToClose(rs); + throw new DbException(e); + } + } + + public ExpiryUpdate getExpiryUpdate(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT timestamp, localVersion" + + " FROM messages JOIN expiryVersions" + + " WHERE contactId = ? AND localVersion > localAcked" + + " ORDER BY timestamp LIMIT ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, 1); + rs = ps.executeQuery(); + if(!rs.next()) { + rs.close(); + ps.close(); + return null; + } + long expiry = rs.getLong(1); + expiry -= expiry % EXPIRY_MODULUS; + long version = rs.getLong(2); + if(rs.next()) throw new DbStateException(); + rs.close(); + ps.close(); + return new ExpiryUpdate(expiry, version); + } catch(SQLException e) { tryToClose(ps); + tryToClose(rs); throw new DbException(e); } } @@ -1210,8 +1277,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVisibilities AS gv" + " ON m.groupId = gv.groupId" + " AND cg.contactId = gv.contactId" - + " JOIN contacts AS c" - + " ON cg.contactId = c.contactId" + + " JOIN expiryVersions AS ev" + + " ON cg.contactId = ev.contactId" + " JOIN statuses AS s" + " ON m.messageId = s.messageId" + " AND cg.contactId = s.contactId" @@ -1316,8 +1383,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVisibilities AS gv" + " ON m.groupId = gv.groupId" + " AND cg.contactId = gv.contactId" - + " JOIN contacts AS c" - + " ON cg.contactId = c.contactId" + + " JOIN expiryVersions AS ev" + + " ON cg.contactId = ev.contactId" + " JOIN statuses AS s" + " ON m.messageId = s.messageId" + " AND cg.contactId = s.contactId" @@ -1577,8 +1644,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVisibilities AS gv" + " ON m.groupId = gv.groupId" + " AND cg.contactId = gv.contactId" - + " JOIN contacts AS c" - + " ON cg.contactId = c.contactId" + + " JOIN expiryVersions AS ev" + + " ON cg.contactId = ev.contactId" + " JOIN statuses AS s" + " ON m.messageId = s.messageId" + " AND cg.contactId = s.contactId" @@ -1721,7 +1788,7 @@ abstract class JdbcDatabase implements Database<Connection> { try { String sql = "SELECT g.groupId, name, key, localVersion" + " FROM groups AS g" - + " JOIN groupVisibilities as gv" + + " JOIN groupVisibilities AS gv" + " ON g.groupId = gv.groupId" + " JOIN groupVersions AS v" + " ON gv.contactId = v.contactId" @@ -1800,7 +1867,7 @@ abstract class JdbcDatabase implements Database<Connection> { try { String sql = "SELECT transportId, key, value, localVersion" + " FROM transportProperties AS tp" - + " JOIN transportVersions as tv" + + " JOIN transportVersions AS tv" + " ON tp.transportId = tv.transportId" + " WHERE tv.contactId = ?" + " AND localVersion > localAcked"; @@ -1909,8 +1976,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVisibilities AS gv" + " ON m.groupId = gv.groupId" + " AND cg.contactId = gv.contactId" - + " JOIN contacts AS c" - + " ON cg.contactId = c.contactId" + + " JOIN expiryVersions AS ev" + + " ON cg.contactId = ev.contactId" + " JOIN statuses AS s" + " ON m.messageId = s.messageId" + " AND cg.contactId = s.contactId" @@ -1977,6 +2044,20 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void incrementExpiryVersions(Connection txn) throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE expiryVersions" + + " SET localVersion = localVersion + ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, 1); + ps.executeUpdate(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + public void removeOutstandingMessages(Connection txn, ContactId c, Collection<MessageId> acked) throws DbException { PreparedStatement ps = null; @@ -2244,15 +2325,39 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setExpiryTime(Connection txn, ContactId c, long expiry) - throws DbException { + public void setExpiryTime(Connection txn, ContactId c, long expiry, + long version) throws DbException { PreparedStatement ps = null; try { - String sql = "UPDATE contacts SET expiry = ?" - + " WHERE contactId = ?"; + String sql = "UPDATE expiryVersions" + + " SET expiry = ?, remoteVersion = ?, remoteAcked = FALSE" + + " WHERE contactId = ? AND remoteVersion < ?"; ps = txn.prepareStatement(sql); ps.setLong(1, expiry); + ps.setLong(2, version); + ps.setInt(3, c.getInt()); + ps.setLong(4, version); + int affected = ps.executeUpdate(); + if(affected > 1) throw new DbStateException(); + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + throw new DbException(e); + } + } + + public void setExpiryUpdateAcked(Connection txn, ContactId c, long version) + throws DbException { + PreparedStatement ps = null; + try { + String sql = "UPDATE expiryVersions SET localAcked = ?" + + " WHERE contactId = ?" + + " AND localAcked < ? AND localVersion >= ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, version); ps.setInt(2, c.getInt()); + ps.setLong(3, version); + ps.setLong(4, version); int affected = ps.executeUpdate(); if(affected > 1) throw new DbStateException(); ps.close(); @@ -2362,16 +2467,17 @@ abstract class JdbcDatabase implements Database<Connection> { } public void setRemoteProperties(Connection txn, ContactId c, - TransportUpdate t) throws DbException { + TransportUpdate u) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { + TransportId t = u.getId(); // Find the existing version, if any String sql = "SELECT remoteVersion FROM contactTransportVersions" + " WHERE contactId = ? AND transportId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setBytes(2, t.getId().getBytes()); + ps.setBytes(2, t.getBytes()); rs = ps.executeQuery(); long version = rs.next() ? rs.getLong(1) : -1L; if(rs.next()) throw new DbStateException(); @@ -2385,8 +2491,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " VALUES (?, ?, ?, FALSE)"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setBytes(2, t.getId().getBytes()); - ps.setLong(3, t.getVersionNumber()); + ps.setBytes(2, t.getBytes()); + ps.setLong(3, u.getVersionNumber()); int affected = ps.executeUpdate(); if(affected != 1) throw new DbStateException(); ps.close(); @@ -2396,32 +2502,32 @@ abstract class JdbcDatabase implements Database<Connection> { + " SET remoteVersion = ?, remoteAcked = FALSE" + " WHERE contactId = ? AND transportId = ?"; ps = txn.prepareStatement(sql); - ps.setLong(1, Math.max(version, t.getVersionNumber())); + ps.setLong(1, Math.max(version, u.getVersionNumber())); ps.setInt(1, c.getInt()); - ps.setBytes(2, t.getId().getBytes()); + ps.setBytes(2, t.getBytes()); int affected = ps.executeUpdate(); if(affected > 1) throw new DbStateException(); ps.close(); // Return if the update is obsolete - if(t.getVersionNumber() <= version) return; + if(u.getVersionNumber() <= version) return; } // Delete the existing properties, if any sql = "DELETE FROM contactTransportProperties" + " WHERE contactId = ? AND transportId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setBytes(2, t.getId().getBytes()); + ps.setBytes(2, t.getBytes()); ps.executeUpdate(); ps.close(); // Store the new properties, if any - TransportProperties p = t.getProperties(); + TransportProperties p = u.getProperties(); if(p.isEmpty()) return; sql = "INSERT INTO contactTransportProperties" + " (contactId, transportId, key, value)" + " VALUES (?, ?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setBytes(2, t.getId().getBytes()); + ps.setBytes(2, t.getBytes()); for(Entry<String, String> e : p.entrySet()) { ps.setString(1, e.getKey()); ps.setString(2, e.getValue()); @@ -2568,8 +2674,8 @@ abstract class JdbcDatabase implements Database<Connection> { + " JOIN groupVisibilities AS gv" + " ON m.groupId = gv.groupId" + " AND cg.contactId = gv.contactId" - + " JOIN contacts AS c" - + " ON cg.contactId = c.contactId" + + " JOIN expiryVersions AS ev" + + " ON cg.contactId = ev.contactId" + " WHERE messageId = ?" + " AND cg.contactId = ?" + " AND timestamp >= expiry"; @@ -2600,7 +2706,7 @@ abstract class JdbcDatabase implements Database<Connection> { } public void setSubscriptions(Connection txn, ContactId c, - SubscriptionUpdate s) throws DbException { + SubscriptionUpdate u) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { @@ -2620,7 +2726,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " SET remoteVersion = ?, remoteAcked = FALSE" + " WHERE contactId = ?"; ps = txn.prepareStatement(sql); - ps.setLong(1, Math.max(version, s.getVersionNumber())); + ps.setLong(1, Math.max(version, u.getVersionNumber())); ps.setInt(2, c.getInt()); int affected = ps.executeUpdate(); if(affected > 1) throw new DbStateException(); @@ -2631,7 +2737,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(1, c.getInt()); ps.executeUpdate(); // Store the new subscriptions, if any - Collection<Group> subs = s.getGroups(); + Collection<Group> subs = u.getGroups(); if(subs.isEmpty()) return; sql = "INSERT INTO contactGroups (contactId, groupId, name, key)" + " VALUES (?, ?, ?, ?)"; @@ -2664,11 +2770,13 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; try { String sql = "UPDATE groupVersions SET localAcked = ?" - + " WHERE contactId = ? AND localAcked < ?"; + + " WHERE contactId = ?" + + " AND localAcked < ? AND localVersion >= ?"; ps = txn.prepareStatement(sql); ps.setLong(1, version); ps.setInt(2, c.getInt()); ps.setLong(3, version); + ps.setLong(4, version); int affected = ps.executeUpdate(); if(affected > 1) throw new DbStateException(); ps.close(); @@ -2684,12 +2792,13 @@ abstract class JdbcDatabase implements Database<Connection> { try { String sql = "UPDATE transportVersions SET localAcked = ?" + " WHERE contactId = ? AND transportId = ?" - + " AND localAcked < ?"; + + " AND localAcked < ? AND localVersion >= ?"; ps = txn.prepareStatement(sql); ps.setLong(1, version); ps.setInt(2, c.getInt()); ps.setBytes(3, t.getBytes()); ps.setLong(4, version); + ps.setLong(5, version); int affected = ps.executeUpdate(); if(affected > 1) throw new DbStateException(); ps.close(); diff --git a/briar-core/src/net/sf/briar/protocol/ProtocolWriterImpl.java b/briar-core/src/net/sf/briar/protocol/ProtocolWriterImpl.java index d5ed0e1f381aba9383182e1ba9f911b8bff13cab..e345aa478f2a057ad4dc487fc823bcd33d330cbf 100644 --- a/briar-core/src/net/sf/briar/protocol/ProtocolWriterImpl.java +++ b/briar-core/src/net/sf/briar/protocol/ProtocolWriterImpl.java @@ -126,11 +126,11 @@ class ProtocolWriterImpl implements ProtocolWriter { if(flush) out.flush(); } - public void writeSubscriptionUpdate(SubscriptionUpdate s) + public void writeSubscriptionUpdate(SubscriptionUpdate u) throws IOException { w.writeStructId(SUBSCRIPTION_UPDATE); w.writeListStart(); - for(Group g : s.getGroups()) { + for(Group g : u.getGroups()) { w.writeStructId(GROUP); w.writeString(g.getName()); byte[] publicKey = g.getPublicKey(); @@ -138,7 +138,7 @@ class ProtocolWriterImpl implements ProtocolWriter { else w.writeBytes(publicKey); } w.writeListEnd(); - w.writeInt64(s.getVersionNumber()); + w.writeInt64(u.getVersionNumber()); if(flush) out.flush(); } @@ -149,11 +149,11 @@ class ProtocolWriterImpl implements ProtocolWriter { if(flush) out.flush(); } - public void writeTransportUpdate(TransportUpdate t) throws IOException { + public void writeTransportUpdate(TransportUpdate u) throws IOException { w.writeStructId(TRANSPORT_UPDATE); - w.writeBytes(t.getId().getBytes()); - w.writeMap(t.getProperties()); - w.writeInt64(t.getVersionNumber()); + w.writeBytes(u.getId().getBytes()); + w.writeMap(u.getProperties()); + w.writeInt64(u.getVersionNumber()); if(flush) out.flush(); } diff --git a/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java b/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java index 9321750e4c23da193d09c513184f22c368dc2241..ab31510cf7037245763eccc3466947a1be913ff0 100644 --- a/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java @@ -173,11 +173,11 @@ abstract class DuplexConnection implements DatabaseListener { // Start sending the requested messages dbExecutor.execute(new GenerateBatches(requested)); } else if(reader.hasSubscriptionUpdate()) { - SubscriptionUpdate s = reader.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + SubscriptionUpdate u = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); } else if(reader.hasTransportUpdate()) { - TransportUpdate t = reader.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate(t)); + TransportUpdate u = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(u)); } else { throw new FormatException(); } @@ -524,8 +524,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { - SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) writerTasks.add(new WriteSubscriptionUpdate(s)); + SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); + if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -578,7 +578,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { assert writer != null; try { - for(TransportUpdate t : updates) writer.writeTransportUpdate(t); + for(TransportUpdate u : updates) writer.writeTransportUpdate(u); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); diff --git a/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java b/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java index e9abf0c16d718a03138053ca3fa9929c9cd909d4..b876c578c4f0c6cc505032e1c8b424c7a6089694 100644 --- a/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java @@ -83,11 +83,11 @@ class IncomingSimplexConnection { UnverifiedMessage m = reader.readMessage(); verificationExecutor.execute(new VerifyMessage(m)); } else if(reader.hasSubscriptionUpdate()) { - SubscriptionUpdate s = reader.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + SubscriptionUpdate u = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); } else if(reader.hasTransportUpdate()) { - TransportUpdate t = reader.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate(t)); + TransportUpdate u = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(u)); } else { throw new FormatException(); } diff --git a/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java b/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java index ef9c7c93246da943cd92301515dc08adfb94153e..cccfa22dd76a9baba5949901797b4b05113c09b6 100644 --- a/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java @@ -71,11 +71,11 @@ class OutgoingSimplexConnection { Collection<TransportUpdate> updates = db.generateTransportUpdates(contactId); if(updates != null) { - for(TransportUpdate t : updates) writer.writeTransportUpdate(t); + for(TransportUpdate u : updates) writer.writeTransportUpdate(u); } // Write a subscription update. FIXME: Check for space - SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) writer.writeSubscriptionUpdate(s); + SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); + if(u != null) writer.writeSubscriptionUpdate(u); // Write acks until you can't write acks no more capacity = conn.getRemainingCapacity(); int maxMessages = writer.getMaxMessagesForAck(capacity);