diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index 9c12e40e5ddd1719e54fa6c4db1771942390dd09..01f449af9cb35822b7303c714f739528a5de9bbd 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -26,15 +26,15 @@ public interface DatabaseComponent { static final int CLEANER_SLEEP_MS = 1000; // 1 sec static final int RETRANSMIT_THRESHOLD = 3; + /** Waits for any open transactions to finish and closes the database. */ + void close() throws DbException; + /** Adds a locally generated message to the database. */ void addLocallyGeneratedMessage(Message m) throws DbException; /** Adds a new neighbour to the database. */ void addNeighbour(NeighbourId n) throws DbException; - /** Waits for any open transactions to finish and closes the database. */ - void close() throws DbException; - /** Generates a bundle of messages for the given neighbour. */ void generateBundle(NeighbourId n, Bundle b) throws DbException; diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index a50518218fa6426f45d98e90da269a538b9533e5..6d6f1bad44a1b19ea1e479e72181a272261b7277 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -33,7 +33,7 @@ interface Database<T> { void close() throws DbException; /** Starts a new transaction and returns an object representing it. */ - T startTransaction(String name) throws DbException; + T startTransaction() throws DbException; /** * Aborts the given transaction - no changes made during the transaction @@ -183,6 +183,14 @@ interface Database<T> { */ Set<GroupId> getSubscriptions(T txn) throws DbException; + /** + * Removes an outstanding batch that has been acknowledged. Any messages in + * the batch that are still considered outstanding (Status.SENT) with + * respect to the given neighbour are now considered seen (Status.SEEN). + * Locking: neighbours write, messages read. + */ + void removeAckedBatch(T txn, NeighbourId n, BatchId b) throws DbException; + /** * Removes and returns the IDs of any batches received from the given * neighbour that need to be acknowledged. @@ -191,10 +199,9 @@ interface Database<T> { Set<BatchId> removeBatchesToAck(T txn, NeighbourId n) throws DbException; /** - * Removes an outstanding batch that is considered to have been lost. Any - * messages in the batch that are still considered outstanding - * (Status.SENT) with respect to the given neighbour are now considered - * unsent (Status.NEW). + * Removes an outstanding batch that has been lost. Any messages in the + * batch that are still considered outstanding (Status.SENT) with respect + * to the given neighbour are now considered unsent (Status.NEW). * Locking: neighbours write, messages read. */ void removeLostBatch(T txn, NeighbourId n, BatchId b) throws DbException; @@ -205,13 +212,6 @@ interface Database<T> { */ void removeMessage(T txn, MessageId m) throws DbException; - /** - * Removes an outstanding batch that has been acknowledged. The status of - * the acknowledged messages is not updated. - * Locking: neighbours write. - */ - Set<MessageId> removeOutstandingBatch(T txn, NeighbourId n, BatchId b) throws DbException; - /** * Unsubscribes from the given group. Any messages belonging to the group * are deleted from the database. diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index 3fbad792cc6b325acc7b950e486e6c16d2c8b6d0..e17b60389ad00ba6d92ca0ba066ad0dbf0a66537 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -177,7 +177,7 @@ abstract class JdbcDatabase implements Database<Connection> { } catch(ClassNotFoundException e) { throw new DbException(e); } - Connection txn = startTransaction("initialize"); + Connection txn = startTransaction(); try { // If not resuming, create the tables if(resume) { @@ -264,7 +264,7 @@ abstract class JdbcDatabase implements Database<Connection> { } catch(SQLException ignored) {} } - public Connection startTransaction(String name) throws DbException { + public Connection startTransaction() throws DbException { Connection txn = null; try { synchronized(connections) { @@ -329,7 +329,7 @@ abstract class JdbcDatabase implements Database<Connection> { while(openConnections > 0) { if(LOG.isLoggable(Level.FINE)) LOG.fine("Waiting for " + openConnections - + " open connections"); + + " open connections"); try { connections.wait(); } catch(InterruptedException ignored) {} @@ -937,7 +937,7 @@ abstract class JdbcDatabase implements Database<Connection> { if(!ids.isEmpty()) { if(LOG.isLoggable(Level.FINE)) LOG.fine(ids.size() + " sendable messages, " + total - + " bytes"); + + " bytes"); } return ids; } catch(SQLException e) { @@ -968,37 +968,13 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<BatchId> removeBatchesToAck(Connection txn, NeighbourId n) + public void removeAckedBatch(Connection txn, NeighbourId n, BatchId b) throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT batchId FROM batchesToAck" - + " WHERE neighbourId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); - rs = ps.executeQuery(); - Set<BatchId> ids = new HashSet<BatchId>(); - while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); - rs.close(); - ps.close(); - sql = "DELETE FROM batchesToAck WHERE neighbourId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); - int rowsAffected = ps.executeUpdate(); - assert rowsAffected == ids.size(); - ps.close(); - return ids; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - tryToClose(txn); - throw new DbException(e); - } + removeBatch(txn, n, b, Status.SEEN); } - public void removeLostBatch(Connection txn, NeighbourId n, BatchId b) - throws DbException { + private void removeBatch(Connection txn, NeighbourId n, BatchId b, + Status newStatus) throws DbException { PreparedStatement ps = null, ps1 = null; ResultSet rs = null; try { @@ -1011,7 +987,7 @@ abstract class JdbcDatabase implements Database<Connection> { sql = "UPDATE statuses SET status = ?" + " WHERE messageId = ? AND neighbourId = ? AND status = ?"; ps1 = txn.prepareStatement(sql); - ps1.setShort(1, (short) Status.NEW.ordinal()); + ps1.setShort(1, (short) newStatus.ordinal()); ps1.setInt(3, n.getInt()); ps1.setShort(4, (short) Status.SENT.ordinal()); int messages = 0; @@ -1028,6 +1004,7 @@ abstract class JdbcDatabase implements Database<Connection> { assert rowsAffected[i] <= 1; } ps1.close(); + // Cascade on delete sql = "DELETE FROM outstandingBatches WHERE batchId = ?"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); @@ -1043,46 +1020,50 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void removeMessage(Connection txn, MessageId m) throws DbException { + public Set<BatchId> removeBatchesToAck(Connection txn, NeighbourId n) + throws DbException { PreparedStatement ps = null; + ResultSet rs = null; try { - String sql = "DELETE FROM messages WHERE messageId = ?"; + String sql = "SELECT batchId FROM batchesToAck" + + " WHERE neighbourId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, m.getBytes()); + ps.setInt(1, n.getInt()); + rs = ps.executeQuery(); + Set<BatchId> ids = new HashSet<BatchId>(); + while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); + rs.close(); + ps.close(); + sql = "DELETE FROM batchesToAck WHERE neighbourId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, n.getInt()); int rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; + assert rowsAffected == ids.size(); ps.close(); + return ids; } catch(SQLException e) { + tryToClose(rs); tryToClose(ps); tryToClose(txn); throw new DbException(e); } } - public Set<MessageId> removeOutstandingBatch(Connection txn, NeighbourId n, - BatchId b) throws DbException { + public void removeLostBatch(Connection txn, NeighbourId n, BatchId b) + throws DbException { + removeBatch(txn, n, b, Status.NEW); + } + + public void removeMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; - ResultSet rs = null; try { - String sql = "SELECT messageId FROM outstandingMessages" - + " WHERE neighbourId = ? AND batchId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, n.getInt()); - ps.setBytes(2, b.getBytes()); - rs = ps.executeQuery(); - Set<MessageId> messages = new HashSet<MessageId>(); - while(rs.next()) messages.add(new MessageId(rs.getBytes(1))); - rs.close(); - ps.close(); - sql = "DELETE FROM outstandingBatches WHERE batchId = ?"; + String sql = "DELETE FROM messages WHERE messageId = ?"; ps = txn.prepareStatement(sql); - ps.setBytes(1, b.getBytes()); + ps.setBytes(1, m.getBytes()); int rowsAffected = ps.executeUpdate(); - assert rowsAffected <= 1; + assert rowsAffected == 1; ps.close(); - return messages.isEmpty() ? null : messages; } catch(SQLException e) { - tryToClose(rs); tryToClose(ps); tryToClose(txn); throw new DbException(e); diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index 51d6244969b8cd46d282df09a6cc9484e4f914b6..4e1680b539ed8fd35a5ed32312277d75cc8ee94a 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -10,7 +10,6 @@ import java.util.logging.Logger; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NeighbourId; import net.sf.briar.api.db.Rating; -import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; @@ -53,7 +52,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("cleaner"); + Txn txn = db.startTransaction(); try { for(MessageId m : db.getOldMessages(txn, size)) { removeMessage(txn, m); @@ -99,7 +98,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("addNeighbour"); + Txn txn = db.startTransaction(); try { db.addNeighbour(txn, n); db.commitTransaction(txn); @@ -120,7 +119,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { subscriptionLock.readLock().lock(); try { - Txn txn = db.startTransaction("addLocallyGeneratedMessage"); + Txn txn = db.startTransaction(); try { if(db.containsSubscription(txn, m.getGroup())) { boolean added = storeMessage(txn, m, null); @@ -148,7 +147,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public Rating getRating(AuthorId a) throws DbException { ratingLock.readLock().lock(); try { - Txn txn = db.startTransaction("getRating"); + Txn txn = db.startTransaction(); try { Rating r = db.getRating(txn, a); db.commitTransaction(txn); @@ -167,7 +166,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { ratingLock.writeLock().lock(); try { - Txn txn = db.startTransaction("setRating"); + Txn txn = db.startTransaction(); try { Rating old = db.setRating(txn, a, r); // Update the sendability of the author's messages @@ -191,7 +190,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public Set<GroupId> getSubscriptions() throws DbException { subscriptionLock.readLock().lock(); try { - Txn txn = db.startTransaction("getSubscriptions"); + Txn txn = db.startTransaction(); try { HashSet<GroupId> subs = new HashSet<GroupId>(); for(GroupId g : db.getSubscriptions(txn)) subs.add(g); @@ -210,7 +209,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); subscriptionLock.writeLock().lock(); try { - Txn txn = db.startTransaction("subscribe"); + Txn txn = db.startTransaction(); try { db.addSubscription(txn, g); db.commitTransaction(txn); @@ -231,7 +230,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { subscriptionLock.writeLock().lock(); try { - Txn txn = db.startTransaction("unsubscribe"); + Txn txn = db.startTransaction(); try { db.removeSubscription(txn, g); db.commitTransaction(txn); @@ -255,7 +254,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { // Ack all batches received from the neighbour neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("generateBundle:acks"); + Txn txn = db.startTransaction(); try { int numAcks = 0; for(BatchId ack : db.removeBatchesToAck(txn, n)) { @@ -275,7 +274,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { // Add a list of subscriptions subscriptionLock.readLock().lock(); try { - Txn txn = db.startTransaction("generateBundle:subscriptions"); + Txn txn = db.startTransaction(); try { int numSubs = 0; for(GroupId g : db.getSubscriptions(txn)) { @@ -316,7 +315,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Batch b; neighbourLock.readLock().lock(); try { - Txn txn = db.startTransaction("fillBatch:read"); + Txn txn = db.startTransaction(); try { capacity = Math.min(capacity, Batch.CAPACITY); Iterator<MessageId> it = @@ -344,7 +343,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { // Record the contents of the batch neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("fillBatch:write"); + Txn txn = db.startTransaction(); try { assert !sent.isEmpty(); db.addOutstandingBatch(txn, n, b.getId(), sent); @@ -365,29 +364,18 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void receiveBundle(NeighbourId n, Bundle b) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + n + ", " - + b.getSize() + " bytes"); + + b.getSize() + " bytes"); // Mark all messages in acked batches as seen messageLock.readLock().lock(); try { neighbourLock.writeLock().lock(); try { - int acks = 0, expired = 0; + int acks = 0; for(BatchId ack : b.getAcks()) { acks++; - Txn txn = db.startTransaction("receiveBundle:acks"); + Txn txn = db.startTransaction(); try { - Iterable<MessageId> batch = - db.removeOutstandingBatch(txn, n, ack); - // May be null if the batch was empty or has expired - if(batch == null) { - expired++; - } else { - for(MessageId m : batch) { - // Don't re-create statuses for expired messages - if(db.containsMessage(txn, m)) - db.setStatus(txn, n, m, Status.SEEN); - } - } + db.removeAckedBatch(txn, n, ack); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -395,8 +383,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks, " + expired - + " expired"); + LOG.fine("Received " + acks + " acks"); } finally { neighbourLock.writeLock().unlock(); } @@ -406,7 +393,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { // Update the neighbour's subscriptions neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("receiveBundle:subscriptions"); + Txn txn = db.startTransaction(); try { db.clearSubscriptions(txn, n); int subs = 0; @@ -435,7 +422,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { subscriptionLock.readLock().lock(); try { - Txn txn = db.startTransaction("receiveBundle:batch"); + Txn txn = db.startTransaction(); try { int received = 0, stored = 0; for(Message m : batch.getMessages()) { @@ -446,7 +433,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + received - + " messages, stored " + stored); + + " messages, stored " + stored); db.addBatchToAck(txn, n, batch.getId()); db.commitTransaction(txn); } catch(DbException e) { @@ -471,7 +458,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("receiveBundle:findLost"); + Txn txn = db.startTransaction(); try { lost = db.addReceivedBundle(txn, n, b.getId()); db.commitTransaction(txn); @@ -490,7 +477,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { neighbourLock.writeLock().lock(); try { - Txn txn = db.startTransaction("receiveBundle:removeLost"); + Txn txn = db.startTransaction(); try { if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing lost batch"); diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 9dff747804bfc6372a9906cfb73a6b78fa06a614..90cd27beea2cbcac887133d762d1a3aaf207bf89 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -9,7 +9,6 @@ import java.util.logging.Logger; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NeighbourId; import net.sf.briar.api.db.Rating; -import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; @@ -45,7 +44,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { protected void expireMessages(long size) throws DbException { synchronized(messageLock) { synchronized(neighbourLock) { - Txn txn = db.startTransaction("cleaner"); + Txn txn = db.startTransaction(); try { for(MessageId m : db.getOldMessages(txn, size)) { removeMessage(txn, m); @@ -74,7 +73,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void addNeighbour(NeighbourId n) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Adding neighbour " + n); synchronized(neighbourLock) { - Txn txn = db.startTransaction("addNeighbour"); + Txn txn = db.startTransaction(); try { db.addNeighbour(txn, n); db.commitTransaction(txn); @@ -90,7 +89,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { synchronized(messageLock) { synchronized(neighbourLock) { synchronized(subscriptionLock) { - Txn txn = db.startTransaction("addLocallyGeneratedMessage"); + Txn txn = db.startTransaction(); try { if(db.containsSubscription(txn, m.getGroup())) { boolean added = storeMessage(txn, m, null); @@ -111,7 +110,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public Rating getRating(AuthorId a) throws DbException { synchronized(ratingLock) { - Txn txn = db.startTransaction("getRating"); + Txn txn = db.startTransaction(); try { Rating r = db.getRating(txn, a); db.commitTransaction(txn); @@ -126,7 +125,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void setRating(AuthorId a, Rating r) throws DbException { synchronized(messageLock) { synchronized(ratingLock) { - Txn txn = db.startTransaction("setRating"); + Txn txn = db.startTransaction(); try { Rating old = db.setRating(txn, a, r); // Update the sendability of the author's messages @@ -145,7 +144,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public Set<GroupId> getSubscriptions() throws DbException { synchronized(subscriptionLock) { - Txn txn = db.startTransaction("getSubscriptions"); + Txn txn = db.startTransaction(); try { HashSet<GroupId> subs = new HashSet<GroupId>(); for(GroupId g : db.getSubscriptions(txn)) subs.add(g); @@ -161,7 +160,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { public void subscribe(GroupId g) throws DbException { if(LOG.isLoggable(Level.FINE)) LOG.fine("Subscribing to " + g); synchronized(subscriptionLock) { - Txn txn = db.startTransaction("subscribe"); + Txn txn = db.startTransaction(); try { db.addSubscription(txn, g); db.commitTransaction(txn); @@ -177,7 +176,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { synchronized(messageLock) { synchronized(neighbourLock) { synchronized(subscriptionLock) { - Txn txn = db.startTransaction("unsubscribe"); + Txn txn = db.startTransaction(); try { db.removeSubscription(txn, g); db.commitTransaction(txn); @@ -194,7 +193,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + n); // Ack all batches received from the neighbour synchronized(neighbourLock) { - Txn txn = db.startTransaction("generateBundle:acks"); + Txn txn = db.startTransaction(); try { int numAcks = 0; for(BatchId ack : db.removeBatchesToAck(txn, n)) { @@ -211,7 +210,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } // Add a list of subscriptions synchronized(subscriptionLock) { - Txn txn = db.startTransaction("generateBundle:subscriptions"); + Txn txn = db.startTransaction(); try { int numSubs = 0; for(GroupId g : db.getSubscriptions(txn)) { @@ -246,7 +245,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { private Batch fillBatch(NeighbourId n, long capacity) throws DbException { synchronized(messageLock) { synchronized(neighbourLock) { - Txn txn = db.startTransaction("fillBatch"); + Txn txn = db.startTransaction(); try { capacity = Math.min(capacity, Batch.CAPACITY); Iterator<MessageId> it = @@ -283,23 +282,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { // Mark all messages in acked batches as seen synchronized(messageLock) { synchronized(neighbourLock) { - int acks = 0, expired = 0; + int acks = 0; for(BatchId ack : b.getAcks()) { acks++; - Txn txn = db.startTransaction("receiveBundle:acks"); + Txn txn = db.startTransaction(); try { - Iterable<MessageId> batch = - db.removeOutstandingBatch(txn, n, ack); - // May be null if the batch was empty or has expired - if(batch == null) { - expired++; - } else { - for(MessageId m : batch) { - // Don't re-create statuses for expired messages - if(db.containsMessage(txn, m)) - db.setStatus(txn, n, m, Status.SEEN); - } - } + db.removeAckedBatch(txn, n, ack); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -307,13 +295,12 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + acks + " acks, " + expired - + " expired"); + LOG.fine("Received " + acks + " acks"); } } // Update the neighbour's subscriptions synchronized(neighbourLock) { - Txn txn = db.startTransaction("receiveBundle:subscriptions"); + Txn txn = db.startTransaction(); try { db.clearSubscriptions(txn, n); int subs = 0; @@ -337,7 +324,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { synchronized(messageLock) { synchronized(neighbourLock) { synchronized(subscriptionLock) { - Txn txn = db.startTransaction("receiveBundle:batch"); + Txn txn = db.startTransaction(); try { int received = 0, stored = 0; for(Message m : batch.getMessages()) { @@ -365,7 +352,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { Set<BatchId> lost; synchronized(messageLock) { synchronized(neighbourLock) { - Txn txn = db.startTransaction("receiveBundle:findLost"); + Txn txn = db.startTransaction(); try { lost = db.addReceivedBundle(txn, n, b.getId()); db.commitTransaction(txn); @@ -378,7 +365,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { for(BatchId batch : lost) { synchronized(messageLock) { synchronized(neighbourLock) { - Txn txn = db.startTransaction("receiveBundle:removeLost"); + Txn txn = db.startTransaction(); try { if(LOG.isLoggable(Level.FINE)) LOG.fine("Removing lost batch");