diff --git a/api/net/sf/briar/api/protocol/BundleId.java b/api/net/sf/briar/api/protocol/BundleId.java deleted file mode 100644 index dfc9f285a5f326c854a7e2a50448d019f7619cf6..0000000000000000000000000000000000000000 --- a/api/net/sf/briar/api/protocol/BundleId.java +++ /dev/null @@ -1,26 +0,0 @@ -package net.sf.briar.api.protocol; - -import java.util.Arrays; - -/** - * Type-safe wrapper for a byte array that uniquely identifies a bundle of - * acknowledgements, subscriptions, and batches of messages. - */ -public class BundleId extends UniqueId { - - public static final BundleId NONE = new BundleId(new byte[] { - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - }); - - public BundleId(byte[] id) { - super(id); - } - - @Override - public boolean equals(Object o) { - if(o instanceof BundleId) - return Arrays.equals(id, ((BundleId) o).id); - return false; - } -} diff --git a/api/net/sf/briar/api/protocol/BundleWriter.java b/api/net/sf/briar/api/protocol/BundleWriter.java index 5bb570a3ca84ac34c549990ab0cd8a1063947c0f..ca841885d82b0c6a158d7cfeced0c65f60044d7c 100644 --- a/api/net/sf/briar/api/protocol/BundleWriter.java +++ b/api/net/sf/briar/api/protocol/BundleWriter.java @@ -13,8 +13,8 @@ public interface BundleWriter { /** Returns the bundle's remaining capacity in bytes. */ long getRemainingCapacity() throws IOException; - /** Adds a header to the bundle and returns its identifier. */ - BundleId addHeader(Iterable<BatchId> acks, Iterable<GroupId> subs, + /** Adds a header to the bundle. */ + void addHeader(Iterable<BatchId> acks, Iterable<GroupId> subs, Map<String, String> transports) throws IOException, GeneralSecurityException; diff --git a/api/net/sf/briar/api/protocol/Header.java b/api/net/sf/briar/api/protocol/Header.java index 09d762989b41abe25dc71df08127e2caa1fd3aff..3d360175307ca6962fed8a6983fe6ef80af5e78d 100644 --- a/api/net/sf/briar/api/protocol/Header.java +++ b/api/net/sf/briar/api/protocol/Header.java @@ -8,9 +8,6 @@ public interface Header { static final int MAX_SIZE = 1024 * 1024; - // FIXME: Remove BundleId when refactoring is complete - BundleId getId(); - /** Returns the acknowledgements contained in the header. */ Set<BatchId> getAcks(); diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index ff50293bdbfbce50ac831ddff41ee56706e5e3ca..7d07e6775d9b087b44465917a32dfefc0e308fcf 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -9,7 +9,6 @@ import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; @@ -35,14 +34,10 @@ import net.sf.briar.api.protocol.MessageId; interface Database<T> { /** - * A batch sent to a contact is considered lost when this many bundles have - * been received from the contact since the batch was sent. - * <p> - * FIXME: Come up with a better retransmission scheme. This scheme doesn't - * cope well with transports that have high latency but send bundles - * frequently. + * A batch sent to a contact is considered lost when this many more + * recently sent batches have been acknowledged. */ - static final int RETRANSMIT_THRESHOLD = 3; + static final int RETRANSMIT_THRESHOLD = 5; /** * Opens the database. @@ -103,15 +98,6 @@ interface Database<T> { */ void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent) throws DbException; - /** - * Records a received bundle. This should be called after processing the - * bundle's contents, and may result in outstanding messages becoming - * eligible for retransmission. - * <p> - * Locking: contacts read, messages read, messageStatuses write. - */ - Set<BatchId> addReceivedBundle(T txn, ContactId c, BundleId b) throws DbException; - /** * Subscribes to the given group. * <p> @@ -167,6 +153,14 @@ interface Database<T> { */ GroupId getGroup(T txn, MessageId m) throws DbException; + /** + * Returns the IDs of any batches sent to the given contact that should now + * be considered lost. + * <p> + * Locking: contacts read, messages read, messageStatuses read. + */ + Set<BatchId> getLostBatches(T txn, ContactId c) throws DbException; + /** * Returns the message identified by the given ID. * <p> diff --git a/components/net/sf/briar/db/H2Database.java b/components/net/sf/briar/db/H2Database.java index 9c6908a7dba8fb96791c0bb40d1082f459b47888..6737fb5b8c7893dd9eb98b8d52ae19813b344ab8 100644 --- a/components/net/sf/briar/db/H2Database.java +++ b/components/net/sf/briar/db/H2Database.java @@ -30,7 +30,7 @@ class H2Database extends JdbcDatabase { @Inject H2Database(File dir, MessageFactory messageFactory, @DatabasePassword Password password, long maxSize) { - super(messageFactory, "BINARY(32)"); + super(messageFactory, "BINARY(32)", "BIGINT"); home = new File(dir, "db"); this.password = password; url = "jdbc:h2:split:" + home.getPath() diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index cabebcc17543d2a815f53106729d251fb8dd3caf..7a973296c96f937d57122d6829f27b3be4312e12 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -9,7 +9,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; @@ -26,7 +25,6 @@ import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageFactory; @@ -73,7 +71,6 @@ abstract class JdbcDatabase implements Database<Connection> { private static final String CREATE_CONTACTS = "CREATE TABLE contacts" + " (contactId INT NOT NULL," - + " lastBundleReceived XXXX NOT NULL," + " PRIMARY KEY (contactId))"; private static final String CREATE_BATCHES_TO_ACK = @@ -96,7 +93,8 @@ abstract class JdbcDatabase implements Database<Connection> { "CREATE TABLE outstandingBatches" + " (batchId XXXX NOT NULL," + " contactId INT NOT NULL," - + " lastBundleReceived XXXX NOT NULL," + + " timestamp YYYY NOT NULL," + + " passover INT NOT NULL," + " PRIMARY KEY (batchId)," + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" + " ON DELETE CASCADE)"; @@ -124,15 +122,6 @@ abstract class JdbcDatabase implements Database<Connection> { + " rating SMALLINT NOT NULL," + " PRIMARY KEY (authorId))"; - private static final String CREATE_RECEIVED_BUNDLES = - "CREATE TABLE receivedBundles" - + " (bundleId XXXX NOT NULL," - + " contactId INT NOT NULL," - + " timestamp BIGINT NOT NULL," - + " PRIMARY KEY (bundleId, contactId)," - + " FOREIGN KEY (contactId) REFERENCES contacts (contactId)" - + " ON DELETE CASCADE)"; - private static final String CREATE_STATUSES = "CREATE TABLE statuses" + " (messageId XXXX NOT NULL," @@ -169,7 +158,8 @@ abstract class JdbcDatabase implements Database<Connection> { Logger.getLogger(JdbcDatabase.class.getName()); private final MessageFactory messageFactory; - private final String hashType; + // Different database libraries use different names for certain types + private final String hashType, bigIntType; private final LinkedList<Connection> connections = new LinkedList<Connection>(); // Locking: self @@ -178,9 +168,11 @@ abstract class JdbcDatabase implements Database<Connection> { protected abstract Connection createConnection() throws SQLException; - JdbcDatabase(MessageFactory messageFactory, String hashType) { + JdbcDatabase(MessageFactory messageFactory, String hashType, + String bigIntType) { this.messageFactory = messageFactory; this.hashType = hashType; + this.bigIntType = bigIntType; } protected void open(boolean resume, File dir, String driverClass) @@ -219,47 +211,44 @@ abstract class JdbcDatabase implements Database<Connection> { s = txn.createStatement(); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating localSubscriptions table"); - s.executeUpdate(insertHashType(CREATE_LOCAL_SUBSCRIPTIONS)); + s.executeUpdate(insertTypeNames(CREATE_LOCAL_SUBSCRIPTIONS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating messages table"); - s.executeUpdate(insertHashType(CREATE_MESSAGES)); + s.executeUpdate(insertTypeNames(CREATE_MESSAGES)); s.executeUpdate(INDEX_MESSAGES_BY_PARENT); s.executeUpdate(INDEX_MESSAGES_BY_AUTHOR); s.executeUpdate(INDEX_MESSAGES_BY_TIMESTAMP); s.executeUpdate(INDEX_MESSAGES_BY_SENDABILITY); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating contacts table"); - s.executeUpdate(insertHashType(CREATE_CONTACTS)); + s.executeUpdate(insertTypeNames(CREATE_CONTACTS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating batchesToAck table"); - s.executeUpdate(insertHashType(CREATE_BATCHES_TO_ACK)); + s.executeUpdate(insertTypeNames(CREATE_BATCHES_TO_ACK)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating contactSubscriptions table"); - s.executeUpdate(insertHashType(CREATE_CONTACT_SUBSCRIPTIONS)); + s.executeUpdate(insertTypeNames(CREATE_CONTACT_SUBSCRIPTIONS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating outstandingBatches table"); - s.executeUpdate(insertHashType(CREATE_OUTSTANDING_BATCHES)); + s.executeUpdate(insertTypeNames(CREATE_OUTSTANDING_BATCHES)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating outstandingMessages table"); - s.executeUpdate(insertHashType(CREATE_OUTSTANDING_MESSAGES)); + s.executeUpdate(insertTypeNames(CREATE_OUTSTANDING_MESSAGES)); s.executeUpdate(INDEX_OUTSTANDING_MESSAGES_BY_BATCH); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating ratings table"); - s.executeUpdate(insertHashType(CREATE_RATINGS)); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Creating receivedBundles table"); - s.executeUpdate(insertHashType(CREATE_RECEIVED_BUNDLES)); + s.executeUpdate(insertTypeNames(CREATE_RATINGS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating statuses table"); - s.executeUpdate(insertHashType(CREATE_STATUSES)); + s.executeUpdate(insertTypeNames(CREATE_STATUSES)); s.executeUpdate(INDEX_STATUSES_BY_MESSAGE); s.executeUpdate(INDEX_STATUSES_BY_CONTACT); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating contact transports table"); - s.executeUpdate(insertHashType(CREATE_CONTACT_TRANSPORTS)); + s.executeUpdate(insertTypeNames(CREATE_CONTACT_TRANSPORTS)); if(LOG.isLoggable(Level.FINE)) LOG.fine("Creating local transports table"); - s.executeUpdate(insertHashType(CREATE_LOCAL_TRANSPORTS)); + s.executeUpdate(insertTypeNames(CREATE_LOCAL_TRANSPORTS)); s.close(); } catch(SQLException e) { tryToClose(s); @@ -268,9 +257,8 @@ abstract class JdbcDatabase implements Database<Connection> { } } - // FIXME: Get rid of this if we're definitely not using Derby - private String insertHashType(String s) { - return s.replaceAll("XXXX", hashType); + private String insertTypeNames(String s) { + return s.replaceAll("XXXX", hashType).replaceAll("YYYY", bigIntType); } private void tryToClose(Connection c) { @@ -371,8 +359,7 @@ abstract class JdbcDatabase implements Database<Connection> { throws DbException { PreparedStatement ps = null; try { - String sql = "INSERT INTO batchesToAck" - + " (batchId, contactId)" + String sql = "INSERT INTO batchesToAck (batchId, contactId)" + " VALUES (?, ?)"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); @@ -405,30 +392,15 @@ abstract class JdbcDatabase implements Database<Connection> { rs.close(); ps.close(); // Create a new contact row - sql = "INSERT INTO contacts" - + " (contactId, lastBundleReceived)" - + " VALUES (?, ?)"; + sql = "INSERT INTO contacts (contactId) VALUES (?)"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - ps.setBytes(2, BundleId.NONE.getBytes()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); - // Create a dummy received bundle row for BundleId.NONE - sql = "INSERT INTO receivedBundles" - + " (bundleId, contactId, timestamp)" - + " VALUES (?, ?, ?)"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, BundleId.NONE.getBytes()); - ps.setInt(2, c.getInt()); - ps.setLong(3, System.currentTimeMillis()); - rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; - ps.close(); // Store the contact's transport details if(transports != null) { - sql = "INSERT INTO contactTransports" - + " (contactId, key, value)" + sql = "INSERT INTO contactTransports (contactId, key, value)" + " VALUES (?, ?, ?)"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); @@ -485,27 +457,14 @@ abstract class JdbcDatabase implements Database<Connection> { PreparedStatement ps = null; ResultSet rs = null; try { - // Find the ID of the last bundle received from c - String sql = "SELECT lastBundleReceived FROM contacts" - + " WHERE contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - rs = ps.executeQuery(); - boolean found = rs.next(); - assert found; - byte[] lastBundleReceived = rs.getBytes(1); - boolean more = rs.next(); - assert !more; - rs.close(); - ps.close(); // Create an outstanding batch row - sql = "INSERT INTO outstandingBatches" - + " (batchId, contactId, lastBundleReceived)" - + " VALUES (?, ?, ?)"; + String sql = "INSERT INTO outstandingBatches" + + " (batchId, contactId, timestamp, passover)" + + " VALUES (?, ?, ?, ZERO())"; ps = txn.prepareStatement(sql); ps.setBytes(1, b.getBytes()); ps.setInt(2, c.getInt()); - ps.setBytes(3, lastBundleReceived); + ps.setLong(3, System.currentTimeMillis()); int rowsAffected = ps.executeUpdate(); assert rowsAffected == 1; ps.close(); @@ -551,98 +510,6 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<BatchId> addReceivedBundle(Connection txn, ContactId c, - BundleId b) throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - // Update the ID of the last bundle received from c - String sql = "UPDATE contacts SET lastBundleReceived = ?" - + " WHERE contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, b.getBytes()); - ps.setInt(2, c.getInt()); - int rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; - ps.close(); - // Count the received bundle records for c and find the oldest - sql = "SELECT bundleId, timestamp FROM receivedBundles" - + " WHERE contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - rs = ps.executeQuery(); - int received = 0; - long oldestTimestamp = Long.MAX_VALUE; - byte[] oldestBundle = null; - while(rs.next()) { - received++; - byte[] bundle = rs.getBytes(1); - long timestamp = rs.getLong(2); - if(timestamp < oldestTimestamp) { - oldestTimestamp = timestamp; - oldestBundle = bundle; - } - } - rs.close(); - ps.close(); - Set<BatchId> lost; - if(received == RETRANSMIT_THRESHOLD) { - // Expire batches related to the oldest received bundle - assert oldestBundle != null; - lost = findLostBatches(txn, c, oldestBundle); - sql = "DELETE FROM receivedBundles WHERE bundleId = ?"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, oldestBundle); - rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; - ps.close(); - } else { - lost = Collections.emptySet(); - } - // Record the new received bundle - sql = "INSERT INTO receivedBundles" - + " (bundleId, contactId, timestamp)" - + " VALUES (?, ?, ?)"; - ps = txn.prepareStatement(sql); - ps.setBytes(1, b.getBytes()); - ps.setInt(2, c.getInt()); - ps.setLong(3, System.currentTimeMillis()); - rowsAffected = ps.executeUpdate(); - assert rowsAffected == 1; - ps.close(); - return lost; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - tryToClose(txn); - throw new DbException(e); - } - } - - private Set<BatchId> findLostBatches(Connection txn, ContactId c, - byte[] lastBundleReceived) throws DbException { - PreparedStatement ps = null; - ResultSet rs = null; - try { - String sql = "SELECT batchId FROM outstandingBatches" - + " WHERE contactId = ? AND lastBundleReceived = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - ps.setBytes(2, lastBundleReceived); - rs = ps.executeQuery(); - Set<BatchId> lost = new HashSet<BatchId>(); - while(rs.next()) lost.add(new BatchId(rs.getBytes(1))); - rs.close(); - ps.close(); - return lost; - } catch(SQLException e) { - tryToClose(rs); - tryToClose(ps); - tryToClose(txn); - throw new DbException(e); - } - } - public void addSubscription(Connection txn, GroupId g) throws DbException { PreparedStatement ps = null; try { @@ -829,6 +696,30 @@ abstract class JdbcDatabase implements Database<Connection> { } } + public Set<BatchId> getLostBatches(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT batchId FROM outstandingBatches" + + " WHERE contactId = ? AND passover >= ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setInt(2, RETRANSMIT_THRESHOLD); + rs = ps.executeQuery(); + Set<BatchId> ids = new HashSet<BatchId>(); + while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); + rs.close(); + ps.close(); + return ids; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + public Message getMessage(Connection txn, MessageId m) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -1158,6 +1049,38 @@ abstract class JdbcDatabase implements Database<Connection> { public void removeAckedBatch(Connection txn, ContactId c, BatchId b) throws DbException { + // Increment the passover count of all older outstanding batches + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT timestamp FROM outstandingBatches" + + " WHERE contactId = ? AND batchId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + ps.setBytes(2, b.getBytes()); + rs = ps.executeQuery(); + boolean found = rs.next(); + assert found; + long timestamp = rs.getLong(1); + boolean more = rs.next(); + assert !more; + rs.close(); + ps.close(); + sql = "UPDATE outstandingBatches SET passover = passover + ?" + + " WHERE contactId = ? AND timestamp < ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, 1); + ps.setInt(2, c.getInt()); + ps.setLong(3, timestamp); + int rowsAffected = ps.executeUpdate(); + assert rowsAffected >= 0; + ps.close(); + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } removeBatch(txn, c, b, Status.SEEN); } diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index 5342218efc426980158a65bc6a0627835b765ba0..7d6100d0b58161fea83398fc8d81843f5fd0f496 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -19,7 +19,6 @@ import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.BundleReader; import net.sf.briar.api.protocol.BundleWriter; import net.sf.briar.api.protocol.GroupId; @@ -525,7 +524,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + batches + " batches"); b.finish(); - retransmitLostBatches(c, h.getId()); + findLostBatches(c); System.gc(); } @@ -573,8 +572,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void retransmitLostBatches(ContactId c, BundleId b) - throws DbException { + private void findLostBatches(ContactId c) throws DbException { // Find any lost batches that need to be retransmitted Set<BatchId> lost; contactLock.readLock().lock(); @@ -586,7 +584,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { Txn txn = db.startTransaction(); try { - lost = db.addReceivedBundle(txn, c, b); + lost = db.getLostBatches(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 9916738ea62fe2d99ed9028ae2cf05a760b70586..403dc153637e2a519ba9323b6cc0be10c3943af3 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -19,7 +19,6 @@ import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.BundleReader; import net.sf.briar.api.protocol.BundleWriter; import net.sf.briar.api.protocol.GroupId; @@ -396,7 +395,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { if(LOG.isLoggable(Level.FINE)) LOG.fine("Received " + batches + " batches"); b.finish(); - retransmitLostBatches(c, h.getId()); + findLostBatches(c); System.gc(); } @@ -432,7 +431,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void retransmitLostBatches(ContactId c, BundleId b) + private void findLostBatches(ContactId c) throws DbException { // Find any lost batches that need to be retransmitted Set<BatchId> lost; @@ -442,7 +441,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - lost = db.addReceivedBundle(txn, c, b); + lost = db.getLostBatches(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); diff --git a/components/net/sf/briar/protocol/BundleReaderImpl.java b/components/net/sf/briar/protocol/BundleReaderImpl.java index 9fb756f4ea31720e7dfa6db9e1bd76dc67632781..cf99c965146b5a69a650f6b698268b415ad45afe 100644 --- a/components/net/sf/briar/protocol/BundleReaderImpl.java +++ b/components/net/sf/briar/protocol/BundleReaderImpl.java @@ -15,7 +15,6 @@ import java.util.Set; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.BundleReader; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Header; @@ -66,7 +65,6 @@ class BundleReaderImpl implements BundleReader { signature.initVerify(publicKey); messageDigest.reset(); // Read the signed data - in.setDigesting(true); in.setSigning(true); r.setReadLimit(Header.MAX_SIZE); Set<BatchId> acks = new HashSet<BatchId>(); @@ -86,11 +84,9 @@ class BundleReaderImpl implements BundleReader { in.setSigning(false); // Read and verify the signature byte[] sig = r.readRaw(); - in.setDigesting(false); if(!signature.verify(sig)) throw new SignatureException(); // Build and return the header - BundleId id = new BundleId(messageDigest.digest()); - return headerFactory.createHeader(id, acks, subs, transports); + return headerFactory.createHeader(acks, subs, transports); } public Batch getNextBatch() throws IOException, GeneralSecurityException { diff --git a/components/net/sf/briar/protocol/BundleWriterImpl.java b/components/net/sf/briar/protocol/BundleWriterImpl.java index 6dcdb65a15a2135e3d561441036f36faa723a956..9f9bfea8c4aaa4639e32b053cadcdcff8a30d56d 100644 --- a/components/net/sf/briar/protocol/BundleWriterImpl.java +++ b/components/net/sf/briar/protocol/BundleWriterImpl.java @@ -2,7 +2,6 @@ package net.sf.briar.protocol; import java.io.IOException; import java.io.OutputStream; -import java.security.DigestOutputStream; import java.security.GeneralSecurityException; import java.security.MessageDigest; import java.security.PrivateKey; @@ -10,7 +9,6 @@ import java.security.Signature; import java.util.Map; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.BundleWriter; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; @@ -22,7 +20,7 @@ class BundleWriterImpl implements BundleWriter { private static enum State { START, FIRST_BATCH, MORE_BATCHES, END }; - private final SigningOutputStream out; + private final SigningDigestingOutputStream out; private final Writer w; private final PrivateKey privateKey; private final Signature signature; @@ -33,8 +31,8 @@ class BundleWriterImpl implements BundleWriter { BundleWriterImpl(OutputStream out, WriterFactory writerFactory, PrivateKey privateKey, Signature signature, MessageDigest messageDigest, long capacity) { - OutputStream out1 = new DigestOutputStream(out, messageDigest); - this.out = new SigningOutputStream(out1, signature); + this.out = + new SigningDigestingOutputStream(out, signature, messageDigest); w = writerFactory.createWriter(this.out); this.privateKey = privateKey; this.signature = signature; @@ -46,13 +44,12 @@ class BundleWriterImpl implements BundleWriter { return capacity - w.getRawBytesWritten(); } - public BundleId addHeader(Iterable<BatchId> acks, Iterable<GroupId> subs, + public void addHeader(Iterable<BatchId> acks, Iterable<GroupId> subs, Map<String, String> transports) throws IOException, GeneralSecurityException { if(state != State.START) throw new IllegalStateException(); // Initialise the output stream signature.initSign(privateKey); - messageDigest.reset(); // Write the data to be signed out.setSigning(true); w.writeListStart(); @@ -66,9 +63,8 @@ class BundleWriterImpl implements BundleWriter { // Create and write the signature byte[] sig = signature.sign(); w.writeRaw(sig); - // Calculate and return the ID + // Expect a (possibly empty) list of batches state = State.FIRST_BATCH; - return new BundleId(messageDigest.digest()); } public BatchId addBatch(Iterable<Message> messages) throws IOException, @@ -82,6 +78,7 @@ class BundleWriterImpl implements BundleWriter { signature.initSign(privateKey); messageDigest.reset(); // Write the data to be signed + out.setDigesting(true); out.setSigning(true); w.writeListStart(); for(Message m : messages) w.writeRaw(m); @@ -90,6 +87,7 @@ class BundleWriterImpl implements BundleWriter { // Create and write the signature byte[] sig = signature.sign(); w.writeRaw(sig); + out.setDigesting(false); // Calculate and return the ID return new BatchId(messageDigest.digest()); } diff --git a/components/net/sf/briar/protocol/HeaderFactory.java b/components/net/sf/briar/protocol/HeaderFactory.java index 06c47eac93c26ff79268837ce4fe483be9e731f4..9147ff08ad3e89330922518e84f4db9c2ed9433e 100644 --- a/components/net/sf/briar/protocol/HeaderFactory.java +++ b/components/net/sf/briar/protocol/HeaderFactory.java @@ -4,12 +4,11 @@ import java.util.Map; import java.util.Set; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Header; interface HeaderFactory { - Header createHeader(BundleId id, Set<BatchId> acks, Set<GroupId> subs, + Header createHeader(Set<BatchId> acks, Set<GroupId> subs, Map<String, String> transports); } diff --git a/components/net/sf/briar/protocol/HeaderFactoryImpl.java b/components/net/sf/briar/protocol/HeaderFactoryImpl.java index ca71a7b02cae351e9a37bf14f36be087e90268d9..4f8287534476837bd7b62079bcab817b7584f8f1 100644 --- a/components/net/sf/briar/protocol/HeaderFactoryImpl.java +++ b/components/net/sf/briar/protocol/HeaderFactoryImpl.java @@ -4,14 +4,13 @@ import java.util.Map; import java.util.Set; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Header; class HeaderFactoryImpl implements HeaderFactory { - public Header createHeader(BundleId id, Set<BatchId> acks, - Set<GroupId> subs, Map<String, String> transports) { - return new HeaderImpl(id, acks, subs, transports); + public Header createHeader(Set<BatchId> acks, Set<GroupId> subs, + Map<String, String> transports) { + return new HeaderImpl(acks, subs, transports); } } diff --git a/components/net/sf/briar/protocol/HeaderImpl.java b/components/net/sf/briar/protocol/HeaderImpl.java index c1f81c1c126884771667fc1f9df2b305f678bb9f..de8356dfefb139a951c6cba7cea09f387e348014 100644 --- a/components/net/sf/briar/protocol/HeaderImpl.java +++ b/components/net/sf/briar/protocol/HeaderImpl.java @@ -4,30 +4,23 @@ import java.util.Map; import java.util.Set; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Header; /** A simple in-memory implementation of a header. */ class HeaderImpl implements Header { - private final BundleId id; private final Set<BatchId> acks; private final Set<GroupId> subs; private final Map<String, String> transports; - HeaderImpl(BundleId id, Set<BatchId> acks, Set<GroupId> subs, + HeaderImpl(Set<BatchId> acks, Set<GroupId> subs, Map<String, String> transports) { - this.id = id; this.acks = acks; this.subs = subs; this.transports = transports; } - public BundleId getId() { - return id; - } - public Set<BatchId> getAcks() { return acks; } diff --git a/components/net/sf/briar/protocol/SigningOutputStream.java b/components/net/sf/briar/protocol/SigningDigestingOutputStream.java similarity index 59% rename from components/net/sf/briar/protocol/SigningOutputStream.java rename to components/net/sf/briar/protocol/SigningDigestingOutputStream.java index a00df05c6fc16e498f57569739ab158c3b812cdd..5b411c63b9469453e53c3d90d5d7ff2765ddfc6a 100644 --- a/components/net/sf/briar/protocol/SigningOutputStream.java +++ b/components/net/sf/briar/protocol/SigningDigestingOutputStream.java @@ -3,24 +3,35 @@ package net.sf.briar.protocol; import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.security.MessageDigest; import java.security.Signature; import java.security.SignatureException; -/** An output stream that passes its output through a signature. */ -class SigningOutputStream extends FilterOutputStream { +/** + * An output stream that passes its output through a signature and a message + * digest. + */ +class SigningDigestingOutputStream extends FilterOutputStream { private final Signature signature; - private boolean signing = false; + private final MessageDigest messageDigest; + private boolean signing = false, digesting = false; - public SigningOutputStream(OutputStream out, Signature signature) { + public SigningDigestingOutputStream(OutputStream out, Signature signature, + MessageDigest messageDigest) { super(out); this.signature = signature; + this.messageDigest = messageDigest; } void setSigning(boolean signing) { this.signing = signing; } + void setDigesting(boolean digesting) { + this.digesting = digesting; + } + @Override public void write(byte[] b) throws IOException { write(b, 0, b.length); @@ -36,6 +47,7 @@ class SigningOutputStream extends FilterOutputStream { throw new IOException(e); } } + if(digesting) messageDigest.update(b, off, len); } @Override @@ -48,5 +60,6 @@ class SigningOutputStream extends FilterOutputStream { throw new IOException(e); } } + if(digesting) messageDigest.update((byte) b); } } diff --git a/test/net/sf/briar/db/DatabaseComponentTest.java b/test/net/sf/briar/db/DatabaseComponentTest.java index ebb90483ca4706f98e429d5598d8220c832b7781..298bace357d9c06c67f93c6ed1a49ba0e614d811 100644 --- a/test/net/sf/briar/db/DatabaseComponentTest.java +++ b/test/net/sf/briar/db/DatabaseComponentTest.java @@ -15,7 +15,6 @@ import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.BundleReader; import net.sf.briar.api.protocol.BundleWriter; import net.sf.briar.api.protocol.GroupId; @@ -33,7 +32,6 @@ public abstract class DatabaseComponentTest extends TestCase { protected final Object txn = new Object(); protected final AuthorId authorId; protected final BatchId batchId; - protected final BundleId bundleId; protected final ContactId contactId; protected final GroupId groupId; protected final MessageId messageId, parentId; @@ -51,7 +49,6 @@ public abstract class DatabaseComponentTest extends TestCase { super(); authorId = new AuthorId(TestUtils.getRandomId()); batchId = new BatchId(TestUtils.getRandomId()); - bundleId = new BundleId(TestUtils.getRandomId()); contactId = new ContactId(123); groupId = new GroupId(TestUtils.getRandomId()); messageId = new MessageId(TestUtils.getRandomId()); @@ -478,7 +475,6 @@ public abstract class DatabaseComponentTest extends TestCase { will(returnValue(transports)); // Build the header oneOf(bundleWriter).addHeader(acks, subs, transports); - will(returnValue(bundleId)); // Add a batch to the bundle oneOf(bundleWriter).getRemainingCapacity(); will(returnValue(1024L * 1024L - headerSize)); @@ -579,9 +575,7 @@ public abstract class DatabaseComponentTest extends TestCase { will(returnValue(null)); oneOf(bundleReader).finish(); // Lost batches - oneOf(header).getId(); - will(returnValue(bundleId)); - oneOf(database).addReceivedBundle(txn, contactId, bundleId); + oneOf(database).getLostBatches(txn, contactId); will(returnValue(Collections.singleton(batchId))); oneOf(database).removeLostBatch(txn, contactId, batchId); }}); diff --git a/test/net/sf/briar/db/H2DatabaseTest.java b/test/net/sf/briar/db/H2DatabaseTest.java index 18253c000d2181623bdfa06289435875bd4181f1..4408ab6791fc32f242c8ec1f7d7415fbec4cd477 100644 --- a/test/net/sf/briar/db/H2DatabaseTest.java +++ b/test/net/sf/briar/db/H2DatabaseTest.java @@ -21,7 +21,6 @@ import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.Status; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleId; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageFactory; @@ -489,13 +488,10 @@ public class H2DatabaseTest extends TestCase { @Test public void testRetransmission() throws DbException { - BundleId bundleId = new BundleId(TestUtils.getRandomId()); - BundleId bundleId1 = new BundleId(TestUtils.getRandomId()); - BundleId bundleId2 = new BundleId(TestUtils.getRandomId()); - BundleId bundleId3 = new BundleId(TestUtils.getRandomId()); - BundleId bundleId4 = new BundleId(TestUtils.getRandomId()); - BatchId batchId1 = new BatchId(TestUtils.getRandomId()); - BatchId batchId2 = new BatchId(TestUtils.getRandomId()); + BatchId[] ids = new BatchId[Database.RETRANSMIT_THRESHOLD + 5]; + for(int i = 0; i < ids.length; i++) { + ids[i] = new BatchId(TestUtils.getRandomId()); + } Set<MessageId> empty = Collections.emptySet(); Mockery context = new Mockery(); MessageFactory messageFactory = context.mock(MessageFactory.class); @@ -504,30 +500,62 @@ public class H2DatabaseTest extends TestCase { // Add a contact Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); - // Add an oustanding batch (associated with BundleId.NONE) - db.addOutstandingBatch(txn, contactId, batchId, empty); - // Receive a bundle - Set<BatchId> lost = db.addReceivedBundle(txn, contactId, bundleId); - assertTrue(lost.isEmpty()); - // Add a couple more outstanding batches (associated with bundleId) - db.addOutstandingBatch(txn, contactId, batchId1, empty); - db.addOutstandingBatch(txn, contactId, batchId2, empty); - // Receive another bundle - lost = db.addReceivedBundle(txn, contactId, bundleId1); - assertTrue(lost.isEmpty()); - // The contact acks one of the batches - it should not be retransmitted - db.removeAckedBatch(txn, contactId, batchId1); - // Receive another bundle - batchId should now be considered lost - lost = db.addReceivedBundle(txn, contactId, bundleId2); - assertEquals(1, lost.size()); - assertTrue(lost.contains(batchId)); - // Receive another bundle - batchId2 should now be considered lost - lost = db.addReceivedBundle(txn, contactId, bundleId3); - assertEquals(1, lost.size()); - assertTrue(lost.contains(batchId2)); - // Receive another bundle - no further losses - lost = db.addReceivedBundle(txn, contactId, bundleId4); - assertTrue(lost.isEmpty()); + // Add some outstanding batches, a few ms apart + for(int i = 0; i < ids.length; i++) { + db.addOutstandingBatch(txn, contactId, ids[i], empty); + try { + Thread.sleep(5); + } catch(InterruptedException ignored) {} + } + // The contact acks the batches in reverse order. The first + // RETRANSMIT_THRESHOLD - 1 acks should not trigger any retransmissions + for(int i = 0; i < Database.RETRANSMIT_THRESHOLD - 1; i++) { + db.removeAckedBatch(txn, contactId, ids[ids.length - i - 1]); + Set<BatchId> lost = db.getLostBatches(txn, contactId); + assertEquals(Collections.emptySet(), lost); + } + // The next ack should trigger the retransmission of the remaining + // five outstanding batches + int index = ids.length - Database.RETRANSMIT_THRESHOLD; + db.removeAckedBatch(txn, contactId, ids[index]); + Set<BatchId> lost = db.getLostBatches(txn, contactId); + for(int i = 0; i < index; i++) { + assertTrue(lost.contains(ids[i])); + } + db.commitTransaction(txn); + + db.close(); + context.assertIsSatisfied(); + } + + @Test + public void testNoRetransmission() throws DbException { + BatchId[] ids = new BatchId[Database.RETRANSMIT_THRESHOLD * 2]; + for(int i = 0; i < ids.length; i++) { + ids[i] = new BatchId(TestUtils.getRandomId()); + } + Set<MessageId> empty = Collections.emptySet(); + Mockery context = new Mockery(); + MessageFactory messageFactory = context.mock(MessageFactory.class); + Database<Connection> db = open(false, messageFactory); + + // Add a contact + Connection txn = db.startTransaction(); + assertEquals(contactId, db.addContact(txn, null)); + // Add some outstanding batches, a few ms apart + for(int i = 0; i < ids.length; i++) { + db.addOutstandingBatch(txn, contactId, ids[i], empty); + try { + Thread.sleep(5); + } catch(InterruptedException ignored) {} + } + // The contact acks the batches in the order they were sent - nothing + // should be retransmitted + for(int i = 0; i < ids.length; i++) { + db.removeAckedBatch(txn, contactId, ids[i]); + Set<BatchId> lost = db.getLostBatches(txn, contactId); + assertEquals(Collections.emptySet(), lost); + } db.commitTransaction(txn); db.close(); diff --git a/test/net/sf/briar/protocol/SigningStreamTest.java b/test/net/sf/briar/protocol/SigningStreamTest.java index e1b68e23ae2087e314f844966d590fe1195334a5..f1d284769a2a29bf955a8901cce347b3c2bb24d6 100644 --- a/test/net/sf/briar/protocol/SigningStreamTest.java +++ b/test/net/sf/briar/protocol/SigningStreamTest.java @@ -2,7 +2,6 @@ package net.sf.briar.protocol; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.security.DigestOutputStream; import java.security.KeyPair; import java.security.KeyPairGenerator; import java.security.MessageDigest; @@ -39,7 +38,8 @@ public class SigningStreamTest extends TestCase { random.nextBytes(input); ByteArrayOutputStream out = new ByteArrayOutputStream(); - SigningOutputStream signOut = new SigningOutputStream(out, sig); + SigningDigestingOutputStream signOut = + new SigningDigestingOutputStream(out, sig, dig); sig.initSign(keyPair.getPrivate()); signOut.setSigning(true); @@ -80,7 +80,8 @@ public class SigningStreamTest extends TestCase { random.nextBytes(input); ByteArrayOutputStream out = new ByteArrayOutputStream(); - SigningOutputStream signOut = new SigningOutputStream(out, sig); + SigningDigestingOutputStream signOut = + new SigningDigestingOutputStream(out, sig, dig); sig.initSign(keyPair.getPrivate()); // Sign bytes 0-499, skip bytes 500-749, sign bytes 750-999 @@ -121,16 +122,17 @@ public class SigningStreamTest extends TestCase { random.nextBytes(input); ByteArrayOutputStream out = new ByteArrayOutputStream(); - DigestOutputStream digOut = new DigestOutputStream(out, dig); + SigningDigestingOutputStream signOut = + new SigningDigestingOutputStream(out, sig, dig); dig.reset(); // Digest bytes 0-499, skip bytes 500-749, digest bytes 750-999 - digOut.on(true); - digOut.write(input, 0, 500); - digOut.on(false); - digOut.write(input, 500, 250); - digOut.on(true); - digOut.write(input, 750, 250); + signOut.setDigesting(true); + signOut.write(input, 0, 500); + signOut.setDigesting(false); + signOut.write(input, 500, 250); + signOut.setDigesting(true); + signOut.write(input, 750, 250); byte[] hash = dig.digest();