diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index bd7075d4a3830e46659c1e404cb8b5c75edc80a8..581b4f5d8d926977e4a8216aa01dbc3c90f75a93 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -51,7 +51,8 @@ public interface DatabaseComponent { * Generates a bundle of acknowledgements, subscriptions, and batches of * messages for the given contact. */ - void generateBundle(ContactId c, BundleWriter bundleBuilder) throws DbException, IOException, GeneralSecurityException; + void generateBundle(ContactId c, BundleWriter bundleBuilder) + throws DbException, IOException, GeneralSecurityException; /** Returns the IDs of all contacts. */ Set<ContactId> getContacts() throws DbException; @@ -73,7 +74,8 @@ public interface DatabaseComponent { * messages received from the given contact. Some or all of the messages * in the bundle may be stored. */ - void receiveBundle(ContactId c, BundleReader b) throws DbException, IOException, GeneralSecurityException; + void receiveBundle(ContactId c, BundleReader b) throws DbException, + IOException, GeneralSecurityException; /** Removes a contact (and all associated state) from the database. */ void removeContact(ContactId c) throws DbException; @@ -81,12 +83,6 @@ public interface DatabaseComponent { /** Records the user's rating for the given author. */ void setRating(AuthorId a, Rating r) throws DbException; - /** - * Records the transport details for the given contact, replacing any - * existing transport details. - */ - void setTransports(ContactId c, Map<String, String> transports) throws DbException; - /** Subscribes to the given group. */ void subscribe(GroupId g) throws DbException; diff --git a/api/net/sf/briar/api/protocol/MessageParser.java b/api/net/sf/briar/api/protocol/MessageParser.java index 756ed84d3c7cadab27ff3e041c3568e936417b23..cd6ea58a85ebfc51a3d07b43bc47e21bd9c3353a 100644 --- a/api/net/sf/briar/api/protocol/MessageParser.java +++ b/api/net/sf/briar/api/protocol/MessageParser.java @@ -5,5 +5,6 @@ import java.security.GeneralSecurityException; public interface MessageParser { - Message parseMessage(byte[] raw) throws IOException, GeneralSecurityException; + Message parseMessage(byte[] raw) throws IOException, + GeneralSecurityException; } diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index d3bd03cf65240873397cc9c683b32444dffb8786..3357b4b8ac4e00514c26a757ebb254afd2f08aee 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -81,7 +81,8 @@ interface Database<T> { * <p> * Locking: contacts write, transports write. */ - ContactId addContact(T txn, Map<String, String> transports) throws DbException; + ContactId addContact(T txn, Map<String, String> transports) + throws DbException; /** * Returns false if the given message is already in the database. Otherwise @@ -96,7 +97,8 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses write. */ - void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent) throws DbException; + void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent) + throws DbException; /** * Subscribes to the given group. @@ -169,7 +171,8 @@ interface Database<T> { * <p> * Locking: messages read. */ - Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a) throws DbException; + Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a) + throws DbException; /** * Returns the number of children of the message identified by the given @@ -216,7 +219,8 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses read. */ - Iterable<MessageId> getSendableMessages(T txn, ContactId c, long capacity) throws DbException; + Iterable<MessageId> getSendableMessages(T txn, ContactId c, long capacity) + throws DbException; /** * Returns the groups to which the user subscribes. @@ -225,6 +229,13 @@ interface Database<T> { */ Set<GroupId> getSubscriptions(T txn) throws DbException; + /** + * Returns the groups to which the given contact subscribes. + * <p> + * Locking: contacts read, subscriptions read. + */ + Set<GroupId> getSubscriptions(T txn, ContactId c) throws DbException; + /** * Returns the local transport details. * <p> @@ -308,15 +319,17 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses write. */ - void setStatus(T txn, ContactId c, MessageId m, Status s) throws DbException; + void setStatus(T txn, ContactId c, MessageId m, Status s) + throws DbException; /** * Sets the subscriptions for the given contact, replacing any existing - * subscriptions. + * subscriptions unless the existing subscriptions have a newer timestamp. * <p> - * Locking: contacts read, subscriptions write. + * Locking: contacts write, subscriptions write. */ - void setSubscriptions(T txn, ContactId c, Set<GroupId> subs) throws DbException; + void setSubscriptions(T txn, ContactId c, Set<GroupId> subs, long timestamp) + throws DbException; /** * Sets the local transport details, replacing any existing transport @@ -324,13 +337,15 @@ interface Database<T> { * <p> * Locking: transports write. */ - void setTransports(T txn, Map<String, String> transports) throws DbException; + void setTransports(T txn, Map<String, String> transports) + throws DbException; /** * Sets the transport details for the given contact, replacing any existing - * transport details. + * transport details unless the existing details have a newer timestamp. * <p> - * Locking: contacts read, transports write. + * Locking: contacts write, transports write. */ - void setTransports(T txn, ContactId c, Map<String, String> transports) throws DbException; + void setTransports(T txn, ContactId c, Map<String, String> transports, + long timestamp) throws DbException; } diff --git a/components/net/sf/briar/db/DatabaseModule.java b/components/net/sf/briar/db/DatabaseModule.java index 1150323bec516eac4b7d8aa371629a94b3754628..75afcd0c6c5812f5a504930893096c55e59b6a7c 100644 --- a/components/net/sf/briar/db/DatabaseModule.java +++ b/components/net/sf/briar/db/DatabaseModule.java @@ -12,8 +12,10 @@ public class DatabaseModule extends AbstractModule { @Override protected void configure() { bind(Database.class).to(H2Database.class); - bind(DatabaseComponent.class).to(ReadWriteLockDatabaseComponent.class).in(Singleton.class); - bind(Password.class).annotatedWith(DatabasePassword.class).toInstance(new Password() { + bind(DatabaseComponent.class).to( + ReadWriteLockDatabaseComponent.class).in(Singleton.class); + bind(Password.class).annotatedWith(DatabasePassword.class).toInstance( + new Password() { public char[] getPassword() { return "fixme fixme".toCharArray(); } diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index 8e2fbe69ee441a1e47c7421f25a467aa57a05ee8..4e648b351874d3c1355a1f7230b8ce72d7cdbc05 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -38,15 +38,15 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_LOCAL_SUBSCRIPTIONS = "CREATE TABLE localSubscriptions" - + " (groupId XXXX NOT NULL," + + " (groupId HASH NOT NULL," + " PRIMARY KEY (groupId))"; private static final String CREATE_MESSAGES = "CREATE TABLE messages" - + " (messageId XXXX NOT NULL," - + " parentId XXXX NOT NULL," - + " groupId XXXX NOT NULL," - + " authorId XXXX NOT NULL," + + " (messageId HASH NOT NULL," + + " parentId HASH NOT NULL," + + " groupId HASH NOT NULL," + + " authorId HASH NOT NULL," + " timestamp BIGINT NOT NULL," + " size INT NOT NULL," + " raw BLOB NOT NULL," @@ -70,11 +70,13 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_CONTACTS = "CREATE TABLE contacts" + " (contactId INT NOT NULL," + + " subscriptionsTimestamp TIMESTAMP NOT NULL," + + " transportsTimestamp TIMESTAMP NOT NULL," + " PRIMARY KEY (contactId))"; private static final String CREATE_BATCHES_TO_ACK = "CREATE TABLE batchesToAck" - + " (batchId XXXX NOT NULL," + + " (batchId HASH NOT NULL," + " contactId INT NOT NULL," + " PRIMARY KEY (batchId)," + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" @@ -83,16 +85,16 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_CONTACT_SUBSCRIPTIONS = "CREATE TABLE contactSubscriptions" + " (contactId INT NOT NULL," - + " groupId XXXX NOT NULL," + + " groupId HASH NOT NULL," + " PRIMARY KEY (contactId, groupId)," + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; private static final String CREATE_OUTSTANDING_BATCHES = "CREATE TABLE outstandingBatches" - + " (batchId XXXX NOT NULL," + + " (batchId HASH NOT NULL," + " contactId INT NOT NULL," - + " timestamp YYYY NOT NULL," + + " timestamp TIMESTAMP NOT NULL," + " passover INT NOT NULL," + " PRIMARY KEY (batchId)," + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" @@ -100,9 +102,9 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_OUTSTANDING_MESSAGES = "CREATE TABLE outstandingMessages" - + " (batchId XXXX NOT NULL," + + " (batchId HASH NOT NULL," + " contactId INT NOT NULL," - + " messageId XXXX NOT NULL," + + " messageId HASH NOT NULL," + " PRIMARY KEY (batchId, messageId)," + " FOREIGN KEY (batchId) REFERENCES outstandingBatches (batchId)" + " ON DELETE CASCADE," @@ -117,13 +119,13 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_RATINGS = "CREATE TABLE ratings" - + " (authorId XXXX NOT NULL," + + " (authorId HASH NOT NULL," + " rating SMALLINT NOT NULL," + " PRIMARY KEY (authorId))"; private static final String CREATE_STATUSES = "CREATE TABLE statuses" - + " (messageId XXXX NOT NULL," + + " (messageId HASH NOT NULL," + " contactId INT NOT NULL," + " status SMALLINT NOT NULL," + " PRIMARY KEY (messageId, contactId)," @@ -157,7 +159,7 @@ abstract class JdbcDatabase implements Database<Connection> { Logger.getLogger(JdbcDatabase.class.getName()); // Different database libraries use different names for certain types - private final String hashType, bigIntType; + private final String hashType, timestampType; private final LinkedList<Connection> connections = new LinkedList<Connection>(); // Locking: self @@ -166,9 +168,9 @@ abstract class JdbcDatabase implements Database<Connection> { protected abstract Connection createConnection() throws SQLException; - JdbcDatabase(String hashType, String bigIntType) { + JdbcDatabase(String hashType, String timestampType) { this.hashType = hashType; - this.bigIntType = bigIntType; + this.timestampType = timestampType; } protected void open(boolean resume, File dir, String driverClass) @@ -254,7 +256,8 @@ abstract class JdbcDatabase implements Database<Connection> { } private String insertTypeNames(String s) { - return s.replaceAll("XXXX", hashType).replaceAll("YYYY", bigIntType); + s = s.replaceAll("HASH", hashType); + return s.replaceAll("TIMESTAMP", timestampType); } private void tryToClose(Connection c) { @@ -388,9 +391,13 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); // Create a new contact row - sql = "INSERT INTO contacts (contactId) VALUES (?)"; + sql = "INSERT INTO contacts" + + " (contactId, subscriptionsTimestamp, transportsTimestamp)" + + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); + ps.setLong(2, 0L); + ps.setLong(3, 0L); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -955,6 +962,29 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public Set<GroupId> getSubscriptions(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT groupId FROM contactSubscriptions" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + rs = ps.executeQuery(); + Set<GroupId> ids = new HashSet<GroupId>(); + while(rs.next()) ids.add(new GroupId(rs.getBytes(1))); + rs.close(); + ps.close(); + return ids; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public Map<String, String> getTransports(Connection txn) throws DbException { PreparedStatement ps = null; @@ -1279,12 +1309,27 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setSubscriptions(Connection txn, ContactId c, Set<GroupId> subs) - throws DbException { + public void setSubscriptions(Connection txn, ContactId c, Set<GroupId> subs, + long timestamp) throws DbException { PreparedStatement ps = null; + ResultSet rs = null; try { + // Return if the timestamp isn't fresh + String sql = "SELECT subscriptionsTimestamp FROM contacts" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + rs = ps.executeQuery(); + boolean found = rs.next(); + assert found; + long lastTimestamp = rs.getLong(1); + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + if(lastTimestamp >= timestamp) return; // Delete any existing subscriptions - String sql = "DELETE FROM contactSubscriptions WHERE contactId = ?"; + sql = "DELETE FROM contactSubscriptions WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.executeUpdate(); @@ -1304,6 +1349,15 @@ abstract class JdbcDatabase implements Database<Connection> { assert rowsAffectedArray[i] == 1; } ps.close(); + // Update the timestamp + sql = "UPDATE contacts SET subscriptionsTimestamp = ?" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, timestamp); + ps.setInt(2, c.getInt()); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected == 1; + ps.close(); } catch(SQLException e) { tryToClose(ps); tryToClose(txn); @@ -1345,11 +1399,26 @@ abstract class JdbcDatabase implements Database<Connection> { } public void setTransports(Connection txn, ContactId c, - Map<String, String> transports) throws DbException { + Map<String, String> transports, long timestamp) throws DbException { PreparedStatement ps = null; + ResultSet rs = null; try { + // Return if the timestamp isn't fresh + String sql = "SELECT transportsTimestamp FROM contacts" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + rs = ps.executeQuery(); + boolean found = rs.next(); + assert found; + long lastTimestamp = rs.getLong(1); + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + if(lastTimestamp >= timestamp) return; // Delete any existing transports - String sql = "DELETE FROM contactTransports WHERE contactId = ?"; + sql = "DELETE FROM contactTransports WHERE contactId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); ps.executeUpdate(); @@ -1372,6 +1441,15 @@ abstract class JdbcDatabase implements Database<Connection> { } ps.close(); } + // Update the timestamp + sql = "UPDATE contacts SET transportsTimestamp = ?" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setLong(1, timestamp); + ps.setInt(2, c.getInt()); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected == 1; + ps.close(); } catch(SQLException e) { tryToClose(ps); tryToClose(txn); diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index aac17a06a768d95ebffabe59271b21259683add7..0aac5a186c03021039a4f32fa5b4ce7f47a33563 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -2,6 +2,7 @@ package net.sf.briar.db; import java.io.IOException; import java.security.GeneralSecurityException; +import java.security.SignatureException; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -192,10 +193,19 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void generateBundle(ContactId c, BundleWriter b) throws DbException, IOException, GeneralSecurityException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); - Set<BatchId> acks; - Set<GroupId> subs; - Map<String, String> transports; - // Add acks + Set<BatchId> acks = generateAcks(c); + Set<GroupId> subs = generateSubscriptions(c); + Map<String, String> transports = generateTransports(c); + // Add the header to the bundle + b.addHeader(acks, subs, transports); + // Add as many messages as possible to the bundle + while(generateBatch(c, b)); + b.finish(); + if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); + System.gc(); + } + + private Set<BatchId> generateAcks(ContactId c) throws DbException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); @@ -203,10 +213,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { Txn txn = db.startTransaction(); try { - acks = db.removeBatchesToAck(txn, c); + Set<BatchId> acks = db.removeBatchesToAck(txn, c); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + acks.size() + " acks"); db.commitTransaction(txn); + return acks; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -217,7 +228,9 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } finally { contactLock.readLock().unlock(); } - // Add subscriptions + } + + private Set<GroupId> generateSubscriptions(ContactId c) throws DbException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); @@ -225,10 +238,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { Txn txn = db.startTransaction(); try { - subs = db.getSubscriptions(txn); + Set<GroupId> subs = db.getSubscriptions(txn); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + subs.size() + " subscriptions"); db.commitTransaction(txn); + return subs; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -239,7 +253,10 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } finally { contactLock.readLock().unlock(); } - // Add transport details + } + + private Map<String, String> generateTransports(ContactId c) + throws DbException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); @@ -247,10 +264,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { Txn txn = db.startTransaction(); try { - transports = db.getTransports(txn); + Map<String, String> transports = db.getTransports(txn); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + transports.size() + " transports"); db.commitTransaction(txn); + return transports; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -261,17 +279,10 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } finally { contactLock.readLock().unlock(); } - // Add the header to the bundle - b.addHeader(acks, subs, transports); - // Add as many messages as possible to the bundle - while(fillBatch(c, b)); - b.finish(); - if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); - System.gc(); } - private boolean fillBatch(ContactId c, BundleWriter b) throws DbException, - IOException, GeneralSecurityException { + private boolean generateBatch(ContactId c, BundleWriter b) + throws DbException, IOException, GeneralSecurityException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); @@ -309,7 +320,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } catch(IOException e) { db.abortTransaction(txn); throw e; - } catch(GeneralSecurityException e) { + } catch(SignatureException e) { db.abortTransaction(txn); throw e; } @@ -435,12 +446,29 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void receiveBundle(ContactId c, BundleReader b) throws DbException, IOException, GeneralSecurityException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c); - Header h; + Header h = b.getHeader(); + receiveAcks(c, h); + receiveSubscriptions(c, h); + receiveTransports(c, h); + // Store the messages + int batches = 0; + Batch batch = null; + while((batch = b.getNextBatch()) != null) { + receiveBatch(c, batch); + batches++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + batches + " batches"); + b.finish(); + findLostBatches(c); + System.gc(); + } + + private void receiveAcks(ContactId c, Header h) throws DbException { // Mark all messages in acked batches as seen contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); - h = b.getHeader(); messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); @@ -467,8 +495,12 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } finally { contactLock.readLock().unlock(); } + } + + private void receiveSubscriptions(ContactId c, Header h) + throws DbException { // Update the contact's subscriptions - contactLock.readLock().lock(); + contactLock.writeLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); subscriptionLock.writeLock().lock(); @@ -476,7 +508,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Txn txn = db.startTransaction(); try { Set<GroupId> subs = h.getSubscriptions(); - db.setSubscriptions(txn, c, subs); + db.setSubscriptions(txn, c, subs, h.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + subs.size() + " subscriptions"); db.commitTransaction(txn); @@ -488,10 +520,13 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { subscriptionLock.writeLock().unlock(); } } finally { - contactLock.readLock().unlock(); + contactLock.writeLock().unlock(); } + } + + private void receiveTransports(ContactId c, Header h) throws DbException { // Update the contact's transport details - contactLock.readLock().lock(); + contactLock.writeLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); transportLock.writeLock().lock(); @@ -499,7 +534,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Txn txn = db.startTransaction(); try { Map<String, String> transports = h.getTransports(); - db.setTransports(txn, c, transports); + db.setTransports(txn, c, transports, h.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + transports.size() + " transports"); @@ -512,23 +547,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { transportLock.writeLock().unlock(); } } finally { - contactLock.readLock().unlock(); - } - // Store the messages - int batches = 0; - Batch batch = null; - while((batch = b.getNextBatch()) != null) { - storeBatch(c, batch); - batches++; + contactLock.writeLock().unlock(); } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - b.finish(); - findLostBatches(c); - System.gc(); } - private void storeBatch(ContactId c, Batch b) throws DbException { + private void receiveBatch(ContactId c, Batch b) throws DbException { waitForPermissionToWrite(); contactLock.readLock().lock(); try { @@ -687,29 +710,6 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public void setTransports(ContactId c, Map<String, String> transports) - throws DbException { - contactLock.readLock().lock(); - try { - if(!containsContact(c)) throw new NoSuchContactException(); - transportLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - db.setTransports(txn, c, transports); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - transportLock.writeLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - public void subscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); subscriptionLock.writeLock().lock(); diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 4e0b05289e9fad92740f9017f37645ecef6e8892..93fffc9c1354cddf3032f897cf5984b1fcce8d43 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -146,68 +146,78 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void generateBundle(ContactId c, BundleWriter b) throws DbException, IOException, GeneralSecurityException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); - Set<BatchId> acks; - Set<GroupId> subs; - Map<String, String> transports; - // Add acks + Set<BatchId> acks = generateAcks(c); + Set<GroupId> subs = generateSubscriptions(c); + Map<String, String> transports = generateTransports(c); + // Add the header to the bundle + b.addHeader(acks, subs, transports); + // Add as many messages as possible to the bundle + while(generateBatch(c, b)); + b.finish(); + if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); + System.gc(); + } + + private Set<BatchId> generateAcks(ContactId c) throws DbException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - acks = db.removeBatchesToAck(txn, c); + Set<BatchId> acks = db.removeBatchesToAck(txn, c); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + acks.size() + " acks"); db.commitTransaction(txn); + return acks; } catch(DbException e) { db.abortTransaction(txn); throw e; } } } - // Add subscriptions + } + + private Set<GroupId> generateSubscriptions(ContactId c) throws DbException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(subscriptionLock) { Txn txn = db.startTransaction(); try { - subs = db.getSubscriptions(txn); + Set<GroupId> subs = db.getSubscriptions(txn); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + subs.size() + " subscriptions"); db.commitTransaction(txn); + return subs; } catch(DbException e) { db.abortTransaction(txn); throw e; } } } - // Add transport details + } + + private Map<String, String> generateTransports(ContactId c) + throws DbException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(transportLock) { Txn txn = db.startTransaction(); try { - transports = db.getTransports(txn); + Map<String, String> transports = db.getTransports(txn); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + transports.size() + " transports"); db.commitTransaction(txn); + return transports; } catch(DbException e) { db.abortTransaction(txn); throw e; } } } - // Add the header to the bundle - b.addHeader(acks, subs, transports); - // Add as many messages as possible to the bundle - while(fillBatch(c, b)); - b.finish(); - if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); - System.gc(); } - private boolean fillBatch(ContactId c, BundleWriter b) throws DbException, - IOException, GeneralSecurityException { + private boolean generateBatch(ContactId c, BundleWriter b) + throws DbException, IOException, GeneralSecurityException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(messageLock) { @@ -242,6 +252,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } catch(DbException e) { db.abortTransaction(txn); throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; } catch(SignatureException e) { db.abortTransaction(txn); throw e; @@ -327,11 +340,28 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void receiveBundle(ContactId c, BundleReader b) throws DbException, IOException, GeneralSecurityException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c); - Header h; + Header h = b.getHeader(); + receiveAcks(c, h); + receiveSubscriptions(c, h); + receiveTransports(c, h); + // Store the messages + int batches = 0; + Batch batch = null; + while((batch = b.getNextBatch()) != null) { + receiveBatch(c, batch); + batches++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + batches + " batches"); + b.finish(); + findLostBatches(c); + System.gc(); + } + + private void receiveAcks(ContactId c, Header h) throws DbException { // Mark all messages in acked batches as seen synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); - h = b.getHeader(); synchronized(messageLock) { synchronized(messageStatusLock) { Set<BatchId> acks = h.getAcks(); @@ -350,6 +380,10 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } } + } + + private void receiveSubscriptions(ContactId c, Header h) + throws DbException { // Update the contact's subscriptions synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); @@ -357,7 +391,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Txn txn = db.startTransaction(); try { Set<GroupId> subs = h.getSubscriptions(); - db.setSubscriptions(txn, c, subs); + db.setSubscriptions(txn, c, subs, h.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + subs.size() + " subscriptions"); db.commitTransaction(txn); @@ -367,6 +401,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } } + } + + private void receiveTransports(ContactId c, Header h) throws DbException { // Update the contact's transport details synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); @@ -374,7 +411,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Txn txn = db.startTransaction(); try { Map<String, String> transports = h.getTransports(); - db.setTransports(txn, c, transports); + db.setTransports(txn, c, transports, h.getTimestamp()); if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + transports.size() + " transports"); @@ -385,21 +422,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } } - // Store the messages - int batches = 0; - Batch batch = null; - while((batch = b.getNextBatch()) != null) { - storeBatch(c, batch); - batches++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - b.finish(); - findLostBatches(c); - System.gc(); } - private void storeBatch(ContactId c, Batch b) throws DbException { + private void receiveBatch(ContactId c, Batch b) throws DbException { waitForPermissionToWrite(); synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); @@ -511,23 +536,6 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public void setTransports(ContactId c, Map<String, String> transports) - throws DbException { - synchronized(contactLock) { - if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(transportLock) { - Txn txn = db.startTransaction(); - try { - db.setTransports(txn, c, transports); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - public void subscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); synchronized(subscriptionLock) { diff --git a/components/net/sf/briar/invitation/InvitationModule.java b/components/net/sf/briar/invitation/InvitationModule.java index 021aee31de9a4789d5dbb0cb50ccf1c4eba015e4..9124ec28dd904807da6176e902ecf2fc0795e28d 100644 --- a/components/net/sf/briar/invitation/InvitationModule.java +++ b/components/net/sf/briar/invitation/InvitationModule.java @@ -8,6 +8,7 @@ public class InvitationModule extends AbstractModule { @Override protected void configure() { - bind(InvitationWorkerFactory.class).to(InvitationWorkerFactoryImpl.class); + bind(InvitationWorkerFactory.class).to( + InvitationWorkerFactoryImpl.class); } } diff --git a/test/net/sf/briar/db/DatabaseComponentTest.java b/test/net/sf/briar/db/DatabaseComponentTest.java index 15d8e5596fe2b4bfcac374899b4580353a1b389f..0ae68d759a3c1a04e49da2aea2ca77a3e27e0b96 100644 --- a/test/net/sf/briar/db/DatabaseComponentTest.java +++ b/test/net/sf/briar/db/DatabaseComponentTest.java @@ -71,8 +71,6 @@ public abstract class DatabaseComponentTest extends TestCase { @Test public void testSimpleCalls() throws DbException { - final Map<String, String> transports1 = - Collections.singletonMap("foo", "bar baz"); Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); @@ -98,10 +96,6 @@ public abstract class DatabaseComponentTest extends TestCase { will(returnValue(true)); oneOf(database).getTransports(txn, contactId); will(returnValue(transports)); - // setTransports(contactId, transports1) - oneOf(database).containsContact(txn, contactId); - will(returnValue(true)); - oneOf(database).setTransports(txn, contactId, transports1); // subscribe(groupId) oneOf(database).addSubscription(txn, groupId); // getSubscriptions() @@ -122,7 +116,6 @@ public abstract class DatabaseComponentTest extends TestCase { assertEquals(contactId, db.addContact(transports)); assertEquals(contacts, db.getContacts()); assertEquals(transports, db.getTransports(contactId)); - db.setTransports(contactId, transports1); db.subscribe(groupId); assertEquals(subs, db.getSubscriptions()); db.unsubscribe(groupId); @@ -509,7 +502,10 @@ public abstract class DatabaseComponentTest extends TestCase { final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); final BundleReader bundleReader = context.mock(BundleReader.class); + final Header header = context.mock(Header.class); context.checking(new Expectations() {{ + oneOf(bundleReader).getHeader(); + will(returnValue(header)); // Check that the contact is still in the DB oneOf(database).startTransaction(); will(returnValue(txn)); @@ -552,11 +548,16 @@ public abstract class DatabaseComponentTest extends TestCase { // Subscriptions oneOf(header).getSubscriptions(); will(returnValue(subs)); - oneOf(database).setSubscriptions(txn, contactId, subs); + oneOf(header).getTimestamp(); + will(returnValue(timestamp)); + oneOf(database).setSubscriptions(txn, contactId, subs, timestamp); // Transports oneOf(header).getTransports(); will(returnValue(transports)); - oneOf(database).setTransports(txn, contactId, transports); + oneOf(header).getTimestamp(); + will(returnValue(timestamp)); + oneOf(database).setTransports(txn, contactId, transports, + timestamp); // Batches oneOf(bundleReader).getNextBatch(); will(returnValue(batch)); diff --git a/test/net/sf/briar/db/H2DatabaseTest.java b/test/net/sf/briar/db/H2DatabaseTest.java index 337e542a5b85255f1e0030cab74957c4ab8a459e..a46755d92b24eaa680b2b21082a01b6844ff8ba4 100644 --- a/test/net/sf/briar/db/H2DatabaseTest.java +++ b/test/net/sf/briar/db/H2DatabaseTest.java @@ -195,7 +195,7 @@ public class H2DatabaseTest extends TestCase { Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setStatus(txn, contactId, messageId, Status.NEW); db.commitTransaction(txn); @@ -234,7 +234,7 @@ public class H2DatabaseTest extends TestCase { Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.commitTransaction(txn); @@ -293,7 +293,7 @@ public class H2DatabaseTest extends TestCase { // The contact subscribing should make the message sendable txn = db.startTransaction(); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); @@ -301,7 +301,7 @@ public class H2DatabaseTest extends TestCase { // The contact unsubscribing should make the message unsendable txn = db.startTransaction(); - db.setSubscriptions(txn, contactId, Collections.<GroupId>emptySet()); + db.setSubscriptions(txn, contactId, Collections.<GroupId>emptySet(), 2); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); db.commitTransaction(txn); @@ -317,7 +317,7 @@ public class H2DatabaseTest extends TestCase { Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); @@ -377,7 +377,7 @@ public class H2DatabaseTest extends TestCase { Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); @@ -417,7 +417,7 @@ public class H2DatabaseTest extends TestCase { Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); - db.setSubscriptions(txn, contactId, Collections.singleton(groupId)); + db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); @@ -740,10 +740,10 @@ public class H2DatabaseTest extends TestCase { transports = new TreeMap<String, String>(); transports.put("foo", "bar baz"); transports.put("bar", "baz quux"); - db.setTransports(txn, contactId, transports); + db.setTransports(txn, contactId, transports, 1); assertEquals(transports, db.getTransports(txn, contactId)); // Remove the transport details - db.setTransports(txn, contactId, null); + db.setTransports(txn, contactId, null, 2); assertEquals(Collections.emptyMap(), db.getTransports(txn, contactId)); // Set the local transport details db.setTransports(txn, transports); @@ -756,6 +756,85 @@ public class H2DatabaseTest extends TestCase { db.close(); } + @Test + public void testTransportsNotUpdatedIfTimestampIsOld() throws DbException { + Database<Connection> db = open(false); + + // Add a contact with some transport details + Connection txn = db.startTransaction(); + Map<String, String> transports = Collections.singletonMap("foo", "bar"); + assertEquals(contactId, db.addContact(txn, transports)); + assertEquals(transports, db.getTransports(txn, contactId)); + // Replace the transport details using a timestamp of 2 + Map<String, String> transports1 = new TreeMap<String, String>(); + transports1.put("foo", "bar baz"); + transports1.put("bar", "baz quux"); + db.setTransports(txn, contactId, transports1, 2); + assertEquals(transports1, db.getTransports(txn, contactId)); + // Try to replace the transport details using a timestamp of 1 + Map<String, String> transports2 = new TreeMap<String, String>(); + transports2.put("bar", "baz"); + transports2.put("quux", "fnord"); + db.setTransports(txn, contactId, transports2, 1); + // The old transports should still be there + assertEquals(transports1, db.getTransports(txn, contactId)); + db.commitTransaction(txn); + + db.close(); + } + + @Test + public void testUpdateSubscriptions() throws DbException { + Database<Connection> db = open(false); + + // Add a contact + Connection txn = db.startTransaction(); + Map<String, String> transports = Collections.emptyMap(); + assertEquals(contactId, db.addContact(txn, transports)); + // Add some subscriptions + Set<GroupId> subs = new HashSet<GroupId>(); + subs.add(new GroupId(TestUtils.getRandomId())); + subs.add(new GroupId(TestUtils.getRandomId())); + db.setSubscriptions(txn, contactId, subs, 1); + assertEquals(subs, db.getSubscriptions(txn, contactId)); + // Update the subscriptions + Set<GroupId> subs1 = new HashSet<GroupId>(); + subs1.add(new GroupId(TestUtils.getRandomId())); + subs1.add(new GroupId(TestUtils.getRandomId())); + db.setSubscriptions(txn, contactId, subs1, 2); + assertEquals(subs1, db.getSubscriptions(txn, contactId)); + db.commitTransaction(txn); + + db.close(); + } + + @Test + public void testSubscriptionsNotUpdatedIfTimestampIsOld() + throws DbException { + Database<Connection> db = open(false); + + // Add a contact + Connection txn = db.startTransaction(); + Map<String, String> transports = Collections.emptyMap(); + assertEquals(contactId, db.addContact(txn, transports)); + // Add some subscriptions + Set<GroupId> subs = new HashSet<GroupId>(); + subs.add(new GroupId(TestUtils.getRandomId())); + subs.add(new GroupId(TestUtils.getRandomId())); + db.setSubscriptions(txn, contactId, subs, 2); + assertEquals(subs, db.getSubscriptions(txn, contactId)); + // Try to update the subscriptions using a timestamp of 1 + Set<GroupId> subs1 = new HashSet<GroupId>(); + subs1.add(new GroupId(TestUtils.getRandomId())); + subs1.add(new GroupId(TestUtils.getRandomId())); + db.setSubscriptions(txn, contactId, subs1, 1); + // The old subscriptions should still be there + assertEquals(subs, db.getSubscriptions(txn, contactId)); + db.commitTransaction(txn); + + db.close(); + } + private Database<Connection> open(boolean resume) throws DbException { final char[] passwordArray = passwordString.toCharArray(); Mockery context = new Mockery();