diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index 01f449af9cb35822b7303c714f739528a5de9bbd..b710127813cfdb973e1eccf74f6abf2f7f45d327 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -50,6 +50,9 @@ public interface DatabaseComponent { */ void receiveBundle(NeighbourId n, Bundle b) throws DbException; + /** Removes a neighbour (and all associated state) from the database. */ + void removeNeighbour(NeighbourId n) throws DbException; + /** Records the user's rating for the given author. */ void setRating(AuthorId a, Rating r) throws DbException; diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index 6d6f1bad44a1b19ea1e479e72181a272261b7277..b00fa2ee062146d0a3f077ab089a8dd085e6ad69 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -19,6 +19,16 @@ import net.sf.briar.api.protocol.MessageId; * obtained by calling startTransaction(). Every transaction must be * terminated by calling either abortTransaction() or commitTransaction(), * even if an exception is thrown. + * + * Locking is provided by the DatabaseComponent implementation. To prevent + * deadlock, locks must be acquired in the following order: + * <ul> + * <li> contacts + * <li> messages + * <li> messageStatuses + * <li> ratings + * <li> subscriptions + * </ul> */ interface Database<T> { @@ -49,63 +59,80 @@ interface Database<T> { /** * Records a received batch as needing to be acknowledged. - * Locking: neighbours write. + * <p> + * Locking: contacts read, messageStatuses write. */ void addBatchToAck(T txn, NeighbourId n, BatchId b) throws DbException; /** * Returns false if the given message is already in the database. Otherwise * stores the message and returns true. + * <p> * Locking: messages write. */ boolean addMessage(T txn, Message m) throws DbException; /** * Adds a new neighbour to the database. - * Locking: neighbours write. + * <p> + * Locking: contacts write, messageStatuses write. */ void addNeighbour(T txn, NeighbourId n) throws DbException; /** * Records a sent batch as needing to be acknowledged. - * Locking: neighbours write, messages read. + * <p> + * Locking: contacts read, messages read, messageStatuses write. */ void addOutstandingBatch(T txn, NeighbourId n, BatchId b, Set<MessageId> sent) throws DbException; /** * Records a received bundle. This should be called after processing the * bundle's contents, and may result in outstanding messages becoming - * eligible for retransmittion. - * Locking: neighbours write, messages read. + * eligible for retransmission. + * <p> + * Locking: contacts read, messages read, messageStatuses write. */ Set<BatchId> addReceivedBundle(T txn, NeighbourId n, BundleId b) throws DbException; /** * Subscribes to the given group. + * <p> * Locking: subscriptions write. */ void addSubscription(T txn, GroupId g) throws DbException; /** * Records a neighbour's subscription to a group. - * Locking: neighbours write. + * <p> + * Locking: contacts read, messageStatuses write. */ void addSubscription(T txn, NeighbourId n, GroupId g) throws DbException; /** * Removes all recorded subscriptions for the given neighbour. - * Locking: neighbours write. + * <p> + * Locking: contacts read, messageStatuses write. */ void clearSubscriptions(T txn, NeighbourId n) throws DbException; /** * Returns true iff the database contains the given message. + * <p> * Locking: messages read. */ boolean containsMessage(T txn, MessageId m) throws DbException; + /** + * Returns true iff the database contains the given neighbour. + * <p> + * Locking: contacts read. + */ + boolean containsNeighbour(T txn, NeighbourId n) throws DbException; + /** * Returns true iff the user is subscribed to the given group. + * <p> * Locking: subscriptions read. */ boolean containsSubscription(T txn, GroupId g) throws DbException; @@ -114,18 +141,21 @@ interface Database<T> { * Returns the amount of free storage space available to the database, in * bytes. This is based on the minimum of the space available on the device * where the database is stored and the database's configured size. + * <p> * Locking: messages read. */ long getFreeSpace() throws DbException; /** * Returns the message identified by the given ID. + * <p> * Locking: messages read. */ Message getMessage(T txn, MessageId m) throws DbException; /** * Returns the IDs of all messages signed by the given author. + * <p> * Locking: messages read. */ Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a) throws DbException; @@ -133,31 +163,36 @@ interface Database<T> { /** * Returns the IDs of all children of the message identified by the given * ID that are present in the database. + * <p> * Locking: messages read. */ Iterable<MessageId> getMessagesByParent(T txn, MessageId m) throws DbException; /** - * Returns the IDs of all neighbours - * Locking: neighbours read. + * Returns the IDs of all neighbours. + * <p> + * Locking: contacts read, messageStatuses read. */ Set<NeighbourId> getNeighbours(T txn) throws DbException; /** * Returns the IDs of the oldest messages in the database, with a total * size less than or equal to the given size. + * <p> * Locking: messages read. */ Iterable<MessageId> getOldMessages(T txn, long size) throws DbException; /** * Returns the parent of the given message. + * <p> * Locking: messages read. */ MessageId getParent(T txn, MessageId m) throws DbException; /** * Returns the user's rating for the given author. + * <p> * Locking: ratings read. */ Rating getRating(T txn, AuthorId a) throws DbException; @@ -166,6 +201,7 @@ interface Database<T> { * Returns the sendability score of the given message. Messages with * sendability scores greater than zero are eligible to be sent to * neighbours. + * <p> * Locking: messages read. */ int getSendability(T txn, MessageId m) throws DbException; @@ -173,12 +209,14 @@ interface Database<T> { /** * Returns the IDs of some messages that are eligible to be sent to the * given neighbour, with a total size less than or equal to the given size. - * Locking: neighbours read, messages read. + * <p> + * Locking: contacts read, messages read, messageStatuses read. */ Iterable<MessageId> getSendableMessages(T txn, NeighbourId n, long capacity) throws DbException; /** * Returns the groups to which the user subscribes. + * <p> * Locking: subscriptions read. */ Set<GroupId> getSubscriptions(T txn) throws DbException; @@ -187,14 +225,16 @@ interface Database<T> { * Removes an outstanding batch that has been acknowledged. Any messages in * the batch that are still considered outstanding (Status.SENT) with * respect to the given neighbour are now considered seen (Status.SEEN). - * Locking: neighbours write, messages read. + * <p> + * Locking: contacts read, messages read, messageStatuses write. */ void removeAckedBatch(T txn, NeighbourId n, BatchId b) throws DbException; /** * Removes and returns the IDs of any batches received from the given * neighbour that need to be acknowledged. - * Locking: neighbours write. + * <p> + * Locking: contacts read, messageStatuses write. */ Set<BatchId> removeBatchesToAck(T txn, NeighbourId n) throws DbException; @@ -202,38 +242,52 @@ interface Database<T> { * Removes an outstanding batch that has been lost. Any messages in the * batch that are still considered outstanding (Status.SENT) with respect * to the given neighbour are now considered unsent (Status.NEW). - * Locking: neighbours write, messages read. + * <p> + * Locking: contacts read, messages read, messageStatuses write. */ void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException; /** - * Removes a message from the database. - * Locking: neighbours write, messages write. + * Removes a message (and all associated state) from the database. + * <p> + * Locking: contacts read, messages write, messageStatuses write. */ void removeMessage(T txn, MessageId m) throws DbException; + /** + * Removes a neighbour (and all associated state) from the database. + * <p> + * Locking: contacts write, messageStatuses write. + */ + void removeNeighbour(T txn, NeighbourId n) throws DbException; + /** * Unsubscribes from the given group. Any messages belonging to the group * are deleted from the database. - * Locking: subscriptions write, neighbours write, messages write. + * <p> + * Locking: contacts read, subscriptions write, messages write, + * messageStatuses write. */ void removeSubscription(T txn, GroupId g) throws DbException; /** * Records the user's rating for the given author. + * <p> * Locking: ratings write. */ Rating setRating(T txn, AuthorId a, Rating r) throws DbException; /** * Records the sendability score of the given message. + * <p> * Locking: messages write. */ void setSendability(T txn, MessageId m, int sendability) throws DbException; /** - * Sets the status of the given message with respect to the given neighbour. - * Locking: neighbours write, messages read + * Sets the status of the given message with respect to the given neighbour. + * <p> + * Locking: contacts read, messages read, messageStatuses write. */ void setStatus(T txn, NeighbourId n, MessageId m, Status s) throws DbException; } diff --git a/components/net/sf/briar/db/DatabaseComponentImpl.java b/components/net/sf/briar/db/DatabaseComponentImpl.java index db7dcc9da5eded385fd74501eb3b600fb21a7341..638eecd3892b618a92b17ea5ecc1d41d1207a2c8 100644 --- a/components/net/sf/briar/db/DatabaseComponentImpl.java +++ b/components/net/sf/briar/db/DatabaseComponentImpl.java @@ -74,7 +74,20 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent { } } - // Locking: messages write, neighbours write + // Locking: contacts read + protected boolean containsNeighbour(NeighbourId n) throws DbException { + Txn txn = db.startTransaction(); + try { + boolean contains = db.containsNeighbour(txn, n); + db.commitTransaction(txn); + return contains; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + + // Locking: contacts read, messages write, messageStatuses write protected void removeMessage(Txn txn, MessageId id) throws DbException { Integer sendability = db.getSendability(txn, id); assert sendability != null; @@ -127,7 +140,7 @@ abstract class DatabaseComponentImpl<Txn> implements DatabaseComponent { new Thread(cleaner).start(); } - // Locking: messages write, neighbours write + // Locking: contacts read, messages write, messageStatuses write protected boolean storeMessage(Txn txn, Message m, NeighbourId sender) throws DbException { boolean added = db.addMessage(txn, m); diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index e17b60389ad00ba6d92ca0ba066ad0dbf0a66537..173d0ac7b44199e2b2a322f123c7745b1f9585dc 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -650,6 +650,33 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public boolean containsNeighbour(Connection txn, NeighbourId n) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT COUNT(neighbourId) FROM neighbours" + + " WHERE neighbourId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, n.getInt()); + rs = ps.executeQuery(); + boolean found = rs.next(); + assert found; + int count = rs.getInt(1); + assert count <= 1; + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + return count > 0; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public boolean containsSubscription(Connection txn, GroupId g) throws DbException { PreparedStatement ps = null; @@ -1070,6 +1097,23 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public void removeNeighbour(Connection txn, NeighbourId n) + throws DbException { + PreparedStatement ps = null; + try { + String sql = "DELETE FROM neighbours WHERE neighbourId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, n.getInt()); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected == 1; + ps.close(); + } catch(SQLException e) { + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public void removeSubscription(Connection txn, GroupId g) throws DbException { PreparedStatement ps = null; diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index 4e1680b539ed8fd35a5ed32312277d75cc8ee94a..dc75fe9e00052ba4177615797355c0ba84810b13 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -32,9 +32,11 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { * implementation can allow writers to starve. */ + private final ReentrantReadWriteLock contactLock = + new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock messageLock = new ReentrantReadWriteLock(true); - private final ReentrantReadWriteLock neighbourLock = + private final ReentrantReadWriteLock messageStatusLock = new ReentrantReadWriteLock(true); private final ReentrantReadWriteLock ratingLock = new ReentrantReadWriteLock(true); @@ -48,99 +50,119 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } protected void expireMessages(long size) throws DbException { - messageLock.writeLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + messageLock.writeLock().lock(); try { - Txn txn = db.startTransaction(); + messageStatusLock.writeLock().lock(); try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + contactLock.readLock().unlock(); } } public void close() throws DbException { - messageLock.writeLock().lock(); + contactLock.writeLock().lock(); try { - neighbourLock.writeLock().lock(); + messageLock.writeLock().lock(); try { - ratingLock.writeLock().lock(); + messageStatusLock.writeLock().lock(); try { - subscriptionLock.writeLock().lock(); + ratingLock.writeLock().lock(); try { - db.close(); + subscriptionLock.writeLock().lock(); + try { + db.close(); + } finally { + subscriptionLock.writeLock().unlock(); + } } finally { - subscriptionLock.writeLock().unlock(); + ratingLock.writeLock().unlock(); } } finally { - ratingLock.writeLock().unlock(); + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + contactLock.writeLock().unlock(); } } public void addNeighbour(NeighbourId n) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); - neighbourLock.writeLock().lock(); + contactLock.writeLock().lock(); try { - Txn txn = db.startTransaction(); + messageStatusLock.writeLock().lock(); try { - db.addNeighbour(txn, n); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + Txn txn = db.startTransaction(); + try { + db.addNeighbour(txn, n); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + contactLock.writeLock().unlock(); } } public void addLocallyGeneratedMessage(Message m) throws DbException { waitForPermissionToWrite(); - messageLock.writeLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + messageLock.writeLock().lock(); try { - subscriptionLock.readLock().lock(); + messageStatusLock.writeLock().lock(); try { - Txn txn = db.startTransaction(); + subscriptionLock.readLock().lock(); try { - if(db.containsSubscription(txn, m.getGroup())) { - boolean added = storeMessage(txn, m, null); - assert added; - } else { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Not subscribed"); + Txn txn = db.startTransaction(); + try { + if(db.containsSubscription(txn, m.getGroup())) { + boolean added = storeMessage(txn, m, null); + assert added; + } else { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Not subscribed"); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + subscriptionLock.readLock().unlock(); } } finally { - subscriptionLock.readLock().unlock(); + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + contactLock.readLock().unlock(); } } @@ -161,6 +183,28 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } + public void removeNeighbour(NeighbourId n) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n); + contactLock.writeLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + db.removeNeighbour(txn, n); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + contactLock.writeLock().unlock(); + } + } + public void setRating(AuthorId a, Rating r) throws DbException { messageLock.writeLock().lock(); try { @@ -224,72 +268,89 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void unsubscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g); - messageLock.writeLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + messageLock.writeLock().lock(); try { - subscriptionLock.writeLock().lock(); + messageStatusLock.writeLock().lock(); try { - Txn txn = db.startTransaction(); + subscriptionLock.writeLock().lock(); try { - db.removeSubscription(txn, g); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + Txn txn = db.startTransaction(); + try { + db.removeSubscription(txn, g); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + subscriptionLock.writeLock().unlock(); } } finally { - subscriptionLock.writeLock().unlock(); + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + contactLock.readLock().unlock(); } } public void generateBundle(NeighbourId n, Bundle b) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n); // Ack all batches received from the neighbour - neighbourLock.writeLock().lock(); + contactLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + if(!containsNeighbour(n)) return; + messageStatusLock.writeLock().lock(); try { - int numAcks = 0; - for(BatchId ack : db.removeBatchesToAck(txn, n)) { - b.addAck(ack); - numAcks++; + Txn txn = db.startTransaction(); + try { + int numAcks = 0; + for(BatchId ack : db.removeBatchesToAck(txn, n)) { + b.addAck(ack); + numAcks++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numAcks + " acks"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numAcks + " acks"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + contactLock.readLock().unlock(); } // Add a list of subscriptions - subscriptionLock.readLock().lock(); + contactLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + if(!containsNeighbour(n)) return; + subscriptionLock.readLock().lock(); try { - int numSubs = 0; - for(GroupId g : db.getSubscriptions(txn)) { - b.addSubscription(g); - numSubs++; + Txn txn = db.startTransaction(); + try { + int numSubs = 0; + for(GroupId g : db.getSubscriptions(txn)) { + b.addSubscription(g); + numSubs++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numSubs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numSubs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + subscriptionLock.readLock().unlock(); } } finally { - subscriptionLock.readLock().unlock(); + contactLock.readLock().unlock(); } // Add as many messages as possible to the bundle long capacity = b.getCapacity(); @@ -309,55 +370,61 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } private Batch fillBatch(NeighbourId n, long capacity) throws DbException { - messageLock.readLock().lock(); + contactLock.readLock().lock(); try { - Set<MessageId> sent; - Batch b; - neighbourLock.readLock().lock(); + if(!containsNeighbour(n)) return null; + messageLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + Set<MessageId> sent; + Batch b; + messageStatusLock.readLock().lock(); try { - capacity = Math.min(capacity, Batch.CAPACITY); - Iterator<MessageId> it = - db.getSendableMessages(txn, n, capacity).iterator(); - if(!it.hasNext()) { + Txn txn = db.startTransaction(); + try { + capacity = Math.min(capacity, Batch.CAPACITY); + Iterator<MessageId> it = + db.getSendableMessages(txn, n, capacity).iterator(); + if(!it.hasNext()) { + db.commitTransaction(txn); + return null; // No more messages to send + } + sent = new HashSet<MessageId>(); + b = batchProvider.get(); + while(it.hasNext()) { + MessageId m = it.next(); + b.addMessage(db.getMessage(txn, m)); + sent.add(m); + } + b.seal(); db.commitTransaction(txn); - return null; // No more messages to send - } - sent = new HashSet<MessageId>(); - b = batchProvider.get(); - while(it.hasNext()) { - MessageId m = it.next(); - b.addMessage(db.getMessage(txn, m)); - sent.add(m); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - b.seal(); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + messageStatusLock.readLock().unlock(); } - } finally { - neighbourLock.readLock().unlock(); - } - // Record the contents of the batch - neighbourLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); + // Record the contents of the batch + messageStatusLock.writeLock().lock(); try { - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, n, b.getId(), sent); - db.commitTransaction(txn); - return b; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + Txn txn = db.startTransaction(); + try { + assert !sent.isEmpty(); + db.addOutstandingBatch(txn, n, b.getId(), sent); + db.commitTransaction(txn); + return b; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + contactLock.readLock().unlock(); } } @@ -366,132 +433,163 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { LOG.fine("Received bundle from " + n + ", " + b.getSize() + " bytes"); // Mark all messages in acked batches as seen - messageLock.readLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + if(!containsNeighbour(n)) return; + messageLock.readLock().lock(); try { - int acks = 0; - for(BatchId ack : b.getAcks()) { - acks++; - Txn txn = db.startTransaction(); - try { - db.removeAckedBatch(txn, n, ack); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + messageStatusLock.writeLock().lock(); + try { + int acks = 0; + for(BatchId ack : b.getAcks()) { + acks++; + Txn txn = db.startTransaction(); + try { + db.removeAckedBatch(txn, n, ack); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + acks + " acks"); + } finally { + messageStatusLock.writeLock().unlock(); } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks"); } finally { - neighbourLock.writeLock().unlock(); + messageLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + contactLock.readLock().unlock(); } // Update the neighbour's subscriptions - neighbourLock.writeLock().lock(); + contactLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + if(!containsNeighbour(n)) return; + messageStatusLock.writeLock().lock(); try { - db.clearSubscriptions(txn, n); - int subs = 0; - for(GroupId g : b.getSubscriptions()) { - subs++; - db.addSubscription(txn, n, g); + Txn txn = db.startTransaction(); + try { + db.clearSubscriptions(txn, n); + int subs = 0; + for(GroupId g : b.getSubscriptions()) { + subs++; + db.addSubscription(txn, n, g); + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + contactLock.readLock().lock(); } // Store the messages int batches = 0; for(Batch batch : b.getBatches()) { batches++; waitForPermissionToWrite(); - messageLock.writeLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + if(!containsNeighbour(n)) return; + messageLock.writeLock().lock(); try { - subscriptionLock.readLock().lock(); + messageStatusLock.writeLock().lock(); try { - Txn txn = db.startTransaction(); + subscriptionLock.readLock().lock(); try { - int received = 0, stored = 0; - for(Message m : batch.getMessages()) { - received++; - if(db.containsSubscription(txn, m.getGroup())) { - if(storeMessage(txn, m, n)) stored++; + Txn txn = db.startTransaction(); + try { + int received = 0, stored = 0; + for(Message m : batch.getMessages()) { + received++; + GroupId g = m.getGroup(); + if(db.containsSubscription(txn, g)) { + if(storeMessage(txn, m, n)) stored++; + } } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + received + + " messages, stored " + stored); + db.addBatchToAck(txn, n, batch.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + received - + " messages, stored " + stored); - db.addBatchToAck(txn, n, batch.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + } finally { + subscriptionLock.readLock().unlock(); } } finally { - subscriptionLock.readLock().unlock(); + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + contactLock.readLock().unlock(); } } if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + batches + " batches"); // Find any lost batches that need to be retransmitted Set<BatchId> lost; - messageLock.readLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + if(!containsNeighbour(n)) return; + messageLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + messageStatusLock.writeLock().lock(); try { - lost = db.addReceivedBundle(txn, n, b.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + Txn txn = db.startTransaction(); + try { + lost = db.addReceivedBundle(txn, n, b.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + contactLock.readLock().unlock(); } for(BatchId batch : lost) { - messageLock.readLock().lock(); + contactLock.readLock().lock(); try { - neighbourLock.writeLock().lock(); + if(!containsNeighbour(n)) return; + messageLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + messageStatusLock.writeLock().lock(); try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, n, batch); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, n, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); } } finally { - neighbourLock.writeLock().unlock(); + messageLock.readLock().unlock(); } } finally { - messageLock.readLock().unlock(); + contactLock.readLock().unlock(); } } System.gc(); diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 90cd27beea2cbcac887133d762d1a3aaf207bf89..1008156d17883b44a11f4aba35fac877da39e699 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -30,8 +30,9 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { * interface to find out which calls require which locks. */ + private final Object contactLock = new Object(); private final Object messageLock = new Object(); - private final Object neighbourLock = new Object(); + private final Object messageStatusLock = new Object(); private final Object ratingLock = new Object(); private final Object subscriptionLock = new Object(); @@ -42,28 +43,32 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } protected void expireMessages(long size) throws DbException { - synchronized(messageLock) { - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } } } public void close() throws DbException { - synchronized(messageLock) { - synchronized(neighbourLock) { - synchronized(ratingLock) { - synchronized(subscriptionLock) { - db.close(); + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(ratingLock) { + synchronized(subscriptionLock) { + db.close(); + } } } } @@ -72,36 +77,40 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void addNeighbour(NeighbourId n) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - db.addNeighbour(txn, n); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + synchronized(contactLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + db.addNeighbour(txn, n); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } } } } public void addLocallyGeneratedMessage(Message m) throws DbException { waitForPermissionToWrite(); - synchronized(messageLock) { - synchronized(neighbourLock) { - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - if(db.containsSubscription(txn, m.getGroup())) { - boolean added = storeMessage(txn, m, null); - assert added; - } else { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Not subscribed"); + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + if(db.containsSubscription(txn, m.getGroup())) { + boolean added = storeMessage(txn, m, null); + assert added; + } else { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Not subscribed"); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } } @@ -173,16 +182,18 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void unsubscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Unsubscribing from " + g); - synchronized(messageLock) { - synchronized(neighbourLock) { - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - db.removeSubscription(txn, g); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + db.removeSubscription(txn, g); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } } } } @@ -192,37 +203,43 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void generateBundle(NeighbourId n, Bundle b) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n); // Ack all batches received from the neighbour - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - int numAcks = 0; - for(BatchId ack : db.removeBatchesToAck(txn, n)) { - b.addAck(ack); - numAcks++; + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + int numAcks = 0; + for(BatchId ack : db.removeBatchesToAck(txn, n)) { + b.addAck(ack); + numAcks++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numAcks + " acks"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numAcks + " acks"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } // Add a list of subscriptions - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - int numSubs = 0; - for(GroupId g : db.getSubscriptions(txn)) { - b.addSubscription(g); - numSubs++; + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + int numSubs = 0; + for(GroupId g : db.getSubscriptions(txn)) { + b.addSubscription(g); + numSubs++; + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + numSubs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + numSubs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } // Add as many messages as possible to the bundle @@ -243,30 +260,49 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } private Batch fillBatch(NeighbourId n, long capacity) throws DbException { - synchronized(messageLock) { - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - capacity = Math.min(capacity, Batch.CAPACITY); - Iterator<MessageId> it = - db.getSendableMessages(txn, n, capacity).iterator(); - if(!it.hasNext()) { + synchronized(contactLock) { + if(!containsNeighbour(n)) return null; + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + capacity = Math.min(capacity, Batch.CAPACITY); + Iterator<MessageId> it = + db.getSendableMessages(txn, n, capacity).iterator(); + if(!it.hasNext()) { + db.commitTransaction(txn); + return null; // No more messages to send + } + Batch b = batchProvider.get(); + Set<MessageId> sent = new HashSet<MessageId>(); + while(it.hasNext()) { + MessageId m = it.next(); + b.addMessage(db.getMessage(txn, m)); + sent.add(m); + } + b.seal(); + // Record the contents of the batch + assert !sent.isEmpty(); + db.addOutstandingBatch(txn, n, b.getId(), sent); db.commitTransaction(txn); - return null; // No more messages to send - } - Batch b = batchProvider.get(); - Set<MessageId> sent = new HashSet<MessageId>(); - while(it.hasNext()) { - MessageId m = it.next(); - b.addMessage(db.getMessage(txn, m)); - sent.add(m); + return b; + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - b.seal(); - // Record the contents of the batch - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, n, b.getId(), sent); + } + } + } + } + + public void removeNeighbour(NeighbourId n) throws DbException { + if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing neighbour " + n); + synchronized(contactLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + db.removeNeighbour(txn, n); db.commitTransaction(txn); - return b; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -278,42 +314,48 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void receiveBundle(NeighbourId n, Bundle b) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + n + ", " - + b.getSize() + " bytes"); + + b.getSize() + " bytes"); // Mark all messages in acked batches as seen - synchronized(messageLock) { - synchronized(neighbourLock) { - int acks = 0; - for(BatchId ack : b.getAcks()) { - acks++; - Txn txn = db.startTransaction(); - try { - db.removeAckedBatch(txn, n, ack); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + int acks = 0; + for(BatchId ack : b.getAcks()) { + acks++; + Txn txn = db.startTransaction(); + try { + db.removeAckedBatch(txn, n, ack); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + acks + " acks"); } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks"); } } // Update the neighbour's subscriptions - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - db.clearSubscriptions(txn, n); - int subs = 0; - for(GroupId g : b.getSubscriptions()) { - subs++; - db.addSubscription(txn, n, g); + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + db.clearSubscriptions(txn, n); + int subs = 0; + for(GroupId g : b.getSubscriptions()) { + subs++; + db.addSubscription(txn, n, g); + } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } // Store the messages @@ -321,26 +363,30 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { for(Batch batch : b.getBatches()) { batches++; waitForPermissionToWrite(); - synchronized(messageLock) { - synchronized(neighbourLock) { - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - int received = 0, stored = 0; - for(Message m : batch.getMessages()) { - received++; - if(db.containsSubscription(txn, m.getGroup())) { - if(storeMessage(txn, m, n)) stored++; + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + int received = 0, stored = 0; + for(Message m : batch.getMessages()) { + received++; + GroupId g = m.getGroup(); + if(db.containsSubscription(txn, g)) { + if(storeMessage(txn, m, n)) stored++; + } } + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + received + + " messages, stored " + stored); + db.addBatchToAck(txn, n, batch.getId()); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + received - + " messages, stored " + stored); - db.addBatchToAck(txn, n, batch.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; } } } @@ -350,26 +396,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { LOG.fine("Received " + batches + " batches"); // Find any lost batches that need to be retransmitted Set<BatchId> lost; - synchronized(messageLock) { - synchronized(neighbourLock) { - Txn txn = db.startTransaction(); - try { - lost = db.addReceivedBundle(txn, n, b.getId()); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - for(BatchId batch : lost) { + synchronized(contactLock) { + if(!containsNeighbour(n)) return; synchronized(messageLock) { - synchronized(neighbourLock) { + synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, n, batch); + lost = db.addReceivedBundle(txn, n, b.getId()); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -378,6 +411,25 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } } + for(BatchId batch : lost) { + synchronized(contactLock) { + if(!containsNeighbour(n)) return; + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, n, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } System.gc(); } }