diff --git a/api/net/sf/briar/api/db/DatabaseComponent.java b/api/net/sf/briar/api/db/DatabaseComponent.java index 581b4f5d8d926977e4a8216aa01dbc3c90f75a93..b4a81b2990bb004d9967689ce4e8dc20eb3d2918 100644 --- a/api/net/sf/briar/api/db/DatabaseComponent.java +++ b/api/net/sf/briar/api/db/DatabaseComponent.java @@ -1,17 +1,22 @@ package net.sf.briar.api.db; import java.io.IOException; -import java.security.GeneralSecurityException; +import java.util.Collection; import java.util.Map; -import java.util.Set; import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; import net.sf.briar.api.protocol.AuthorId; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; +import net.sf.briar.api.protocol.Batch; +import net.sf.briar.api.protocol.BatchWriter; import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; +import net.sf.briar.api.protocol.SubscriptionWriter; +import net.sf.briar.api.protocol.Subscriptions; +import net.sf.briar.api.protocol.TransportWriter; +import net.sf.briar.api.protocol.Transports; /** * Encapsulates the database implementation and exposes high-level operations @@ -48,20 +53,35 @@ public interface DatabaseComponent { void addLocallyGeneratedMessage(Message m) throws DbException; /** - * Generates a bundle of acknowledgements, subscriptions, and batches of - * messages for the given contact. + * Finds any lost batches that were sent to the given contact, and marks any + * messages in the batches that are still outstanding for retransmission. */ - void generateBundle(ContactId c, BundleWriter bundleBuilder) - throws DbException, IOException, GeneralSecurityException; + void findLostBatches(ContactId c) throws DbException; + + /** Generates an acknowledgement for the given contact. */ + void generateAck(ContactId c, AckWriter a) throws DbException, + IOException; + + /** Generates a batch of messages for the given contact. */ + void generateBatch(ContactId c, BatchWriter b) throws DbException, + IOException; + + /** Generates a subscription update for the given contact. */ + void generateSubscriptions(ContactId c, SubscriptionWriter s) throws + DbException, IOException; + + /** Generates a transport update for the given contact. */ + void generateTransports(ContactId c, TransportWriter t) throws + DbException, IOException; /** Returns the IDs of all contacts. */ - Set<ContactId> getContacts() throws DbException; + Collection<ContactId> getContacts() throws DbException; /** Returns the user's rating for the given author. */ Rating getRating(AuthorId a) throws DbException; /** Returns the set of groups to which the user subscribes. */ - Set<GroupId> getSubscriptions() throws DbException; + Collection<GroupId> getSubscriptions() throws DbException; /** Returns the local transport details. */ Map<String, String> getTransports() throws DbException; @@ -69,13 +89,17 @@ public interface DatabaseComponent { /** Returns the transport details for the given contact. */ Map<String, String> getTransports(ContactId c) throws DbException; - /** - * Processes a bundle of acknowledgements, subscriptions, and batches of - * messages received from the given contact. Some or all of the messages - * in the bundle may be stored. - */ - void receiveBundle(ContactId c, BundleReader b) throws DbException, - IOException, GeneralSecurityException; + /** Processes an acknowledgement from the given contact. */ + void receiveAck(ContactId c, Ack a) throws DbException; + + /** Processes a batches of messages from the given contact. */ + void receiveBatch(ContactId c, Batch b) throws DbException; + + /** Processes a subscription update from the given contact. */ + void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException; + + /** Processes a transport update from the given contact. */ + void receiveTransports(ContactId c, Transports t) throws DbException; /** Removes a contact (and all associated state) from the database. */ void removeContact(ContactId c) throws DbException; diff --git a/api/net/sf/briar/api/protocol/Ack.java b/api/net/sf/briar/api/protocol/Ack.java new file mode 100644 index 0000000000000000000000000000000000000000..40ebe1865d8d9a605f80685901ad8f3c23ccf63a --- /dev/null +++ b/api/net/sf/briar/api/protocol/Ack.java @@ -0,0 +1,16 @@ +package net.sf.briar.api.protocol; + +import java.util.Collection; + +/** A packet acknowledging receipt of one or more batches. */ +public interface Ack { + + /** + * The maximum size of a serialised ack, excluding encryption and + * authentication. + */ + static final int MAX_SIZE = (1024 * 1024) - 100; + + /** Returns the IDs of the acknowledged batches. */ + Collection<BatchId> getBatches(); +} diff --git a/api/net/sf/briar/api/protocol/AckWriter.java b/api/net/sf/briar/api/protocol/AckWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..0d9a18a2978a48367d5a2f506147624458f5cc8c --- /dev/null +++ b/api/net/sf/briar/api/protocol/AckWriter.java @@ -0,0 +1,16 @@ +package net.sf.briar.api.protocol; + +import java.io.IOException; + +/** An interface for creating an ack. */ +public interface AckWriter { + + /** + * Attempts to add the given BatchId to the ack and returns true if it + * was added. + */ + boolean addBatchId(BatchId b) throws IOException; + + /** Finishes writing the ack. */ + void finish() throws IOException; +} diff --git a/api/net/sf/briar/api/protocol/Batch.java b/api/net/sf/briar/api/protocol/Batch.java index e8534a72f4a08bb7a8e7bda460dabafabdc9a772..8097fb1fc738254dc97510f23f5b986d75e478ac 100644 --- a/api/net/sf/briar/api/protocol/Batch.java +++ b/api/net/sf/briar/api/protocol/Batch.java @@ -1,13 +1,19 @@ package net.sf.briar.api.protocol; -/** A batch of messages up to MAX_SIZE bytes in total size. */ +import java.util.Collection; + +/** A packet containing messages. */ public interface Batch { - public static final int MAX_SIZE = 1024 * 1024; + /** + * The maximum size of a serialised batch, excluding encryption and + * authentication. + */ + static final int MAX_SIZE = (1024 * 1024) - 100; /** Returns the batch's unique identifier. */ BatchId getId(); /** Returns the messages contained in the batch. */ - Iterable<Message> getMessages(); + Collection<Message> getMessages(); } \ No newline at end of file diff --git a/api/net/sf/briar/api/protocol/BatchWriter.java b/api/net/sf/briar/api/protocol/BatchWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..87a5fd3df10e11ee0de23b0856494beff3536c53 --- /dev/null +++ b/api/net/sf/briar/api/protocol/BatchWriter.java @@ -0,0 +1,19 @@ +package net.sf.briar.api.protocol; + +import java.io.IOException; + +/** An interface for creating a batch of messages. */ +public interface BatchWriter { + + /** Returns the capacity of the batch in bytes. */ + int getCapacity(); + + /** + * Attempts to add the given raw message to the batch and returns true if + * it was added. + */ + boolean addMessage(byte[] raw) throws IOException; + + /** Finishes writing the batch and returns its unique identifier. */ + BatchId finish() throws IOException; +} diff --git a/api/net/sf/briar/api/protocol/BundleReader.java b/api/net/sf/briar/api/protocol/BundleReader.java deleted file mode 100644 index ce32210902fb032191ccbb9c6a5752dd8991f039..0000000000000000000000000000000000000000 --- a/api/net/sf/briar/api/protocol/BundleReader.java +++ /dev/null @@ -1,22 +0,0 @@ -package net.sf.briar.api.protocol; - -import java.io.IOException; -import java.security.GeneralSecurityException; - -/** - * An interface for reading a bundle of acknowledgements, subscriptions, - * transport details and batches. - */ -public interface BundleReader { - - /** Returns the bundle's header. */ - Header getHeader() throws IOException, GeneralSecurityException; - - /** - * Returns the next batch of messages, or null if there are no more batches. - */ - Batch getNextBatch() throws IOException, GeneralSecurityException; - - /** Finishes reading the bundle. */ - void finish() throws IOException; -} diff --git a/api/net/sf/briar/api/protocol/BundleWriter.java b/api/net/sf/briar/api/protocol/BundleWriter.java deleted file mode 100644 index 029f1a340a60ca98f5d066003c85c222b114146c..0000000000000000000000000000000000000000 --- a/api/net/sf/briar/api/protocol/BundleWriter.java +++ /dev/null @@ -1,30 +0,0 @@ -package net.sf.briar.api.protocol; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.Collection; -import java.util.Map; - -import net.sf.briar.api.serial.Raw; - -/** - * An interface for writing a bundle of acknowledgements, subscriptions, - * transport details and batches. - */ -public interface BundleWriter { - - /** Returns the bundle's remaining capacity in bytes. */ - long getRemainingCapacity() throws IOException; - - /** Adds a header to the bundle. */ - void addHeader(Collection<BatchId> acks, Collection<GroupId> subs, - Map<String, String> transports) throws IOException, - GeneralSecurityException; - - /** Adds a batch of messages to the bundle and returns its identifier. */ - BatchId addBatch(Collection<Raw> messages) throws IOException, - GeneralSecurityException; - - /** Finishes writing the bundle. */ - void finish() throws IOException; -} diff --git a/api/net/sf/briar/api/protocol/Group.java b/api/net/sf/briar/api/protocol/Group.java new file mode 100644 index 0000000000000000000000000000000000000000..c62d203d023a7ff2cb579d6f2c65b19c6bdac5e1 --- /dev/null +++ b/api/net/sf/briar/api/protocol/Group.java @@ -0,0 +1,29 @@ +package net.sf.briar.api.protocol; + +import java.security.PublicKey; + +/** A group to which users may subscribe. */ +public interface Group { + + /** Returns the name of the group. */ + String getName(); + + /** + * Returns true if messages sent to the group must be signed with a + * particular private key. + */ + boolean isRestricted(); + + /** + * If the group is restricted, returns null. Otherwise returns a salt + * value that is combined with the group's name to generate its unique + * identifier. + */ + byte[] getSalt(); + + /** + * If the group is restricted, returns the public key that is used to + * authorise all messages sent to the group. Otherwise returns null. + */ + PublicKey getPublicKey(); +} diff --git a/api/net/sf/briar/api/protocol/Header.java b/api/net/sf/briar/api/protocol/Header.java deleted file mode 100644 index 96a99d9f3fdb1f7f8e88502029f897a875d066f2..0000000000000000000000000000000000000000 --- a/api/net/sf/briar/api/protocol/Header.java +++ /dev/null @@ -1,22 +0,0 @@ -package net.sf.briar.api.protocol; - -import java.util.Map; -import java.util.Set; - -/** A bundle header up to MAX_SIZE bytes in total size. */ -public interface Header { - - static final int MAX_SIZE = 1024 * 1024; - - /** Returns the acknowledgements contained in the header. */ - Set<BatchId> getAcks(); - - /** Returns the subscriptions contained in the header. */ - Set<GroupId> getSubscriptions(); - - /** Returns the transport details contained in the header. */ - Map<String, String> getTransports(); - - /** Returns the header's timestamp. */ - long getTimestamp(); -} diff --git a/api/net/sf/briar/api/protocol/SubscriptionWriter.java b/api/net/sf/briar/api/protocol/SubscriptionWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..7d474000151bcc948ecb99b55667b6d8f7570d18 --- /dev/null +++ b/api/net/sf/briar/api/protocol/SubscriptionWriter.java @@ -0,0 +1,11 @@ +package net.sf.briar.api.protocol; + +import java.io.IOException; + +/** An interface for creating a subscription update. */ +public interface SubscriptionWriter { + + // FIXME: This should work with groups, not IDs + /** Sets the contents of the update. */ + void setSubscriptions(Iterable<GroupId> subs) throws IOException; +} diff --git a/api/net/sf/briar/api/protocol/Subscriptions.java b/api/net/sf/briar/api/protocol/Subscriptions.java new file mode 100644 index 0000000000000000000000000000000000000000..f25c7c44cf78e4ba12aa2fe2e4d80005aad186bf --- /dev/null +++ b/api/net/sf/briar/api/protocol/Subscriptions.java @@ -0,0 +1,17 @@ +package net.sf.briar.api.protocol; + +import java.util.Collection; + +/** A packet updating the sender's subscriptions. */ +public interface Subscriptions { + + // FIXME: This should work with groups, not IDs + /** Returns the subscriptions contained in the update. */ + Collection<GroupId> getSubscriptions(); + + /** + * Returns the update's timestamp. Updates that are older than the newest + * update received from the same contact must be ignored. + */ + long getTimestamp(); +} diff --git a/api/net/sf/briar/api/protocol/Tags.java b/api/net/sf/briar/api/protocol/Tags.java index 2bf887b6f2514273b4654ae50e66e09294c17347..982c1106c9a55647e6b39eb1fd80150535d53115 100644 --- a/api/net/sf/briar/api/protocol/Tags.java +++ b/api/net/sf/briar/api/protocol/Tags.java @@ -7,11 +7,13 @@ package net.sf.briar.api.protocol; */ public interface Tags { - static final int AUTHOR_ID = 0; - static final int BATCH = 1; - static final int BATCH_ID = 2; - static final int GROUP_ID = 3; - static final int HEADER = 4; + static final int ACK = 0; + static final int AUTHOR_ID = 1; + static final int BATCH = 2; + static final int BATCH_ID = 3; + static final int GROUP_ID = 4; static final int MESSAGE = 5; static final int MESSAGE_ID = 6; + static final int SUBSCRIPTIONS = 7; + static final int TRANSPORTS = 8; } diff --git a/api/net/sf/briar/api/protocol/TransportWriter.java b/api/net/sf/briar/api/protocol/TransportWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..637017cfe7b5e574e10a06a5114a1bbb15fe7902 --- /dev/null +++ b/api/net/sf/briar/api/protocol/TransportWriter.java @@ -0,0 +1,11 @@ +package net.sf.briar.api.protocol; + +import java.io.IOException; +import java.util.Map; + +/** An interface for creating a transports update. */ +public interface TransportWriter { + + /** Sets the contents of the update. */ + void setTransports(Map<String, String> transports) throws IOException; +} diff --git a/api/net/sf/briar/api/protocol/Transports.java b/api/net/sf/briar/api/protocol/Transports.java new file mode 100644 index 0000000000000000000000000000000000000000..1c27f921c2c6249c2799bf0a0165e46f1b47471c --- /dev/null +++ b/api/net/sf/briar/api/protocol/Transports.java @@ -0,0 +1,16 @@ +package net.sf.briar.api.protocol; + +import java.util.Map; + +/** A packet updating the sender's transports. */ +public interface Transports { + + /** Returns the transports contained in the update. */ + Map<String, String> getTransports(); + + /** + * Returns the update's timestamp. Updates that are older than the newest + * update received from the same contact must be ignored. + */ + long getTimestamp(); +} diff --git a/api/net/sf/briar/api/protocol/UniqueId.java b/api/net/sf/briar/api/protocol/UniqueId.java index 170dc0c5a66166b193b03c81b5c4653a7c3981c9..4a18fc3d67c3ee107f4582df9ddfd28bfcb75c33 100644 --- a/api/net/sf/briar/api/protocol/UniqueId.java +++ b/api/net/sf/briar/api/protocol/UniqueId.java @@ -8,6 +8,7 @@ import net.sf.briar.api.serial.Writable; public abstract class UniqueId implements Raw, Writable { public static final int LENGTH = 32; + public static final int SERIALISED_LENGTH = LENGTH + 3; protected final byte[] id; diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index 3357b4b8ac4e00514c26a757ebb254afd2f08aee..ddc1b5f4381c1084b6b9a8821fe8861e7e6437df 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -1,7 +1,7 @@ package net.sf.briar.db; +import java.util.Collection; import java.util.Map; -import java.util.Set; import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; @@ -97,8 +97,8 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses write. */ - void addOutstandingBatch(T txn, ContactId c, BatchId b, Set<MessageId> sent) - throws DbException; + void addOutstandingBatch(T txn, ContactId c, BatchId b, + Collection<MessageId> sent) throws DbException; /** * Subscribes to the given group. @@ -128,12 +128,20 @@ interface Database<T> { */ boolean containsSubscription(T txn, GroupId g) throws DbException; + /** + * Returns the IDs of any batches received from the given contact that need + * to be acknowledged. + * <p> + * Locking: contacts read, messageStatuses write. + */ + Collection<BatchId> getBatchesToAck(T txn, ContactId c) throws DbException; + /** * Returns the IDs of all contacts. * <p> * Locking: contacts read. */ - Set<ContactId> getContacts(T txn) throws DbException; + Collection<ContactId> getContacts(T txn) throws DbException; /** * Returns the amount of free storage space available to the database, in @@ -157,7 +165,7 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses read. */ - Set<BatchId> getLostBatches(T txn, ContactId c) throws DbException; + Collection<BatchId> getLostBatches(T txn, ContactId c) throws DbException; /** * Returns the message identified by the given ID, in raw format. @@ -171,7 +179,7 @@ interface Database<T> { * <p> * Locking: messages read. */ - Iterable<MessageId> getMessagesByAuthor(T txn, AuthorId a) + Collection<MessageId> getMessagesByAuthor(T txn, AuthorId a) throws DbException; /** @@ -188,7 +196,7 @@ interface Database<T> { * <p> * Locking: messages read. */ - Iterable<MessageId> getOldMessages(T txn, long size) throws DbException; + Collection<MessageId> getOldMessages(T txn, long size) throws DbException; /** * Returns the parent of the given message. @@ -219,7 +227,7 @@ interface Database<T> { * <p> * Locking: contacts read, messages read, messageStatuses read. */ - Iterable<MessageId> getSendableMessages(T txn, ContactId c, long capacity) + Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity) throws DbException; /** @@ -227,14 +235,14 @@ interface Database<T> { * <p> * Locking: subscriptions read. */ - Set<GroupId> getSubscriptions(T txn) throws DbException; + Collection<GroupId> getSubscriptions(T txn) throws DbException; /** * Returns the groups to which the given contact subscribes. * <p> * Locking: contacts read, subscriptions read. */ - Set<GroupId> getSubscriptions(T txn, ContactId c) throws DbException; + Collection<GroupId> getSubscriptions(T txn, ContactId c) throws DbException; /** * Returns the local transport details. @@ -260,12 +268,11 @@ interface Database<T> { void removeAckedBatch(T txn, ContactId c, BatchId b) throws DbException; /** - * Removes and returns the IDs of any batches received from the given - * contact that need to be acknowledged. - * <p> - * Locking: contacts read, messageStatuses write. + * Marks the given batches received from the given contact as having been + * acknowledged. */ - Set<BatchId> removeBatchesToAck(T txn, ContactId c) throws DbException; + void removeBatchesToAck(T txn, ContactId c, Collection<BatchId> sent) + throws DbException; /** * Removes a contact (and all associated state) from the database. @@ -328,8 +335,8 @@ interface Database<T> { * <p> * Locking: contacts write, subscriptions write. */ - void setSubscriptions(T txn, ContactId c, Set<GroupId> subs, long timestamp) - throws DbException; + void setSubscriptions(T txn, ContactId c, Collection<GroupId> subs, + long timestamp) throws DbException; /** * Sets the local transport details, replacing any existing transport diff --git a/components/net/sf/briar/db/JdbcDatabase.java b/components/net/sf/briar/db/JdbcDatabase.java index 38cb62911dd85c6b738b486718f3b05bce06c252..23a2693b2bfda3955b55cf3cf368b79dad208a22 100644 --- a/components/net/sf/briar/db/JdbcDatabase.java +++ b/components/net/sf/briar/db/JdbcDatabase.java @@ -9,12 +9,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; -import java.util.HashSet; +import java.util.Collection; import java.util.LinkedList; -import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; import java.util.TreeMap; import java.util.logging.Level; import java.util.logging.Logger; @@ -457,7 +455,7 @@ abstract class JdbcDatabase implements Database<Connection> { } public void addOutstandingBatch(Connection txn, ContactId c, BatchId b, - Set<MessageId> sent) throws DbException { + Collection<MessageId> sent) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { @@ -611,14 +609,37 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<ContactId> getContacts(Connection txn) throws DbException { + public Collection<BatchId> getBatchesToAck(Connection txn, ContactId c) + throws DbException { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = "SELECT batchId FROM batchesToAck" + + " WHERE contactId = ?"; + ps = txn.prepareStatement(sql); + ps.setInt(1, c.getInt()); + rs = ps.executeQuery(); + Collection<BatchId> ids = new ArrayList<BatchId>(); + while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); + rs.close(); + ps.close(); + return ids; + } catch(SQLException e) { + tryToClose(rs); + tryToClose(ps); + tryToClose(txn); + throw new DbException(e); + } + } + + public Collection<ContactId> getContacts(Connection txn) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT contactId FROM contacts"; ps = txn.prepareStatement(sql); rs = ps.executeQuery(); - Set<ContactId> ids = new HashSet<ContactId>(); + Collection<ContactId> ids = new ArrayList<ContactId>(); while(rs.next()) ids.add(new ContactId(rs.getInt(1))); rs.close(); ps.close(); @@ -663,7 +684,7 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<BatchId> getLostBatches(Connection txn, ContactId c) + public Collection<BatchId> getLostBatches(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -674,7 +695,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(1, c.getInt()); ps.setInt(2, RETRANSMIT_THRESHOLD); rs = ps.executeQuery(); - Set<BatchId> ids = new HashSet<BatchId>(); + Collection<BatchId> ids = new ArrayList<BatchId>(); while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); rs.close(); ps.close(); @@ -714,7 +735,7 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Iterable<MessageId> getMessagesByAuthor(Connection txn, AuthorId a) + public Collection<MessageId> getMessagesByAuthor(Connection txn, AuthorId a) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -723,7 +744,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps = txn.prepareStatement(sql); ps.setBytes(1, a.getBytes()); rs = ps.executeQuery(); - List<MessageId> ids = new ArrayList<MessageId>(); + Collection<MessageId> ids = new ArrayList<MessageId>(); while(rs.next()) ids.add(new MessageId(rs.getBytes(1))); rs.close(); ps.close(); @@ -799,7 +820,7 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Iterable<MessageId> getOldMessages(Connection txn, long capacity) + public Collection<MessageId> getOldMessages(Connection txn, long capacity) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -808,7 +829,7 @@ abstract class JdbcDatabase implements Database<Connection> { + " ORDER BY timestamp"; ps = txn.prepareStatement(sql); rs = ps.executeQuery(); - List<MessageId> ids = new ArrayList<MessageId>(); + Collection<MessageId> ids = new ArrayList<MessageId>(); long total = 0L; while(rs.next()) { int size = rs.getInt(1); @@ -901,8 +922,8 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Iterable<MessageId> getSendableMessages(Connection txn, - ContactId c, long capacity) throws DbException { + public Collection<MessageId> getSendableMessages(Connection txn, + ContactId c, int capacity) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { @@ -919,8 +940,8 @@ abstract class JdbcDatabase implements Database<Connection> { ps.setInt(2, c.getInt()); ps.setShort(3, (short) Status.NEW.ordinal()); rs = ps.executeQuery(); - List<MessageId> ids = new ArrayList<MessageId>(); - long total = 0; + Collection<MessageId> ids = new ArrayList<MessageId>(); + int total = 0; while(rs.next()) { int size = rs.getInt(1); if(total + size > capacity) break; @@ -943,14 +964,15 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<GroupId> getSubscriptions(Connection txn) throws DbException { + public Collection<GroupId> getSubscriptions(Connection txn) + throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { String sql = "SELECT groupId FROM localSubscriptions"; ps = txn.prepareStatement(sql); rs = ps.executeQuery(); - Set<GroupId> ids = new HashSet<GroupId>(); + Collection<GroupId> ids = new ArrayList<GroupId>(); while(rs.next()) ids.add(new GroupId(rs.getBytes(1))); rs.close(); ps.close(); @@ -963,7 +985,7 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<GroupId> getSubscriptions(Connection txn, ContactId c) + public Collection<GroupId> getSubscriptions(Connection txn, ContactId c) throws DbException { PreparedStatement ps = null; ResultSet rs = null; @@ -973,7 +995,7 @@ abstract class JdbcDatabase implements Database<Connection> { ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); rs = ps.executeQuery(); - Set<GroupId> ids = new HashSet<GroupId>(); + Collection<GroupId> ids = new ArrayList<GroupId>(); while(rs.next()) ids.add(new GroupId(rs.getBytes(1))); rs.close(); ps.close(); @@ -1114,29 +1136,25 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public Set<BatchId> removeBatchesToAck(Connection txn, ContactId c) - throws DbException { + public void removeBatchesToAck(Connection txn, ContactId c, + Collection<BatchId> sent) throws DbException { PreparedStatement ps = null; - ResultSet rs = null; try { - String sql = "SELECT batchId FROM batchesToAck" - + " WHERE contactId = ?"; - ps = txn.prepareStatement(sql); - ps.setInt(1, c.getInt()); - rs = ps.executeQuery(); - Set<BatchId> ids = new HashSet<BatchId>(); - while(rs.next()) ids.add(new BatchId(rs.getBytes(1))); - rs.close(); - ps.close(); - sql = "DELETE FROM batchesToAck WHERE contactId = ?"; + String sql = "DELETE FROM batchesToAck" + + " WHERE contactId = ? and batchId = ?"; ps = txn.prepareStatement(sql); ps.setInt(1, c.getInt()); - int rowsAffected = ps.executeUpdate(); - assert rowsAffected == ids.size(); + for(BatchId b : sent) { + ps.setBytes(2, b.getBytes()); + ps.addBatch(); + } + int[] rowsAffectedArray = ps.executeBatch(); + assert rowsAffectedArray.length == sent.size(); + for(int i = 0; i < rowsAffectedArray.length; i++) { + assert rowsAffectedArray[i] == 1; + } ps.close(); - return ids; } catch(SQLException e) { - tryToClose(rs); tryToClose(ps); tryToClose(txn); throw new DbException(e); @@ -1310,8 +1328,8 @@ abstract class JdbcDatabase implements Database<Connection> { } } - public void setSubscriptions(Connection txn, ContactId c, Set<GroupId> subs, - long timestamp) throws DbException { + public void setSubscriptions(Connection txn, ContactId c, + Collection<GroupId> subs, long timestamp) throws DbException { PreparedStatement ps = null; ResultSet rs = null; try { diff --git a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java index 0aac5a186c03021039a4f32fa5b4ce7f47a33563..c0bc0739254fcc8bf79b0c642f2779c2aad5fa0b 100644 --- a/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java +++ b/components/net/sf/briar/db/ReadWriteLockDatabaseComponent.java @@ -1,12 +1,10 @@ package net.sf.briar.db; import java.io.IOException; -import java.security.GeneralSecurityException; -import java.security.SignatureException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -17,17 +15,19 @@ import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NoSuchContactException; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; +import net.sf.briar.api.protocol.BatchWriter; import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; -import net.sf.briar.api.serial.Raw; -import net.sf.briar.api.serial.RawByteArray; +import net.sf.briar.api.protocol.SubscriptionWriter; +import net.sf.briar.api.protocol.Subscriptions; +import net.sf.briar.api.protocol.TransportWriter; +import net.sf.briar.api.protocol.Transports; import com.google.inject.Inject; @@ -63,6 +63,34 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { super(db, cleaner); } + protected void expireMessages(long size) throws DbException { + contactLock.readLock().lock(); + try { + messageLock.writeLock().lock(); + try { + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.writeLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + public void close() throws DbException { cleaner.stopCleaning(); contactLock.writeLock().lock(); @@ -162,18 +190,19 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - protected void expireMessages(long size) throws DbException { + public void findLostBatches(ContactId c) throws DbException { + // Find any lost batches that need to be retransmitted + Collection<BatchId> lost; contactLock.readLock().lock(); try { - messageLock.writeLock().lock(); + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); try { messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); - } + lost = db.getLostBatches(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -183,106 +212,75 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { messageStatusLock.writeLock().unlock(); } } finally { - messageLock.writeLock().unlock(); + messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } - } - - public void generateBundle(ContactId c, BundleWriter b) throws DbException, - IOException, GeneralSecurityException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); - Set<BatchId> acks = generateAcks(c); - Set<GroupId> subs = generateSubscriptions(c); - Map<String, String> transports = generateTransports(c); - // Add the header to the bundle - b.addHeader(acks, subs, transports); - // Add as many messages as possible to the bundle - while(generateBatch(c, b)); - b.finish(); - if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); - System.gc(); - } - - private Set<BatchId> generateAcks(ContactId c) throws DbException { - contactLock.readLock().lock(); - try { - if(!containsContact(c)) throw new NoSuchContactException(); - messageStatusLock.writeLock().lock(); + for(BatchId batch : lost) { + contactLock.readLock().lock(); try { - Txn txn = db.startTransaction(); + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); try { - Set<BatchId> acks = db.removeBatchesToAck(txn, c); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + acks.size() + " acks"); - db.commitTransaction(txn); - return acks; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; + messageStatusLock.writeLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, c, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + messageStatusLock.writeLock().unlock(); + } + } finally { + messageLock.readLock().unlock(); } } finally { - messageStatusLock.writeLock().unlock(); + contactLock.readLock().unlock(); } - } finally { - contactLock.readLock().unlock(); } } - private Set<GroupId> generateSubscriptions(ContactId c) throws DbException { + public void generateAck(ContactId c, AckWriter a) throws DbException, + IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); - subscriptionLock.readLock().lock(); + messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { - Set<GroupId> subs = db.getSubscriptions(txn); + Collection<BatchId> acks = db.getBatchesToAck(txn, c); + Collection<BatchId> sent = new ArrayList<BatchId>(); + for(BatchId b : acks) if(a.addBatchId(b)) sent.add(b); + a.finish(); + db.removeBatchesToAck(txn, c, sent); if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + subs.size() + " subscriptions"); + LOG.fine("Added " + acks.size() + " acks"); db.commitTransaction(txn); - return subs; } catch(DbException e) { db.abortTransaction(txn); throw e; - } - } finally { - subscriptionLock.readLock().unlock(); - } - } finally { - contactLock.readLock().unlock(); - } - } - - private Map<String, String> generateTransports(ContactId c) - throws DbException { - contactLock.readLock().lock(); - try { - if(!containsContact(c)) throw new NoSuchContactException(); - transportLock.readLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - Map<String, String> transports = db.getTransports(txn); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Added " + transports.size() + " transports"); - db.commitTransaction(txn); - return transports; - } catch(DbException e) { + } catch(IOException e) { db.abortTransaction(txn); throw e; } } finally { - transportLock.readLock().unlock(); + messageStatusLock.writeLock().unlock(); } } finally { contactLock.readLock().unlock(); } } - private boolean generateBatch(ContactId c, BundleWriter b) - throws DbException, IOException, GeneralSecurityException { + public void generateBatch(ContactId c, BatchWriter b) throws DbException, + IOException { contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); @@ -290,29 +288,21 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { Set<MessageId> sent; int bytesSent = 0; - BatchId batchId; messageStatusLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { - long capacity = - Math.min(b.getRemainingCapacity(), Batch.MAX_SIZE); + int capacity = b.getCapacity(); Iterator<MessageId> it = db.getSendableMessages(txn, c, capacity).iterator(); - if(!it.hasNext()) { - db.commitTransaction(txn); - return false; // No more messages to send - } sent = new HashSet<MessageId>(); - List<Raw> messages = new ArrayList<Raw>(); while(it.hasNext()) { MessageId m = it.next(); byte[] message = db.getMessage(txn, m); + if(!b.addMessage(message)) break; bytesSent += message.length; - messages.add(new RawByteArray(message)); sent.add(m); } - batchId = b.addBatch(messages); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -320,23 +310,19 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } catch(IOException e) { db.abortTransaction(txn); throw e; - } catch(SignatureException e) { - db.abortTransaction(txn); - throw e; } } finally { messageStatusLock.readLock().unlock(); } - // Record the contents of the batch + BatchId id = b.finish(); + // Record the contents of the batch, unless it was empty + if(sent.isEmpty()) return; messageStatusLock.writeLock().lock(); try { Txn txn = db.startTransaction(); try { - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, c, batchId, sent); + db.addOutstandingBatch(txn, c, id, sent); db.commitTransaction(txn); - // Don't create another batch if this one was half-empty - return bytesSent > Batch.MAX_SIZE / 2; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -352,12 +338,71 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public Set<ContactId> getContacts() throws DbException { + public void generateSubscriptions(ContactId c, SubscriptionWriter s) + throws DbException, IOException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + subscriptionLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + // FIXME: This should deal in Groups, not GroupIds + Collection<GroupId> subs = db.getSubscriptions(txn); + s.setSubscriptions(subs); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + subs.size() + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + subscriptionLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public void generateTransports(ContactId c, TransportWriter t) + throws DbException, IOException { + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + transportLock.readLock().lock(); + try { + Txn txn = db.startTransaction(); + try { + Map<String, String> transports = db.getTransports(txn); + t.setTransports(transports); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Added " + transports.size() + " transports"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } finally { + transportLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + } + + public Collection<ContactId> getContacts() throws DbException { contactLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { - Set<ContactId> contacts = db.getContacts(txn); + Collection<ContactId> contacts = db.getContacts(txn); db.commitTransaction(txn); return contacts; } catch(DbException e) { @@ -386,12 +431,12 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public Set<GroupId> getSubscriptions() throws DbException { + public Collection<GroupId> getSubscriptions() throws DbException { subscriptionLock.readLock().lock(); try { Txn txn = db.startTransaction(); try { - Set<GroupId> subs = db.getSubscriptions(txn); + Collection<GroupId> subs = db.getSubscriptions(txn); db.commitTransaction(txn); return subs; } catch(DbException e) { @@ -443,28 +488,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public void receiveBundle(ContactId c, BundleReader b) throws DbException, - IOException, GeneralSecurityException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c); - Header h = b.getHeader(); - receiveAcks(c, h); - receiveSubscriptions(c, h); - receiveTransports(c, h); - // Store the messages - int batches = 0; - Batch batch = null; - while((batch = b.getNextBatch()) != null) { - receiveBatch(c, batch); - batches++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - b.finish(); - findLostBatches(c); - System.gc(); - } - - private void receiveAcks(ContactId c, Header h) throws DbException { + public void receiveAck(ContactId c, Ack a) throws DbException { // Mark all messages in acked batches as seen contactLock.readLock().lock(); try { @@ -473,7 +497,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { try { messageStatusLock.writeLock().lock(); try { - Set<BatchId> acks = h.getAcks(); + Collection<BatchId> acks = a.getBatches(); for(BatchId ack : acks) { Txn txn = db.startTransaction(); try { @@ -497,61 +521,7 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void receiveSubscriptions(ContactId c, Header h) - throws DbException { - // Update the contact's subscriptions - contactLock.writeLock().lock(); - try { - if(!containsContact(c)) throw new NoSuchContactException(); - subscriptionLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - Set<GroupId> subs = h.getSubscriptions(); - db.setSubscriptions(txn, c, subs, h.getTimestamp()); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs.size() + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - subscriptionLock.writeLock().unlock(); - } - } finally { - contactLock.writeLock().unlock(); - } - } - - private void receiveTransports(ContactId c, Header h) throws DbException { - // Update the contact's transport details - contactLock.writeLock().lock(); - try { - if(!containsContact(c)) throw new NoSuchContactException(); - transportLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - Map<String, String> transports = h.getTransports(); - db.setTransports(txn, c, transports, h.getTimestamp()); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + transports.size() - + " transports"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - transportLock.writeLock().unlock(); - } - } finally { - contactLock.writeLock().unlock(); - } - } - - private void receiveBatch(ContactId c, Batch b) throws DbException { + public void receiveBatch(ContactId c, Batch b) throws DbException { waitForPermissionToWrite(); contactLock.readLock().lock(); try { @@ -595,60 +565,58 @@ class ReadWriteLockDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void findLostBatches(ContactId c) throws DbException { - // Find any lost batches that need to be retransmitted - Set<BatchId> lost; - contactLock.readLock().lock(); + public void receiveSubscriptions(ContactId c, Subscriptions s) + throws DbException { + // Update the contact's subscriptions + contactLock.writeLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); - messageLock.readLock().lock(); + subscriptionLock.writeLock().lock(); try { - messageStatusLock.writeLock().lock(); + Txn txn = db.startTransaction(); try { - Txn txn = db.startTransaction(); - try { - lost = db.getLostBatches(txn, c); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); + Collection<GroupId> subs = s.getSubscriptions(); + db.setSubscriptions(txn, c, subs, s.getTimestamp()); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs.size() + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } } finally { - messageLock.readLock().unlock(); + subscriptionLock.writeLock().unlock(); } } finally { - contactLock.readLock().unlock(); + contactLock.writeLock().unlock(); } - for(BatchId batch : lost) { - contactLock.readLock().lock(); + } + + public void receiveTransports(ContactId c, Transports t) + throws DbException { + // Update the contact's transport details + contactLock.writeLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + transportLock.writeLock().lock(); try { - if(!containsContact(c)) throw new NoSuchContactException(); - messageLock.readLock().lock(); + Txn txn = db.startTransaction(); try { - messageStatusLock.writeLock().lock(); - try { - Txn txn = db.startTransaction(); - try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, c, batch); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } - } finally { - messageLock.readLock().unlock(); + Map<String, String> transports = t.getTransports(); + db.setTransports(txn, c, transports, t.getTimestamp()); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + transports.size() + + " transports"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } } finally { - contactLock.readLock().unlock(); + transportLock.writeLock().unlock(); } + } finally { + contactLock.writeLock().unlock(); } } diff --git a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java index 93fffc9c1354cddf3032f897cf5984b1fcce8d43..60f99f656895956685d3037c4fa4c01c948e6593 100644 --- a/components/net/sf/briar/db/SynchronizedDatabaseComponent.java +++ b/components/net/sf/briar/db/SynchronizedDatabaseComponent.java @@ -1,12 +1,10 @@ package net.sf.briar.db; import java.io.IOException; -import java.security.GeneralSecurityException; -import java.security.SignatureException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.logging.Level; @@ -16,17 +14,19 @@ import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NoSuchContactException; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; +import net.sf.briar.api.protocol.BatchWriter; import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; -import net.sf.briar.api.serial.Raw; -import net.sf.briar.api.serial.RawByteArray; +import net.sf.briar.api.protocol.SubscriptionWriter; +import net.sf.briar.api.protocol.Subscriptions; +import net.sf.briar.api.protocol.TransportWriter; +import net.sf.briar.api.protocol.Transports; import com.google.inject.Inject; @@ -56,6 +56,25 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { super(db, cleaner); } + protected void expireMessages(long size) throws DbException { + synchronized(contactLock) { + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + for(MessageId m : db.getOldMessages(txn, size)) { + removeMessage(txn, m); + } + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + public void close() throws DbException { cleaner.stopCleaning(); synchronized(contactLock) { @@ -124,15 +143,16 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - protected void expireMessages(long size) throws DbException { + public void findLostBatches(ContactId c) throws DbException { + // Find any lost batches that need to be retransmitted + Collection<BatchId> lost; synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); synchronized(messageLock) { synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - for(MessageId m : db.getOldMessages(txn, size)) { - removeMessage(txn, m); - } + lost = db.getLostBatches(txn, c); db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); @@ -141,134 +161,142 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } } + for(BatchId batch : lost) { + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Removing lost batch"); + db.removeLostBatch(txn, c, batch); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } } - public void generateBundle(ContactId c, BundleWriter b) throws DbException, - IOException, GeneralSecurityException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Generating bundle for " + c); - Set<BatchId> acks = generateAcks(c); - Set<GroupId> subs = generateSubscriptions(c); - Map<String, String> transports = generateTransports(c); - // Add the header to the bundle - b.addHeader(acks, subs, transports); - // Add as many messages as possible to the bundle - while(generateBatch(c, b)); - b.finish(); - if(LOG.isLoggable(Level.FINE)) LOG.fine("Bundle generated"); - System.gc(); - } - - private Set<BatchId> generateAcks(ContactId c) throws DbException { + public void generateAck(ContactId c, AckWriter a) throws DbException, + IOException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(messageStatusLock) { Txn txn = db.startTransaction(); try { - Set<BatchId> acks = db.removeBatchesToAck(txn, c); + Collection<BatchId> acks = db.getBatchesToAck(txn, c); + Collection<BatchId> sent = new ArrayList<BatchId>(); + for(BatchId b : acks) if(a.addBatchId(b)) sent.add(b); + a.finish(); + db.removeBatchesToAck(txn, c, sent); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + acks.size() + " acks"); db.commitTransaction(txn); - return acks; } catch(DbException e) { db.abortTransaction(txn); throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; } } } } - private Set<GroupId> generateSubscriptions(ContactId c) throws DbException { + public void generateBatch(ContactId c, BatchWriter b) throws DbException, + IOException { + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(messageLock) { + synchronized(messageStatusLock) { + Txn txn = db.startTransaction(); + try { + int capacity = b.getCapacity(); + Iterator<MessageId> it = + db.getSendableMessages(txn, c, capacity).iterator(); + Set<MessageId> sent = new HashSet<MessageId>(); + int bytesSent = 0; + while(it.hasNext()) { + MessageId m = it.next(); + byte[] message = db.getMessage(txn, m); + if(!b.addMessage(message)) break; + bytesSent += message.length; + sent.add(m); + } + BatchId id = b.finish(); + // Record the contents of the batch, unless it was empty + if(!sent.isEmpty()) + db.addOutstandingBatch(txn, c, id, sent); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; + } + } + } + } + } + + public void generateSubscriptions(ContactId c, SubscriptionWriter s) + throws DbException, IOException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(subscriptionLock) { Txn txn = db.startTransaction(); try { - Set<GroupId> subs = db.getSubscriptions(txn); + // FIXME: This should deal in Groups, not GroupIds + Collection<GroupId> subs = db.getSubscriptions(txn); + s.setSubscriptions(subs); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + subs.size() + " subscriptions"); db.commitTransaction(txn); - return subs; } catch(DbException e) { db.abortTransaction(txn); throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; } } } } - private Map<String, String> generateTransports(ContactId c) - throws DbException { + public void generateTransports(ContactId c, TransportWriter t) + throws DbException, IOException { synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(transportLock) { Txn txn = db.startTransaction(); try { Map<String, String> transports = db.getTransports(txn); + t.setTransports(transports); if(LOG.isLoggable(Level.FINE)) LOG.fine("Added " + transports.size() + " transports"); db.commitTransaction(txn); - return transports; } catch(DbException e) { db.abortTransaction(txn); throw e; + } catch(IOException e) { + db.abortTransaction(txn); + throw e; } } } } - private boolean generateBatch(ContactId c, BundleWriter b) - throws DbException, IOException, GeneralSecurityException { - synchronized(contactLock) { - if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - long capacity = - Math.min(b.getRemainingCapacity(), Batch.MAX_SIZE); - Iterator<MessageId> it = - db.getSendableMessages(txn, c, capacity).iterator(); - if(!it.hasNext()) { - db.commitTransaction(txn); - return false; // No more messages to send - } - Set<MessageId> sent = new HashSet<MessageId>(); - List<Raw> messages = new ArrayList<Raw>(); - int bytesSent = 0; - while(it.hasNext()) { - MessageId m = it.next(); - byte[] message = db.getMessage(txn, m); - bytesSent += message.length; - messages.add(new RawByteArray(message)); - sent.add(m); - } - BatchId batchId = b.addBatch(messages); - // Record the contents of the batch - assert !sent.isEmpty(); - db.addOutstandingBatch(txn, c, batchId, sent); - db.commitTransaction(txn); - // Don't create another batch if this one was half-empty - return bytesSent > Batch.MAX_SIZE / 2; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } catch(IOException e) { - db.abortTransaction(txn); - throw e; - } catch(SignatureException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - } - - public Set<ContactId> getContacts() throws DbException { + public Collection<ContactId> getContacts() throws DbException { synchronized(contactLock) { Txn txn = db.startTransaction(); try { - Set<ContactId> contacts = db.getContacts(txn); + Collection<ContactId> contacts = db.getContacts(txn); db.commitTransaction(txn); return contacts; } catch(DbException e) { @@ -292,11 +320,11 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public Set<GroupId> getSubscriptions() throws DbException { + public Collection<GroupId> getSubscriptions() throws DbException { synchronized(subscriptionLock) { Txn txn = db.startTransaction(); try { - Set<GroupId> subs = db.getSubscriptions(txn); + Collection<GroupId> subs = db.getSubscriptions(txn); db.commitTransaction(txn); return subs; } catch(DbException e) { @@ -337,34 +365,13 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - public void receiveBundle(ContactId c, BundleReader b) throws DbException, - IOException, GeneralSecurityException { - if(LOG.isLoggable(Level.FINE)) LOG.fine("Received bundle from " + c); - Header h = b.getHeader(); - receiveAcks(c, h); - receiveSubscriptions(c, h); - receiveTransports(c, h); - // Store the messages - int batches = 0; - Batch batch = null; - while((batch = b.getNextBatch()) != null) { - receiveBatch(c, batch); - batches++; - } - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + batches + " batches"); - b.finish(); - findLostBatches(c); - System.gc(); - } - - private void receiveAcks(ContactId c, Header h) throws DbException { + public void receiveAck(ContactId c, Ack a) throws DbException { // Mark all messages in acked batches as seen synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); synchronized(messageLock) { synchronized(messageStatusLock) { - Set<BatchId> acks = h.getAcks(); + Collection<BatchId> acks = a.getBatches(); for(BatchId ack : acks) { Txn txn = db.startTransaction(); try { @@ -382,49 +389,7 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void receiveSubscriptions(ContactId c, Header h) - throws DbException { - // Update the contact's subscriptions - synchronized(contactLock) { - if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(subscriptionLock) { - Txn txn = db.startTransaction(); - try { - Set<GroupId> subs = h.getSubscriptions(); - db.setSubscriptions(txn, c, subs, h.getTimestamp()); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + subs.size() + " subscriptions"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - - private void receiveTransports(ContactId c, Header h) throws DbException { - // Update the contact's transport details - synchronized(contactLock) { - if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(transportLock) { - Txn txn = db.startTransaction(); - try { - Map<String, String> transports = h.getTransports(); - db.setTransports(txn, c, transports, h.getTimestamp()); - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Received " + transports.size() - + " transports"); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } - } - } - - private void receiveBatch(ContactId c, Batch b) throws DbException { + public void receiveBatch(ContactId c, Batch b) throws DbException { waitForPermissionToWrite(); synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); @@ -456,41 +421,44 @@ class SynchronizedDatabaseComponent<Txn> extends DatabaseComponentImpl<Txn> { } } - private void findLostBatches(ContactId c) + public void receiveSubscriptions(ContactId c, Subscriptions s) throws DbException { - // Find any lost batches that need to be retransmitted - Set<BatchId> lost; + // Update the contact's subscriptions synchronized(contactLock) { if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - lost = db.getLostBatches(txn, c); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } + synchronized(subscriptionLock) { + Txn txn = db.startTransaction(); + try { + Collection<GroupId> subs = s.getSubscriptions(); + db.setSubscriptions(txn, c, subs, s.getTimestamp()); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + subs.size() + " subscriptions"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } } } - for(BatchId batch : lost) { - synchronized(contactLock) { - if(!containsContact(c)) throw new NoSuchContactException(); - synchronized(messageLock) { - synchronized(messageStatusLock) { - Txn txn = db.startTransaction(); - try { - if(LOG.isLoggable(Level.FINE)) - LOG.fine("Removing lost batch"); - db.removeLostBatch(txn, c, batch); - db.commitTransaction(txn); - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } + } + + public void receiveTransports(ContactId c, Transports t) + throws DbException { + // Update the contact's transport details + synchronized(contactLock) { + if(!containsContact(c)) throw new NoSuchContactException(); + synchronized(transportLock) { + Txn txn = db.startTransaction(); + try { + Map<String, String> transports = t.getTransports(); + db.setTransports(txn, c, transports, t.getTimestamp()); + if(LOG.isLoggable(Level.FINE)) + LOG.fine("Received " + transports.size() + + " transports"); + db.commitTransaction(txn); + } catch(DbException e) { + db.abortTransaction(txn); + throw e; } } } diff --git a/components/net/sf/briar/protocol/AckFactory.java b/components/net/sf/briar/protocol/AckFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..8ed23f90b106c9f8a78a9f703e70d47f2d48ab51 --- /dev/null +++ b/components/net/sf/briar/protocol/AckFactory.java @@ -0,0 +1,11 @@ +package net.sf.briar.protocol; + +import java.util.Collection; + +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.BatchId; + +interface AckFactory { + + Ack createAck(Collection<BatchId> batches); +} diff --git a/components/net/sf/briar/protocol/AckFactoryImpl.java b/components/net/sf/briar/protocol/AckFactoryImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..900022e62173d47d5454fd27f126f1b35c7dbf5c --- /dev/null +++ b/components/net/sf/briar/protocol/AckFactoryImpl.java @@ -0,0 +1,13 @@ +package net.sf.briar.protocol; + +import java.util.Collection; + +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.BatchId; + +class AckFactoryImpl implements AckFactory { + + public Ack createAck(Collection<BatchId> batches) { + return new AckImpl(batches); + } +} diff --git a/components/net/sf/briar/protocol/AckImpl.java b/components/net/sf/briar/protocol/AckImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..939f83bdd2c25db80c3e682ecf48ee3bacced58f --- /dev/null +++ b/components/net/sf/briar/protocol/AckImpl.java @@ -0,0 +1,19 @@ +package net.sf.briar.protocol; + +import java.util.Collection; + +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.BatchId; + +public class AckImpl implements Ack { + + private final Collection<BatchId> batches; + + AckImpl(Collection<BatchId> batches) { + this.batches = batches; + } + + public Collection<BatchId> getBatches() { + return batches; + } +} diff --git a/components/net/sf/briar/protocol/AckReader.java b/components/net/sf/briar/protocol/AckReader.java new file mode 100644 index 0000000000000000000000000000000000000000..b584082a4d5b89d29702475db37b9f6acac2e661 --- /dev/null +++ b/components/net/sf/briar/protocol/AckReader.java @@ -0,0 +1,33 @@ +package net.sf.briar.protocol; + +import java.io.IOException; +import java.util.Collection; + +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.BatchId; +import net.sf.briar.api.protocol.Tags; +import net.sf.briar.api.serial.ObjectReader; +import net.sf.briar.api.serial.Reader; + +public class AckReader implements ObjectReader<Ack> { + + private final AckFactory ackFactory; + + AckReader(AckFactory ackFactory) { + this.ackFactory = ackFactory; + } + + public Ack readObject(Reader r) throws IOException { + // Initialise the consumer + CountingConsumer counting = new CountingConsumer(Ack.MAX_SIZE); + // Read and digest the data + r.addConsumer(counting); + r.readUserDefinedTag(Tags.ACK); + r.addObjectReader(Tags.BATCH_ID, new BatchIdReader()); + Collection<BatchId> batches = r.readList(BatchId.class); + r.removeObjectReader(Tags.BATCH_ID); + r.removeConsumer(counting); + // Build and return the ack + return ackFactory.createAck(batches); + } +} diff --git a/components/net/sf/briar/protocol/AckWriterImpl.java b/components/net/sf/briar/protocol/AckWriterImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..1d5282a5051c20cd4f9c583257f134198bbeddd8 --- /dev/null +++ b/components/net/sf/briar/protocol/AckWriterImpl.java @@ -0,0 +1,49 @@ +package net.sf.briar.protocol; + +import java.io.IOException; +import java.io.OutputStream; + +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; +import net.sf.briar.api.protocol.BatchId; +import net.sf.briar.api.protocol.Tags; +import net.sf.briar.api.serial.Writer; +import net.sf.briar.api.serial.WriterFactory; + +class AckWriterImpl implements AckWriter { + + private final OutputStream out; + private final Writer w; + + private boolean started = false, finished = false; + + AckWriterImpl(OutputStream out, WriterFactory writerFactory) { + this.out = out; + this.w = writerFactory.createWriter(out); + } + + public boolean addBatchId(BatchId b) throws IOException { + if(finished) throw new IllegalStateException(); + if(!started) { + w.writeUserDefinedTag(Tags.ACK); + w.writeListStart(); + started = true; + } + int capacity = Ack.MAX_SIZE - (int) w.getBytesWritten() - 1; + if(capacity < BatchId.SERIALISED_LENGTH) return false; + b.writeTo(w); + return true; + } + + public void finish() throws IOException { + if(finished) throw new IllegalStateException(); + if(!started) { + w.writeUserDefinedTag(Tags.ACK); + w.writeListStart(); + started = true; + } + w.writeListEnd(); + out.flush(); + finished = true; + } +} diff --git a/components/net/sf/briar/protocol/BatchFactory.java b/components/net/sf/briar/protocol/BatchFactory.java index 8efe7fc0693f6a2dd96b361bbc868d36dd72cff0..5bbffdbcd0224592868ed3457f008b7012513879 100644 --- a/components/net/sf/briar/protocol/BatchFactory.java +++ b/components/net/sf/briar/protocol/BatchFactory.java @@ -1,6 +1,6 @@ package net.sf.briar.protocol; -import java.util.List; +import java.util.Collection; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; @@ -8,5 +8,5 @@ import net.sf.briar.api.protocol.Message; interface BatchFactory { - Batch createBatch(BatchId id, List<Message> messages); + Batch createBatch(BatchId id, Collection<Message> messages); } diff --git a/components/net/sf/briar/protocol/BatchFactoryImpl.java b/components/net/sf/briar/protocol/BatchFactoryImpl.java index ee39fa919fcfa125d3e2ff21fa49a4fa5326d43f..a21bef6585467532e222cebbb491fe57a9a8d0b5 100644 --- a/components/net/sf/briar/protocol/BatchFactoryImpl.java +++ b/components/net/sf/briar/protocol/BatchFactoryImpl.java @@ -1,6 +1,6 @@ package net.sf.briar.protocol; -import java.util.List; +import java.util.Collection; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; @@ -8,7 +8,7 @@ import net.sf.briar.api.protocol.Message; class BatchFactoryImpl implements BatchFactory { - public Batch createBatch(BatchId id, List<Message> messages) { + public Batch createBatch(BatchId id, Collection<Message> messages) { return new BatchImpl(id, messages); } } diff --git a/components/net/sf/briar/protocol/BatchImpl.java b/components/net/sf/briar/protocol/BatchImpl.java index 9f7244ff00432fcb78358068e6dae801a63f6f36..86f54548ea9ed7035545f4910c6f11f75f25d0cc 100644 --- a/components/net/sf/briar/protocol/BatchImpl.java +++ b/components/net/sf/briar/protocol/BatchImpl.java @@ -1,6 +1,6 @@ package net.sf.briar.protocol; -import java.util.List; +import java.util.Collection; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; @@ -10,9 +10,9 @@ import net.sf.briar.api.protocol.Message; class BatchImpl implements Batch { private final BatchId id; - private final List<Message> messages; + private final Collection<Message> messages; - BatchImpl(BatchId id, List<Message> messages) { + BatchImpl(BatchId id, Collection<Message> messages) { this.id = id; this.messages = messages; } @@ -21,7 +21,7 @@ class BatchImpl implements Batch { return id; } - public Iterable<Message> getMessages() { + public Collection<Message> getMessages() { return messages; } } diff --git a/components/net/sf/briar/protocol/BatchWriterImpl.java b/components/net/sf/briar/protocol/BatchWriterImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..e19da4539671c2c721ee7344e17cb608e6a8ae64 --- /dev/null +++ b/components/net/sf/briar/protocol/BatchWriterImpl.java @@ -0,0 +1,62 @@ +package net.sf.briar.protocol; + +import java.io.IOException; +import java.io.OutputStream; +import java.security.DigestOutputStream; +import java.security.MessageDigest; + +import net.sf.briar.api.protocol.Batch; +import net.sf.briar.api.protocol.BatchId; +import net.sf.briar.api.protocol.BatchWriter; +import net.sf.briar.api.protocol.Tags; +import net.sf.briar.api.serial.Writer; +import net.sf.briar.api.serial.WriterFactory; + +class BatchWriterImpl implements BatchWriter { + + private final DigestOutputStream out; + private final Writer w; + private final MessageDigest messageDigest; + + private boolean started = false, finished = false; + + BatchWriterImpl(OutputStream out, WriterFactory writerFactory, + MessageDigest messageDigest) { + this.out = new DigestOutputStream(out, messageDigest); + w = writerFactory.createWriter(this.out); + this.messageDigest = messageDigest; + } + + public int getCapacity() { + return Batch.MAX_SIZE - 3; + } + + public boolean addMessage(byte[] message) throws IOException { + if(finished) throw new IllegalStateException(); + if(!started) { + messageDigest.reset(); + w.writeUserDefinedTag(Tags.BATCH); + w.writeListStart(); + started = true; + } + int capacity = Batch.MAX_SIZE - (int) w.getBytesWritten() - 1; + if(capacity < message.length) return false; + // Bypass the writer and write each raw message directly + out.write(message); + return true; + } + + public BatchId finish() throws IOException { + if(finished) throw new IllegalStateException(); + if(!started) { + messageDigest.reset(); + w.writeUserDefinedTag(Tags.BATCH); + w.writeListStart(); + started = true; + } + w.writeListEnd(); + out.flush(); + finished = true; + return new BatchId(messageDigest.digest()); + } +} diff --git a/components/net/sf/briar/protocol/BundleReaderImpl.java b/components/net/sf/briar/protocol/BundleReaderImpl.java deleted file mode 100644 index 684a0b97bd91843cb9d65b70613444efc5939879..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/BundleReaderImpl.java +++ /dev/null @@ -1,58 +0,0 @@ -package net.sf.briar.protocol; - -import java.io.IOException; -import java.security.GeneralSecurityException; - -import net.sf.briar.api.protocol.Batch; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.Header; -import net.sf.briar.api.protocol.Tags; -import net.sf.briar.api.serial.FormatException; -import net.sf.briar.api.serial.ObjectReader; -import net.sf.briar.api.serial.Reader; - -class BundleReaderImpl implements BundleReader { - - private static enum State { START, BATCHES, END }; - - private final Reader reader; - private final ObjectReader<Header> headerReader; - private final ObjectReader<Batch> batchReader; - private State state = State.START; - - BundleReaderImpl(Reader reader, ObjectReader<Header> headerReader, - ObjectReader<Batch> batchReader) { - this.reader = reader; - this.headerReader = headerReader; - this.batchReader = batchReader; - } - - public Header getHeader() throws IOException, GeneralSecurityException { - if(state != State.START) throw new IllegalStateException(); - reader.addObjectReader(Tags.HEADER, headerReader); - Header h = reader.readUserDefined(Tags.HEADER, Header.class); - reader.removeObjectReader(Tags.HEADER); - // Expect a list of batches - reader.readListStart(); - reader.addObjectReader(Tags.BATCH, batchReader); - state = State.BATCHES; - return h; - } - - public Batch getNextBatch() throws IOException, GeneralSecurityException { - if(state != State.BATCHES) throw new IllegalStateException(); - if(reader.hasListEnd()) { - reader.removeObjectReader(Tags.BATCH); - reader.readListEnd(); - // That should be all - if(!reader.eof()) throw new FormatException(); - state = State.END; - return null; - } - return reader.readUserDefined(Tags.BATCH, Batch.class); - } - - public void finish() throws IOException { - reader.close(); - } -} diff --git a/components/net/sf/briar/protocol/BundleWriterImpl.java b/components/net/sf/briar/protocol/BundleWriterImpl.java deleted file mode 100644 index 10886002bcec89abf917bfe801eb02f0bc983f02..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/BundleWriterImpl.java +++ /dev/null @@ -1,91 +0,0 @@ -package net.sf.briar.protocol; - -import java.io.IOException; -import java.io.OutputStream; -import java.security.DigestOutputStream; -import java.security.GeneralSecurityException; -import java.security.MessageDigest; -import java.util.Collection; -import java.util.Map; - -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleWriter; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Tags; -import net.sf.briar.api.serial.Raw; -import net.sf.briar.api.serial.Writer; -import net.sf.briar.api.serial.WriterFactory; - -class BundleWriterImpl implements BundleWriter { - - private static enum State { START, FIRST_BATCH, MORE_BATCHES, END }; - - private final DigestOutputStream out; - private final Writer writer; - private final MessageDigest messageDigest; - private final long capacity; - private State state = State.START; - - BundleWriterImpl(OutputStream out, WriterFactory writerFactory, - MessageDigest messageDigest, long capacity) { - this.out = new DigestOutputStream(out, messageDigest); - this.out.on(false); // Turn off the digest until we need it - writer = writerFactory.createWriter(this.out); - this.messageDigest = messageDigest; - this.capacity = capacity; - } - - public long getRemainingCapacity() { - return capacity - writer.getBytesWritten(); - } - - public void addHeader(Collection<BatchId> acks, Collection<GroupId> subs, - Map<String, String> transports) throws IOException, - GeneralSecurityException { - if(state != State.START) throw new IllegalStateException(); - // Write the initial tag - writer.writeUserDefinedTag(Tags.HEADER); - // Write the data - writer.writeList(acks); - writer.writeList(subs); - writer.writeMap(transports); - writer.writeInt64(System.currentTimeMillis()); - // Expect a (possibly empty) list of batches - state = State.FIRST_BATCH; - } - - public BatchId addBatch(Collection<Raw> messages) throws IOException, - GeneralSecurityException { - if(state == State.FIRST_BATCH) { - writer.writeListStart(); - state = State.MORE_BATCHES; - } - if(state != State.MORE_BATCHES) throw new IllegalStateException(); - // Write the initial tag - writer.writeUserDefinedTag(Tags.BATCH); - // Start digesting - messageDigest.reset(); - out.on(true); - // Write the data - writer.writeListStart(); - // Bypass the writer and write each raw message directly - for(Raw message : messages) out.write(message.getBytes()); - writer.writeListEnd(); - // Stop digesting - out.on(false); - // Calculate and return the ID - return new BatchId(messageDigest.digest()); - } - - public void finish() throws IOException { - if(state == State.FIRST_BATCH) { - writer.writeListStart(); - state = State.MORE_BATCHES; - } - if(state != State.MORE_BATCHES) throw new IllegalStateException(); - writer.writeListEnd(); - out.flush(); - out.close(); - state = State.END; - } -} diff --git a/components/net/sf/briar/protocol/HeaderFactory.java b/components/net/sf/briar/protocol/HeaderFactory.java deleted file mode 100644 index 52099b354114638ff56b5c97ed4c2b51eb5bb79a..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/HeaderFactory.java +++ /dev/null @@ -1,14 +0,0 @@ -package net.sf.briar.protocol; - -import java.util.Collection; -import java.util.Map; - -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; - -interface HeaderFactory { - - Header createHeader(Collection<BatchId> acks, Collection<GroupId> subs, - Map<String, String> transports, long timestamp); -} diff --git a/components/net/sf/briar/protocol/HeaderFactoryImpl.java b/components/net/sf/briar/protocol/HeaderFactoryImpl.java deleted file mode 100644 index 07a34f7b3049d8df54536723c49e9dc21f58e3e4..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/HeaderFactoryImpl.java +++ /dev/null @@ -1,21 +0,0 @@ -package net.sf.briar.protocol; - -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; - -class HeaderFactoryImpl implements HeaderFactory { - - public Header createHeader(Collection<BatchId> acks, - Collection<GroupId> subs, Map<String, String> transports, - long timestamp) { - Set<BatchId> ackSet = new HashSet<BatchId>(acks); - Set<GroupId> subSet = new HashSet<GroupId>(subs); - return new HeaderImpl(ackSet, subSet, transports, timestamp); - } -} diff --git a/components/net/sf/briar/protocol/HeaderImpl.java b/components/net/sf/briar/protocol/HeaderImpl.java deleted file mode 100644 index a0bb2ed35c97bc022d10c5c294824fbd22937e96..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/HeaderImpl.java +++ /dev/null @@ -1,41 +0,0 @@ -package net.sf.briar.protocol; - -import java.util.Map; -import java.util.Set; - -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; - -/** A simple in-memory implementation of a header. */ -class HeaderImpl implements Header { - - private final Set<BatchId> acks; - private final Set<GroupId> subs; - private final Map<String, String> transports; - private final long timestamp; - - HeaderImpl(Set<BatchId> acks, Set<GroupId> subs, - Map<String, String> transports, long timestamp) { - this.acks = acks; - this.subs = subs; - this.transports = transports; - this.timestamp = timestamp; - } - - public Set<BatchId> getAcks() { - return acks; - } - - public Set<GroupId> getSubscriptions() { - return subs; - } - - public Map<String, String> getTransports() { - return transports; - } - - public long getTimestamp() { - return timestamp; - } -} diff --git a/components/net/sf/briar/protocol/HeaderReader.java b/components/net/sf/briar/protocol/HeaderReader.java deleted file mode 100644 index 6c75d9d73cfe8dc311abc015e38b08c56fb0b96c..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/protocol/HeaderReader.java +++ /dev/null @@ -1,47 +0,0 @@ -package net.sf.briar.protocol; - -import java.io.IOException; -import java.util.Collection; -import java.util.Map; - -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; -import net.sf.briar.api.protocol.Tags; -import net.sf.briar.api.serial.FormatException; -import net.sf.briar.api.serial.ObjectReader; -import net.sf.briar.api.serial.Reader; - -class HeaderReader implements ObjectReader<Header> { - - private final HeaderFactory headerFactory; - - HeaderReader(HeaderFactory headerFactory) { - this.headerFactory = headerFactory; - } - - public Header readObject(Reader r) throws IOException { - // Initialise and add the consumer - CountingConsumer counting = new CountingConsumer(Header.MAX_SIZE); - r.addConsumer(counting); - r.readUserDefinedTag(Tags.HEADER); - // Acks - r.addObjectReader(Tags.BATCH_ID, new BatchIdReader()); - Collection<BatchId> acks = r.readList(BatchId.class); - r.removeObjectReader(Tags.BATCH_ID); - // Subs - r.addObjectReader(Tags.GROUP_ID, new GroupIdReader()); - Collection<GroupId> subs = r.readList(GroupId.class); - r.removeObjectReader(Tags.GROUP_ID); - // Transports - Map<String, String> transports = - r.readMap(String.class, String.class); - // Timestamp - long timestamp = r.readInt64(); - if(timestamp < 0L) throw new FormatException(); - // Remove the consumer - r.removeConsumer(counting); - // Build and return the header - return headerFactory.createHeader(acks, subs, transports, timestamp); - } -} diff --git a/components/net/sf/briar/protocol/ProtocolModule.java b/components/net/sf/briar/protocol/ProtocolModule.java index bedf1e35065ba76d818092b1880abc634d055970..9652d0bf5a25c547c8ef641b7ad5c621a422cef7 100644 --- a/components/net/sf/briar/protocol/ProtocolModule.java +++ b/components/net/sf/briar/protocol/ProtocolModule.java @@ -1,8 +1,5 @@ package net.sf.briar.protocol; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; - import com.google.inject.AbstractModule; public class ProtocolModule extends AbstractModule { @@ -10,8 +7,5 @@ public class ProtocolModule extends AbstractModule { @Override protected void configure() { bind(BatchFactory.class).to(BatchFactoryImpl.class); - bind(BundleReader.class).to(BundleReaderImpl.class); - bind(BundleWriter.class).to(BundleWriterImpl.class); - bind(HeaderFactory.class).to(HeaderFactoryImpl.class); } } diff --git a/test/build.xml b/test/build.xml index c39f9726ccf415da582b1f613f6c6303786c405f..f5e38bbba44134980a3beab11549b0c9467be1d5 100644 --- a/test/build.xml +++ b/test/build.xml @@ -20,11 +20,10 @@ <test name='net.sf.briar.i18n.FontManagerTest'/> <test name='net.sf.briar.i18n.I18nTest'/> <test name='net.sf.briar.invitation.InvitationWorkerTest'/> + <test name='net.sf.briar.protocol.AckReaderTest'/> <test name='net.sf.briar.protocol.BatchReaderTest'/> - <test name='net.sf.briar.protocol.BundleReaderImplTest'/> - <test name='net.sf.briar.protocol.BundleReadWriteTest'/> <test name='net.sf.briar.protocol.ConsumersTest'/> - <test name='net.sf.briar.protocol.HeaderReaderTest'/> + <test name='net.sf.briar.protocol.FileReadWriteTest'/> <test name='net.sf.briar.protocol.SigningDigestingOutputStreamTest'/> <test name='net.sf.briar.serial.ReaderImplTest'/> <test name='net.sf.briar.serial.WriterImplTest'/> diff --git a/test/net/sf/briar/db/DatabaseComponentTest.java b/test/net/sf/briar/db/DatabaseComponentTest.java index 0ae68d759a3c1a04e49da2aea2ca77a3e27e0b96..e2aaba9208824e1bfce065c8320fe7284c0e2271 100644 --- a/test/net/sf/briar/db/DatabaseComponentTest.java +++ b/test/net/sf/briar/db/DatabaseComponentTest.java @@ -1,8 +1,9 @@ package net.sf.briar.db; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Map; -import java.util.Set; import junit.framework.TestCase; import net.sf.briar.TestUtils; @@ -12,17 +13,13 @@ import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.Status; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; import net.sf.briar.api.protocol.AuthorId; -import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; -import net.sf.briar.api.serial.Raw; -import net.sf.briar.api.serial.RawByteArray; import org.jmock.Expectations; import org.jmock.Mockery; @@ -40,11 +37,11 @@ public abstract class DatabaseComponentTest extends TestCase { private final int size; private final byte[] raw; private final Message message; - private final Set<ContactId> contacts; - private final Set<BatchId> acks; - private final Set<GroupId> subs; + private final Collection<ContactId> contacts; + private final Collection<BatchId> acks; + private final Collection<GroupId> subs; private final Map<String, String> transports; - private final Set<MessageId> messages; + private final Collection<MessageId> messages; public DatabaseComponentTest() { super(); @@ -59,11 +56,11 @@ public abstract class DatabaseComponentTest extends TestCase { raw = new byte[size]; message = new TestMessage(messageId, MessageId.NONE, groupId, authorId, timestamp, raw); - contacts = Collections.singleton(contactId); - acks = Collections.singleton(batchId); - subs = Collections.singleton(groupId); + contacts = Collections.singletonList(contactId); + acks = Collections.singletonList(batchId); + subs = Collections.singletonList(groupId); transports = Collections.singletonMap("foo", "bar"); - messages = Collections.singleton(messageId); + messages = Collections.singletonList(messageId); } protected abstract <T> DatabaseComponent createDatabaseComponent( @@ -419,13 +416,13 @@ public abstract class DatabaseComponentTest extends TestCase { } @Test - public void testGenerateBundleThrowsExceptionIfContactIsMissing() + public void testGenerateAckThrowsExceptionIfContactIsMissing() throws Exception { Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); - final BundleWriter bundleBuilder = context.mock(BundleWriter.class); + final AckWriter ackWriter = context.mock(AckWriter.class); context.checking(new Expectations() {{ // Check that the contact is still in the DB oneOf(database).startTransaction(); @@ -437,7 +434,7 @@ public abstract class DatabaseComponentTest extends TestCase { DatabaseComponent db = createDatabaseComponent(database, cleaner); try { - db.generateBundle(contactId, bundleBuilder); + db.generateAck(contactId, ackWriter); assertTrue(false); } catch(NoSuchContactException expected) {} @@ -445,67 +442,49 @@ public abstract class DatabaseComponentTest extends TestCase { } @Test - public void testGenerateBundle() throws Exception { - final long headerSize = 1234L; - final Raw messageRaw = new RawByteArray(raw); + public void testGenerateAck() throws Exception { + final BatchId batchId1 = new BatchId(TestUtils.getRandomId()); + final Collection<BatchId> twoAcks = new ArrayList<BatchId>(); + twoAcks.add(batchId); + twoAcks.add(batchId1); Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); - final BundleWriter bundleWriter = context.mock(BundleWriter.class); + final AckWriter ackWriter = context.mock(AckWriter.class); context.checking(new Expectations() {{ allowing(database).startTransaction(); will(returnValue(txn)); allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - // Add acks to the header - oneOf(database).removeBatchesToAck(txn, contactId); - will(returnValue(acks)); - // Add subscriptions to the header - oneOf(database).getSubscriptions(txn); - will(returnValue(subs)); - // Add transports to the header - oneOf(database).getTransports(txn); - will(returnValue(transports)); - // Build the header - oneOf(bundleWriter).addHeader(acks, subs, transports); - // Add a batch to the bundle - oneOf(bundleWriter).getRemainingCapacity(); - will(returnValue(1024L * 1024L - headerSize)); - oneOf(database).getSendableMessages(txn, contactId, - Batch.MAX_SIZE - headerSize); - will(returnValue(messages)); - oneOf(database).getMessage(txn, messageId); - will(returnValue(raw)); - // Add the batch to the bundle - oneOf(bundleWriter).addBatch(Collections.singletonList(messageRaw)); - will(returnValue(batchId)); - // Record the outstanding batch - oneOf(database).addOutstandingBatch( - txn, contactId, batchId, messages); - // Send the bundle - oneOf(bundleWriter).finish(); + // Get the batches to ack + oneOf(database).getBatchesToAck(txn, contactId); + will(returnValue(twoAcks)); + // Try to add both batches to the writer - only manage to add one + oneOf(ackWriter).addBatchId(batchId); + will(returnValue(true)); + oneOf(ackWriter).addBatchId(batchId1); + will(returnValue(false)); + // Record the batch that was acked + oneOf(database).removeBatchesToAck(txn, contactId, acks); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner); - db.generateBundle(contactId, bundleWriter); + db.generateAck(contactId, ackWriter); context.assertIsSatisfied(); } @Test - public void testReceiveBundleThrowsExceptionIfContactIsMissing() + public void testReceiveAckThrowsExceptionIfContactIsMissing() throws Exception { Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); - final BundleReader bundleReader = context.mock(BundleReader.class); - final Header header = context.mock(Header.class); + final Ack ack = context.mock(Ack.class); context.checking(new Expectations() {{ - oneOf(bundleReader).getHeader(); - will(returnValue(header)); // Check that the contact is still in the DB oneOf(database).startTransaction(); will(returnValue(txn)); @@ -516,7 +495,7 @@ public abstract class DatabaseComponentTest extends TestCase { DatabaseComponent db = createDatabaseComponent(database, cleaner); try { - db.receiveBundle(contactId, bundleReader); + db.receiveAck(contactId, ack); assertTrue(false); } catch(NoSuchContactException expected) {} @@ -524,66 +503,26 @@ public abstract class DatabaseComponentTest extends TestCase { } @Test - public void testReceiveBundle() throws Exception { + public void testReceiveAck() throws Exception { Mockery context = new Mockery(); @SuppressWarnings("unchecked") final Database<Object> database = context.mock(Database.class); final DatabaseCleaner cleaner = context.mock(DatabaseCleaner.class); - final BundleReader bundleReader = context.mock(BundleReader.class); - final Header header = context.mock(Header.class); - final Batch batch = context.mock(Batch.class); + final Ack ack = context.mock(Ack.class); context.checking(new Expectations() {{ allowing(database).startTransaction(); will(returnValue(txn)); allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - // Header - oneOf(bundleReader).getHeader(); - will(returnValue(header)); - // Acks - oneOf(header).getAcks(); + // Get the acked batches + oneOf(ack).getBatches(); will(returnValue(acks)); oneOf(database).removeAckedBatch(txn, contactId, batchId); - // Subscriptions - oneOf(header).getSubscriptions(); - will(returnValue(subs)); - oneOf(header).getTimestamp(); - will(returnValue(timestamp)); - oneOf(database).setSubscriptions(txn, contactId, subs, timestamp); - // Transports - oneOf(header).getTransports(); - will(returnValue(transports)); - oneOf(header).getTimestamp(); - will(returnValue(timestamp)); - oneOf(database).setTransports(txn, contactId, transports, - timestamp); - // Batches - oneOf(bundleReader).getNextBatch(); - will(returnValue(batch)); - oneOf(batch).getMessages(); - will(returnValue(Collections.singleton(message))); - oneOf(database).containsSubscription(txn, groupId); - will(returnValue(true)); - oneOf(database).addMessage(txn, message); - will(returnValue(false)); // Duplicate message - oneOf(database).setStatus(txn, contactId, messageId, Status.SEEN); - // Batch to ack - oneOf(batch).getId(); - will(returnValue(batchId)); - oneOf(database).addBatchToAck(txn, contactId, batchId); - // Any more batches? Nope - oneOf(bundleReader).getNextBatch(); - will(returnValue(null)); - oneOf(bundleReader).finish(); - // Lost batches - oneOf(database).getLostBatches(txn, contactId); - will(returnValue(Collections.singleton(batchId))); - oneOf(database).removeLostBatch(txn, contactId, batchId); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner); - db.receiveBundle(contactId, bundleReader); + db.receiveAck(contactId, ack); context.assertIsSatisfied(); } diff --git a/test/net/sf/briar/db/H2DatabaseTest.java b/test/net/sf/briar/db/H2DatabaseTest.java index 80d46876658cc54bf10dcf716b28201153a7b6cd..782c92b21f66f557399c58516a900358b58b6719 100644 --- a/test/net/sf/briar/db/H2DatabaseTest.java +++ b/test/net/sf/briar/db/H2DatabaseTest.java @@ -3,12 +3,12 @@ package net.sf.briar.db; import java.io.File; import java.sql.Connection; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -73,9 +73,8 @@ public class H2DatabaseTest extends TestCase { @Test public void testPersistence() throws DbException { - Database<Connection> db = open(false); - // Store some records + Database<Connection> db = open(false); Connection txn = db.startTransaction(); assertFalse(db.containsContact(txn, contactId)); Map<String, String> transports = Collections.singletonMap("foo", "bar"); @@ -90,9 +89,8 @@ public class H2DatabaseTest extends TestCase { db.commitTransaction(txn); db.close(); - // Reopen the database - db = open(true); // Check that the records are still there + db = open(true); txn = db.startTransaction(); assertTrue(db.containsContact(txn, contactId)); transports = db.getTransports(txn, contactId); @@ -108,9 +106,8 @@ public class H2DatabaseTest extends TestCase { db.commitTransaction(txn); db.close(); - // Repoen the database - db = open(true); // Check that the records are gone + db = open(true); txn = db.startTransaction(); assertFalse(db.containsContact(txn, contactId)); assertEquals(Collections.emptyMap(), db.getTransports(txn, contactId)); @@ -126,9 +123,9 @@ public class H2DatabaseTest extends TestCase { ContactId contactId2 = new ContactId(3); ContactId contactId3 = new ContactId(4); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Create three contacts - Connection txn = db.startTransaction(); assertFalse(db.containsContact(txn, contactId)); assertEquals(contactId, db.addContact(txn, null)); assertTrue(db.containsContact(txn, contactId)); @@ -145,199 +142,171 @@ public class H2DatabaseTest extends TestCase { assertFalse(db.containsContact(txn, contactId3)); assertEquals(contactId3, db.addContact(txn, null)); assertTrue(db.containsContact(txn, contactId3)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testRatings() throws DbException { Database<Connection> db = open(false); - Connection txn = db.startTransaction(); + // Unknown authors should be unrated assertEquals(Rating.UNRATED, db.getRating(txn, authorId)); // Store a rating db.setRating(txn, authorId, Rating.GOOD); - db.commitTransaction(txn); // Check that the rating was stored - txn = db.startTransaction(); assertEquals(Rating.GOOD, db.getRating(txn, authorId)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testUnsubscribingRemovesMessage() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Subscribe to a group and store a message - Connection txn = db.startTransaction(); db.addSubscription(txn, groupId); db.addMessage(txn, message); - db.commitTransaction(txn); // Unsubscribing from the group should delete the message - txn = db.startTransaction(); assertTrue(db.containsMessage(txn, messageId)); db.removeSubscription(txn, groupId); assertFalse(db.containsMessage(txn, messageId)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testSendableMessagesMustBeSendable() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setStatus(txn, contactId, messageId, Status.NEW); - db.commitTransaction(txn); // The message should not be sendable - txn = db.startTransaction(); assertEquals(0, db.getSendability(txn, messageId)); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); // Changing the sendability to > 0 should make the message sendable - txn = db.startTransaction(); db.setSendability(txn, messageId, 1); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); - db.commitTransaction(txn); // Changing the sendability to 0 should make the message unsendable - txn = db.startTransaction(); db.setSendability(txn, messageId, 0); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testSendableMessagesMustBeNew() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); - db.commitTransaction(txn); // The message has no status yet, so it should not be sendable - txn = db.startTransaction(); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); // Changing the status to Status.NEW should make the message sendable - txn = db.startTransaction(); db.setStatus(txn, contactId, messageId, Status.NEW); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); - db.commitTransaction(txn); // Changing the status to SENT should make the message unsendable - txn = db.startTransaction(); db.setStatus(txn, contactId, messageId, Status.SENT); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); // Changing the status to SEEN should also make the message unsendable - txn = db.startTransaction(); db.setStatus(txn, contactId, messageId, Status.SEEN); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testSendableMessagesMustBeSubscribed() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); - db.commitTransaction(txn); // The contact is not subscribed, so the message should not be sendable - txn = db.startTransaction(); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); // The contact subscribing should make the message sendable - txn = db.startTransaction(); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); - db.commitTransaction(txn); // The contact unsubscribing should make the message unsendable - txn = db.startTransaction(); db.setSubscriptions(txn, contactId, Collections.<GroupId>emptySet(), 2); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testSendableMessagesMustFitCapacity() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); - db.commitTransaction(txn); // The message is too large to send - txn = db.startTransaction(); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, size - 1).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); // The message is just the right size to send - txn = db.startTransaction(); it = db.getSendableMessages(txn, contactId, size).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -345,47 +314,45 @@ public class H2DatabaseTest extends TestCase { public void testBatchesToAck() throws DbException { BatchId batchId1 = new BatchId(TestUtils.getRandomId()); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact and some batches to ack - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addBatchToAck(txn, contactId, batchId); db.addBatchToAck(txn, contactId, batchId1); db.commitTransaction(txn); // Both batch IDs should be returned - txn = db.startTransaction(); - Set<BatchId> acks = db.removeBatchesToAck(txn, contactId); + Collection<BatchId> acks = db.getBatchesToAck(txn, contactId); assertEquals(2, acks.size()); assertTrue(acks.contains(batchId)); assertTrue(acks.contains(batchId1)); - db.commitTransaction(txn); + + // Remove the batch IDs + db.removeBatchesToAck(txn, contactId, acks); // Both batch IDs should have been removed - txn = db.startTransaction(); - acks = db.removeBatchesToAck(txn, contactId); + acks = db.getBatchesToAck(txn, contactId); assertEquals(0, acks.size()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testRemoveAckedBatch() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); - db.commitTransaction(txn); // Get the message and mark it as sent - txn = db.startTransaction(); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); @@ -394,10 +361,8 @@ public class H2DatabaseTest extends TestCase { db.setStatus(txn, contactId, messageId, Status.SENT); db.addOutstandingBatch(txn, contactId, batchId, Collections.singleton(messageId)); - db.commitTransaction(txn); // The message should no longer be sendable - txn = db.startTransaction(); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); // Pretend that the batch was acked @@ -405,27 +370,25 @@ public class H2DatabaseTest extends TestCase { // The message still should not be sendable it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testRemoveLostBatch() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact, subscribe to a group and store a message - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); db.addSubscription(txn, groupId); db.setSubscriptions(txn, contactId, Collections.singleton(groupId), 1); db.addMessage(txn, message); db.setSendability(txn, messageId, 1); db.setStatus(txn, contactId, messageId, Status.NEW); - db.commitTransaction(txn); // Get the message and mark it as sent - txn = db.startTransaction(); Iterator<MessageId> it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertTrue(it.hasNext()); @@ -434,10 +397,8 @@ public class H2DatabaseTest extends TestCase { db.setStatus(txn, contactId, messageId, Status.SENT); db.addOutstandingBatch(txn, contactId, batchId, Collections.singleton(messageId)); - db.commitTransaction(txn); // The message should no longer be sendable - txn = db.startTransaction(); it = db.getSendableMessages(txn, contactId, ONE_MEGABYTE).iterator(); assertFalse(it.hasNext()); // Pretend that the batch was lost @@ -447,8 +408,8 @@ public class H2DatabaseTest extends TestCase { assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -458,15 +419,15 @@ public class H2DatabaseTest extends TestCase { for(int i = 0; i < ids.length; i++) { ids[i] = new BatchId(TestUtils.getRandomId()); } - Set<MessageId> empty = Collections.emptySet(); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); // Add some outstanding batches, a few ms apart for(int i = 0; i < ids.length; i++) { - db.addOutstandingBatch(txn, contactId, ids[i], empty); + db.addOutstandingBatch(txn, contactId, ids[i], + Collections.<MessageId>emptySet()); try { Thread.sleep(5); } catch(InterruptedException ignored) {} @@ -475,19 +436,19 @@ public class H2DatabaseTest extends TestCase { // RETRANSMIT_THRESHOLD - 1 acks should not trigger any retransmissions for(int i = 0; i < Database.RETRANSMIT_THRESHOLD - 1; i++) { db.removeAckedBatch(txn, contactId, ids[ids.length - i - 1]); - Set<BatchId> lost = db.getLostBatches(txn, contactId); - assertEquals(Collections.emptySet(), lost); + Collection<BatchId> lost = db.getLostBatches(txn, contactId); + assertEquals(Collections.emptyList(), lost); } // The next ack should trigger the retransmission of the remaining // five outstanding batches int index = ids.length - Database.RETRANSMIT_THRESHOLD; db.removeAckedBatch(txn, contactId, ids[index]); - Set<BatchId> lost = db.getLostBatches(txn, contactId); + Collection<BatchId> lost = db.getLostBatches(txn, contactId); for(int i = 0; i < index; i++) { assertTrue(lost.contains(ids[i])); } - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -497,15 +458,15 @@ public class H2DatabaseTest extends TestCase { for(int i = 0; i < ids.length; i++) { ids[i] = new BatchId(TestUtils.getRandomId()); } - Set<MessageId> empty = Collections.emptySet(); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact - Connection txn = db.startTransaction(); assertEquals(contactId, db.addContact(txn, null)); // Add some outstanding batches, a few ms apart for(int i = 0; i < ids.length; i++) { - db.addOutstandingBatch(txn, contactId, ids[i], empty); + db.addOutstandingBatch(txn, contactId, ids[i], + Collections.<MessageId>emptySet()); try { Thread.sleep(5); } catch(InterruptedException ignored) {} @@ -514,11 +475,11 @@ public class H2DatabaseTest extends TestCase { // should be retransmitted for(int i = 0; i < ids.length; i++) { db.removeAckedBatch(txn, contactId, ids[i]); - Set<BatchId> lost = db.getLostBatches(txn, contactId); - assertEquals(Collections.emptySet(), lost); + Collection<BatchId> lost = db.getLostBatches(txn, contactId); + assertEquals(Collections.emptyList(), lost); } - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -529,16 +490,14 @@ public class H2DatabaseTest extends TestCase { Message message1 = new TestMessage(messageId1, MessageId.NONE, groupId, authorId1, timestamp, raw); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Subscribe to a group and store two messages - Connection txn = db.startTransaction(); db.addSubscription(txn, groupId); db.addMessage(txn, message); db.addMessage(txn, message1); - db.commitTransaction(txn); // Check that each message is retrievable via its author - txn = db.startTransaction(); Iterator<MessageId> it = db.getMessagesByAuthor(txn, authorId).iterator(); assertTrue(it.hasNext()); @@ -548,8 +507,8 @@ public class H2DatabaseTest extends TestCase { assertTrue(it.hasNext()); assertEquals(messageId1, it.next()); assertFalse(it.hasNext()); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -567,9 +526,9 @@ public class H2DatabaseTest extends TestCase { Message child3 = new TestMessage(childId3, messageId, groupId1, authorId, timestamp, raw); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Subscribe to the groups and store the messages - Connection txn = db.startTransaction(); db.addSubscription(txn, groupId); db.addSubscription(txn, groupId1); db.addMessage(txn, message); @@ -580,17 +539,15 @@ public class H2DatabaseTest extends TestCase { db.setSendability(txn, childId1, 1); db.setSendability(txn, childId2, 5); db.setSendability(txn, childId3, 3); - db.commitTransaction(txn); // There should be two sendable children - txn = db.startTransaction(); assertEquals(2, db.getNumberOfSendableChildren(txn, messageId)); // Make one of the children unsendable db.setSendability(txn, childId1, 0); // Now there should be one sendable child assertEquals(1, db.getNumberOfSendableChildren(txn, messageId)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -600,31 +557,27 @@ public class H2DatabaseTest extends TestCase { Message message1 = new TestMessage(messageId1, MessageId.NONE, groupId, authorId, timestamp + 1000, raw); Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Subscribe to a group and store two messages - Connection txn = db.startTransaction(); db.addSubscription(txn, groupId); db.addMessage(txn, message); db.addMessage(txn, message1); - db.commitTransaction(txn); // Allowing enough capacity for one message should return the older one - txn = db.startTransaction(); Iterator<MessageId> it = db.getOldMessages(txn, size).iterator(); assertTrue(it.hasNext()); assertEquals(messageId, it.next()); assertFalse(it.hasNext()); - db.commitTransaction(txn); // Allowing enough capacity for both messages should return both - txn = db.startTransaction(); - Set<MessageId> ids = new HashSet<MessageId>(); + Collection<MessageId> ids = new HashSet<MessageId>(); for(MessageId id : db.getOldMessages(txn, size * 2)) ids.add(id); assertEquals(2, ids.size()); assertTrue(ids.contains(messageId)); assertTrue(ids.contains(messageId1)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @@ -732,9 +685,9 @@ public class H2DatabaseTest extends TestCase { @Test public void testUpdateTransports() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact with some transport details - Connection txn = db.startTransaction(); Map<String, String> transports = Collections.singletonMap("foo", "bar"); assertEquals(contactId, db.addContact(txn, transports)); assertEquals(transports, db.getTransports(txn, contactId)); @@ -753,17 +706,17 @@ public class H2DatabaseTest extends TestCase { // Remove the local transport details db.setTransports(txn, null); assertEquals(Collections.emptyMap(), db.getTransports(txn)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testTransportsNotUpdatedIfTimestampIsOld() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact with some transport details - Connection txn = db.startTransaction(); Map<String, String> transports = Collections.singletonMap("foo", "bar"); assertEquals(contactId, db.addContact(txn, transports)); assertEquals(transports, db.getTransports(txn, contactId)); @@ -780,33 +733,35 @@ public class H2DatabaseTest extends TestCase { db.setTransports(txn, contactId, transports2, 1); // The old transports should still be there assertEquals(transports1, db.getTransports(txn, contactId)); - db.commitTransaction(txn); + db.commitTransaction(txn); db.close(); } @Test public void testUpdateSubscriptions() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact - Connection txn = db.startTransaction(); Map<String, String> transports = Collections.emptyMap(); assertEquals(contactId, db.addContact(txn, transports)); // Add some subscriptions - Set<GroupId> subs = new HashSet<GroupId>(); + Collection<GroupId> subs = new HashSet<GroupId>(); subs.add(new GroupId(TestUtils.getRandomId())); subs.add(new GroupId(TestUtils.getRandomId())); db.setSubscriptions(txn, contactId, subs, 1); - assertEquals(subs, db.getSubscriptions(txn, contactId)); + assertEquals(subs, + new HashSet<GroupId>(db.getSubscriptions(txn, contactId))); // Update the subscriptions - Set<GroupId> subs1 = new HashSet<GroupId>(); + Collection<GroupId> subs1 = new HashSet<GroupId>(); subs1.add(new GroupId(TestUtils.getRandomId())); subs1.add(new GroupId(TestUtils.getRandomId())); db.setSubscriptions(txn, contactId, subs1, 2); - assertEquals(subs1, db.getSubscriptions(txn, contactId)); - db.commitTransaction(txn); + assertEquals(subs1, + new HashSet<GroupId>(db.getSubscriptions(txn, contactId))); + db.commitTransaction(txn); db.close(); } @@ -814,26 +769,28 @@ public class H2DatabaseTest extends TestCase { public void testSubscriptionsNotUpdatedIfTimestampIsOld() throws DbException { Database<Connection> db = open(false); + Connection txn = db.startTransaction(); // Add a contact - Connection txn = db.startTransaction(); Map<String, String> transports = Collections.emptyMap(); assertEquals(contactId, db.addContact(txn, transports)); // Add some subscriptions - Set<GroupId> subs = new HashSet<GroupId>(); + Collection<GroupId> subs = new HashSet<GroupId>(); subs.add(new GroupId(TestUtils.getRandomId())); subs.add(new GroupId(TestUtils.getRandomId())); db.setSubscriptions(txn, contactId, subs, 2); - assertEquals(subs, db.getSubscriptions(txn, contactId)); + assertEquals(subs, + new HashSet<GroupId>(db.getSubscriptions(txn, contactId))); // Try to update the subscriptions using a timestamp of 1 - Set<GroupId> subs1 = new HashSet<GroupId>(); + Collection<GroupId> subs1 = new HashSet<GroupId>(); subs1.add(new GroupId(TestUtils.getRandomId())); subs1.add(new GroupId(TestUtils.getRandomId())); db.setSubscriptions(txn, contactId, subs1, 1); // The old subscriptions should still be there - assertEquals(subs, db.getSubscriptions(txn, contactId)); - db.commitTransaction(txn); + assertEquals(subs, + new HashSet<GroupId>(db.getSubscriptions(txn, contactId))); + db.commitTransaction(txn); db.close(); } diff --git a/test/net/sf/briar/protocol/AckReaderTest.java b/test/net/sf/briar/protocol/AckReaderTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e3c7259c058899bdc988cad76d2133ed263ab570 --- /dev/null +++ b/test/net/sf/briar/protocol/AckReaderTest.java @@ -0,0 +1,129 @@ +package net.sf.briar.protocol; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.Random; + +import junit.framework.TestCase; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.BatchId; +import net.sf.briar.api.protocol.Tags; +import net.sf.briar.api.protocol.UniqueId; +import net.sf.briar.api.serial.FormatException; +import net.sf.briar.api.serial.Reader; +import net.sf.briar.api.serial.ReaderFactory; +import net.sf.briar.api.serial.Writer; +import net.sf.briar.api.serial.WriterFactory; +import net.sf.briar.serial.SerialModule; + +import org.jmock.Expectations; +import org.jmock.Mockery; +import org.junit.Test; + +import com.google.inject.Guice; +import com.google.inject.Injector; + +public class AckReaderTest extends TestCase { + + private final ReaderFactory readerFactory; + private final WriterFactory writerFactory; + private final Mockery context; + + public AckReaderTest() throws Exception { + super(); + Injector i = Guice.createInjector(new SerialModule()); + readerFactory = i.getInstance(ReaderFactory.class); + writerFactory = i.getInstance(WriterFactory.class); + context = new Mockery(); + } + + @Test + public void testFormatExceptionIfAckIsTooLarge() throws Exception { + AckFactory ackFactory = context.mock(AckFactory.class); + AckReader ackReader = new AckReader(ackFactory); + + byte[] b = createAck(true); + ByteArrayInputStream in = new ByteArrayInputStream(b); + Reader reader = readerFactory.createReader(in); + reader.addObjectReader(Tags.ACK, ackReader); + + try { + reader.readUserDefined(Tags.ACK, Ack.class); + assertTrue(false); + } catch(FormatException expected) {} + context.assertIsSatisfied(); + } + + @Test + @SuppressWarnings("unchecked") + public void testNoFormatExceptionIfAckIsMaximumSize() throws Exception { + final AckFactory ackFactory = context.mock(AckFactory.class); + AckReader ackReader = new AckReader(ackFactory); + final Ack ack = context.mock(Ack.class); + context.checking(new Expectations() {{ + oneOf(ackFactory).createAck(with(any(Collection.class))); + will(returnValue(ack)); + }}); + + byte[] b = createAck(false); + ByteArrayInputStream in = new ByteArrayInputStream(b); + Reader reader = readerFactory.createReader(in); + reader.addObjectReader(Tags.ACK, ackReader); + + assertEquals(ack, reader.readUserDefined(Tags.ACK, Ack.class)); + context.assertIsSatisfied(); + } + + @Test + public void testEmptyAck() throws Exception { + final AckFactory ackFactory = context.mock(AckFactory.class); + AckReader ackReader = new AckReader(ackFactory); + final Ack ack = context.mock(Ack.class); + context.checking(new Expectations() {{ + oneOf(ackFactory).createAck( + with(Collections.<BatchId>emptyList())); + will(returnValue(ack)); + }}); + + byte[] b = createEmptyAck(); + ByteArrayInputStream in = new ByteArrayInputStream(b); + Reader reader = readerFactory.createReader(in); + reader.addObjectReader(Tags.ACK, ackReader); + + assertEquals(ack, reader.readUserDefined(Tags.ACK, Ack.class)); + context.assertIsSatisfied(); + } + + private byte[] createAck(boolean tooBig) throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(Ack.MAX_SIZE); + Writer w = writerFactory.createWriter(out); + w.writeUserDefinedTag(Tags.ACK); + w.writeListStart(); + byte[] b = new byte[UniqueId.LENGTH]; + Random random = new Random(); + while(out.size() < Ack.MAX_SIZE - BatchId.SERIALISED_LENGTH) { + w.writeUserDefinedTag(Tags.BATCH_ID); + random.nextBytes(b); + w.writeRaw(b); + } + if(tooBig) { + w.writeUserDefinedTag(Tags.BATCH_ID); + random.nextBytes(b); + w.writeRaw(b); + } + w.writeListEnd(); + assertEquals(tooBig, out.size() > Ack.MAX_SIZE); + return out.toByteArray(); + } + + private byte[] createEmptyAck() throws Exception { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Writer w = writerFactory.createWriter(out); + w.writeUserDefinedTag(Tags.ACK); + w.writeListStart(); + w.writeListEnd(); + return out.toByteArray(); + } +} diff --git a/test/net/sf/briar/protocol/BundleReaderImplTest.java b/test/net/sf/briar/protocol/BundleReaderImplTest.java deleted file mode 100644 index 794adc310c44c94883b7fe14d8c44eab6ec1ee44..0000000000000000000000000000000000000000 --- a/test/net/sf/briar/protocol/BundleReaderImplTest.java +++ /dev/null @@ -1,233 +0,0 @@ -package net.sf.briar.protocol; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Collections; - -import junit.framework.TestCase; -import net.sf.briar.api.protocol.Batch; -import net.sf.briar.api.protocol.Header; -import net.sf.briar.api.protocol.Tags; -import net.sf.briar.api.serial.FormatException; -import net.sf.briar.api.serial.ObjectReader; -import net.sf.briar.api.serial.Reader; -import net.sf.briar.api.serial.ReaderFactory; -import net.sf.briar.api.serial.Writer; -import net.sf.briar.api.serial.WriterFactory; -import net.sf.briar.serial.SerialModule; - -import org.jmock.Mockery; -import org.junit.Test; - -import com.google.inject.Guice; -import com.google.inject.Injector; - -public class BundleReaderImplTest extends TestCase { - - private final Mockery context = new Mockery(); - private final ReaderFactory readerFactory; - private final WriterFactory writerFactory; - - public BundleReaderImplTest() { - super(); - Injector i = Guice.createInjector(new SerialModule()); - readerFactory = i.getInstance(ReaderFactory.class); - writerFactory = i.getInstance(WriterFactory.class); - } - - @Test - public void testEmptyBundleThrowsFormatException() throws Exception { - ByteArrayInputStream in = new ByteArrayInputStream(new byte[] {}); - Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - try { - b.getHeader(); - assertTrue(false); - } catch(FormatException expected) {} - } - - @Test - public void testReadingBatchBeforeHeaderThrowsIllegalStateException() - throws Exception { - ByteArrayInputStream in = new ByteArrayInputStream(createValidBundle()); - Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - try { - b.getNextBatch(); - assertTrue(false); - } catch(IllegalStateException expected) {} - } - - @Test - public void testMissingHeaderThrowsFormatException() throws Exception { - // Create a headless bundle - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Writer w = writerFactory.createWriter(out); - w.writeListStart(); - w.writeUserDefinedTag(Tags.BATCH); - w.writeList(Collections.emptyList()); - w.writeListEnd(); - byte[] headless = out.toByteArray(); - // Try to read a header from the headless bundle - ByteArrayInputStream in = new ByteArrayInputStream(headless); - Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - try { - b.getHeader(); - assertTrue(false); - } catch(FormatException expected) {} - } - - @Test - public void testMissingBatchListThrowsFormatException() throws Exception { - // Create a header-only bundle - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Writer w = writerFactory.createWriter(out); - w.writeUserDefinedTag(Tags.HEADER); - w.writeList(Collections.emptyList()); // Acks - w.writeList(Collections.emptyList()); // Subs - w.writeMap(Collections.emptyMap()); // Transports - w.writeInt64(System.currentTimeMillis()); // Timestamp - byte[] headerOnly = out.toByteArray(); - // Try to read a header from the header-only bundle - ByteArrayInputStream in = new ByteArrayInputStream(headerOnly); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - try { - b.getHeader(); - assertTrue(false); - } catch(FormatException expected) {} - } - - @Test - public void testEmptyBatchListIsAcceptable() throws Exception { - // Create a bundle with no batches - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Writer w = writerFactory.createWriter(out); - w.writeUserDefinedTag(Tags.HEADER); - w.writeList(Collections.emptyList()); // Acks - w.writeList(Collections.emptyList()); // Subs - w.writeMap(Collections.emptyMap()); // Transports - w.writeInt64(System.currentTimeMillis()); // Timestamp - w.writeListStart(); - w.writeListEnd(); - byte[] batchless = out.toByteArray(); - // It should be possible to read the header and null - ByteArrayInputStream in = new ByteArrayInputStream(batchless); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - assertNotNull(b.getHeader()); - assertNull(b.getNextBatch()); - } - - @Test - public void testValidBundle() throws Exception { - // It should be possible to read the header, a batch, and null - ByteArrayInputStream in = new ByteArrayInputStream(createValidBundle()); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - assertNotNull(b.getHeader()); - assertNotNull(b.getNextBatch()); - assertNull(b.getNextBatch()); - } - - @Test - public void testReadingBatchAfterNullThrowsIllegalStateException() - throws Exception { - // Trying to read another batch after null should not succeed - ByteArrayInputStream in = new ByteArrayInputStream(createValidBundle()); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - assertNotNull(b.getHeader()); - assertNotNull(b.getNextBatch()); - assertNull(b.getNextBatch()); - try { - b.getNextBatch(); - assertTrue(false); - } catch(IllegalStateException expected) {} - } - - @Test - public void testReadingHeaderTwiceThrowsIllegalStateException() - throws Exception { - // Trying to read the header twice should not succeed - ByteArrayInputStream in = new ByteArrayInputStream(createValidBundle()); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - assertNotNull(b.getHeader()); - try { - b.getHeader(); - assertTrue(false); - } catch(IllegalStateException expected) {} - } - - @Test - public void testReadingHeaderAfterBatchThrowsIllegalStateException() - throws Exception { - // Trying to read the header after a batch should not succeed - ByteArrayInputStream in = new ByteArrayInputStream(createValidBundle()); - final Reader r = readerFactory.createReader(in); - BundleReaderImpl b = new BundleReaderImpl(r, new TestHeaderReader(), - new TestBatchReader()); - - assertNotNull(b.getHeader()); - assertNotNull(b.getNextBatch()); - try { - b.getHeader(); - assertTrue(false); - } catch(IllegalStateException expected) {} - } - - private byte[] createValidBundle() throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Writer w = writerFactory.createWriter(out); - w.writeUserDefinedTag(Tags.HEADER); - w.writeList(Collections.emptyList()); // Acks - w.writeList(Collections.emptyList()); // Subs - w.writeMap(Collections.emptyMap()); // Transports - w.writeInt64(System.currentTimeMillis()); // Timestamp - w.writeListStart(); - w.writeUserDefinedTag(Tags.BATCH); - w.writeList(Collections.emptyList()); // Messages - w.writeListEnd(); - return out.toByteArray(); - } - - private class TestHeaderReader implements ObjectReader<Header> { - - public Header readObject(Reader r) throws IOException { - r.readUserDefinedTag(Tags.HEADER); - r.readList(); - r.readList(); - r.readMap(); - r.readInt64(); - return context.mock(Header.class); - } - } - - private class TestBatchReader implements ObjectReader<Batch> { - - public Batch readObject(Reader r) throws IOException { - r.readUserDefinedTag(Tags.BATCH); - r.readList(); - return context.mock(Batch.class); - } - } -} diff --git a/test/net/sf/briar/protocol/BundleReadWriteTest.java b/test/net/sf/briar/protocol/FileReadWriteTest.java similarity index 71% rename from test/net/sf/briar/protocol/BundleReadWriteTest.java rename to test/net/sf/briar/protocol/FileReadWriteTest.java index 1a32d6e73a19dc7cec33fd35460944eed5228def..a4a97cd9dbdfd7ed10fa286a70ce57929a4b29c3 100644 --- a/test/net/sf/briar/protocol/BundleReadWriteTest.java +++ b/test/net/sf/briar/protocol/FileReadWriteTest.java @@ -15,24 +15,21 @@ import java.security.spec.X509EncodedKeySpec; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; -import java.util.Map; -import java.util.Set; import junit.framework.TestCase; import net.sf.briar.TestUtils; import net.sf.briar.api.crypto.KeyParser; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.AckWriter; import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.BundleReader; -import net.sf.briar.api.protocol.BundleWriter; +import net.sf.briar.api.protocol.BatchWriter; import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageEncoder; import net.sf.briar.api.protocol.MessageId; +import net.sf.briar.api.protocol.Tags; import net.sf.briar.api.protocol.UniqueId; -import net.sf.briar.api.serial.Raw; -import net.sf.briar.api.serial.RawByteArray; import net.sf.briar.api.serial.Reader; import net.sf.briar.api.serial.ReaderFactory; import net.sf.briar.api.serial.WriterFactory; @@ -45,22 +42,17 @@ import org.junit.Test; import com.google.inject.Guice; import com.google.inject.Injector; -public class BundleReadWriteTest extends TestCase { +public class FileReadWriteTest extends TestCase { private static final String SIGNATURE_ALGO = "SHA256withRSA"; private static final String KEY_PAIR_ALGO = "RSA"; private static final String DIGEST_ALGO = "SHA-256"; private final File testDir = TestUtils.getTestDirectory(); - private final File bundle = new File(testDir, "bundle"); + private final File file = new File(testDir, "foo"); - private final long capacity = 1024L; private final BatchId ack = new BatchId(TestUtils.getRandomId()); - private final Set<BatchId> acks = Collections.singleton(ack); private final GroupId sub = new GroupId(TestUtils.getRandomId()); - private final Set<GroupId> subs = Collections.singleton(sub); - private final Map<String, String> transports = - Collections.singletonMap("foo", "bar"); private final String nick = "Foo Bar"; private final String messageBody = "This is the message body! Wooooooo!"; @@ -71,7 +63,7 @@ public class BundleReadWriteTest extends TestCase { private final KeyParser keyParser; private final Message message; - public BundleReadWriteTest() throws Exception { + public FileReadWriteTest() throws Exception { super(); // Inject the reader and writer factories, since they belong to // a different component @@ -106,40 +98,43 @@ public class BundleReadWriteTest extends TestCase { } @Test - public void testWriteBundle() throws Exception { - FileOutputStream out = new FileOutputStream(bundle); - BundleWriter w = new BundleWriterImpl(out, writerFactory, batchDigest, - capacity); - Raw messageRaw = new RawByteArray(message.getBytes()); - - w.addHeader(acks, subs, transports); - w.addBatch(Collections.singleton(messageRaw)); - w.finish(); - - assertTrue(bundle.exists()); - assertTrue(bundle.length() > message.getSize()); + public void testWriteFile() throws Exception { + FileOutputStream out = new FileOutputStream(file); + + AckWriter a = new AckWriterImpl(out, writerFactory); + a.addBatchId(ack); + a.finish(); + + BatchWriter b = new BatchWriterImpl(out, writerFactory, batchDigest); + b.addMessage(message.getBytes()); + b.finish(); + + out.close(); + assertTrue(file.exists()); + assertTrue(file.length() > message.getSize()); } @Test - public void testWriteAndReadBundle() throws Exception { + public void testWriteAndReadFile() throws Exception { - testWriteBundle(); + testWriteFile(); - FileInputStream in = new FileInputStream(bundle); - Reader reader = readerFactory.createReader(in); MessageReader messageReader = new MessageReader(keyParser, signature, messageDigest); - HeaderReader headerReader = new HeaderReader(new HeaderFactoryImpl()); + AckReader ackReader = new AckReader(new AckFactoryImpl()); BatchReader batchReader = new BatchReader(batchDigest, messageReader, new BatchFactoryImpl()); - BundleReader r = new BundleReaderImpl(reader, headerReader, - batchReader); - - Header h = r.getHeader(); - assertEquals(acks, h.getAcks()); - assertEquals(subs, h.getSubscriptions()); - assertEquals(transports, h.getTransports()); - Batch b = r.getNextBatch(); + FileInputStream in = new FileInputStream(file); + Reader reader = readerFactory.createReader(in); + reader.addObjectReader(Tags.ACK, ackReader); + reader.addObjectReader(Tags.BATCH, batchReader); + + assertTrue(reader.hasUserDefined(Tags.ACK)); + Ack a = reader.readUserDefined(Tags.ACK, Ack.class); + assertEquals(Collections.singletonList(ack), a.getBatches()); + + assertTrue(reader.hasUserDefined(Tags.BATCH)); + Batch b = reader.readUserDefined(Tags.BATCH, Batch.class); Iterator<Message> i = b.getMessages().iterator(); assertTrue(i.hasNext()); Message m = i.next(); @@ -150,8 +145,8 @@ public class BundleReadWriteTest extends TestCase { assertEquals(message.getTimestamp(), m.getTimestamp()); assertTrue(Arrays.equals(message.getBytes(), m.getBytes())); assertFalse(i.hasNext()); - assertNull(r.getNextBatch()); - r.finish(); + + assertTrue(reader.eof()); } @After diff --git a/test/net/sf/briar/protocol/HeaderReaderTest.java b/test/net/sf/briar/protocol/HeaderReaderTest.java deleted file mode 100644 index e934c59ba918f3c0120abef43b5ae0b86aed4d18..0000000000000000000000000000000000000000 --- a/test/net/sf/briar/protocol/HeaderReaderTest.java +++ /dev/null @@ -1,157 +0,0 @@ -package net.sf.briar.protocol; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.Random; - -import junit.framework.TestCase; -import net.sf.briar.api.protocol.BatchId; -import net.sf.briar.api.protocol.GroupId; -import net.sf.briar.api.protocol.Header; -import net.sf.briar.api.protocol.Tags; -import net.sf.briar.api.protocol.UniqueId; -import net.sf.briar.api.serial.FormatException; -import net.sf.briar.api.serial.Reader; -import net.sf.briar.api.serial.ReaderFactory; -import net.sf.briar.api.serial.Writer; -import net.sf.briar.api.serial.WriterFactory; -import net.sf.briar.serial.SerialModule; - -import org.jmock.Expectations; -import org.jmock.Mockery; -import org.junit.Test; - -import com.google.inject.Guice; -import com.google.inject.Injector; - -public class HeaderReaderTest extends TestCase { - - private final ReaderFactory readerFactory; - private final WriterFactory writerFactory; - private final Mockery context; - - public HeaderReaderTest() throws Exception { - super(); - Injector i = Guice.createInjector(new SerialModule()); - readerFactory = i.getInstance(ReaderFactory.class); - writerFactory = i.getInstance(WriterFactory.class); - context = new Mockery(); - } - - @Test - public void testFormatExceptionIfHeaderIsTooLarge() throws Exception { - HeaderFactory headerFactory = context.mock(HeaderFactory.class); - HeaderReader headerReader = new HeaderReader(headerFactory); - - byte[] b = createHeader(Header.MAX_SIZE + 1); - ByteArrayInputStream in = new ByteArrayInputStream(b); - Reader reader = readerFactory.createReader(in); - reader.addObjectReader(Tags.HEADER, headerReader); - - try { - reader.readUserDefined(Tags.HEADER, Header.class); - assertTrue(false); - } catch(FormatException expected) {} - context.assertIsSatisfied(); - } - - @Test - @SuppressWarnings("unchecked") - public void testNoFormatExceptionIfHeaderIsMaximumSize() throws Exception { - final HeaderFactory headerFactory = context.mock(HeaderFactory.class); - HeaderReader headerReader = new HeaderReader(headerFactory); - final Header header = context.mock(Header.class); - context.checking(new Expectations() {{ - oneOf(headerFactory).createHeader( - with(Collections.<BatchId>emptyList()), - with(any(Collection.class)), with(any(Map.class)), - with(any(long.class))); - will(returnValue(header)); - }}); - - byte[] b = createHeader(Header.MAX_SIZE); - ByteArrayInputStream in = new ByteArrayInputStream(b); - Reader reader = readerFactory.createReader(in); - reader.addObjectReader(Tags.HEADER, headerReader); - - assertEquals(header, reader.readUserDefined(Tags.HEADER, Header.class)); - context.assertIsSatisfied(); - } - - @Test - public void testEmptyHeader() throws Exception { - final HeaderFactory headerFactory = context.mock(HeaderFactory.class); - HeaderReader headerReader = new HeaderReader(headerFactory); - final Header header = context.mock(Header.class); - context.checking(new Expectations() {{ - oneOf(headerFactory).createHeader( - with(Collections.<BatchId>emptyList()), - with(Collections.<GroupId>emptyList()), - with(Collections.<String, String>emptyMap()), - with(any(long.class))); - will(returnValue(header)); - }}); - - byte[] b = createEmptyHeader(); - ByteArrayInputStream in = new ByteArrayInputStream(b); - Reader reader = readerFactory.createReader(in); - reader.addObjectReader(Tags.HEADER, headerReader); - - assertEquals(header, reader.readUserDefined(Tags.HEADER, Header.class)); - context.assertIsSatisfied(); - } - - private byte[] createHeader(int size) throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(size); - Writer w = writerFactory.createWriter(out); - w.writeUserDefinedTag(Tags.HEADER); - // No acks - w.writeListStart(); - w.writeListEnd(); - // Fill most of the header with subs - w.writeListStart(); - byte[] b = new byte[UniqueId.LENGTH]; - Random random = new Random(); - while(out.size() < size - 60) { - w.writeUserDefinedTag(Tags.GROUP_ID); - random.nextBytes(b); - w.writeRaw(b); - } - w.writeListEnd(); - // Transports - w.writeMapStart(); - w.writeString("foo"); - // Build a string that will bring the header up to the expected size - int length = size - out.size() - 12; - assertTrue(length > 0); - StringBuilder s = new StringBuilder(); - for(int i = 0; i < length; i++) s.append((char) ('0' + i % 10)); - w.writeString(s.toString()); - w.writeMapEnd(); - // Timestamp - w.writeInt64(System.currentTimeMillis()); - assertEquals(size, out.size()); - return out.toByteArray(); - } - - private byte[] createEmptyHeader() throws Exception { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Writer w = writerFactory.createWriter(out); - w.writeUserDefinedTag(Tags.HEADER); - // Acks - w.writeListStart(); - w.writeListEnd(); - // Subs - w.writeListStart(); - w.writeListEnd(); - // Transports - w.writeMapStart(); - w.writeMapEnd(); - // Timestamp - w.writeInt64(System.currentTimeMillis()); - return out.toByteArray(); - } -}