diff --git a/briar-core/src/net/sf/briar/protocol/ProtocolReaderFactoryImpl.java b/briar-core/src/net/sf/briar/protocol/ProtocolReaderFactoryImpl.java index 009b26a9fd81f68a4debb9de6e04cdcf3f8757b8..ac1af5728f5aded0919c4c31b97ab68fe388b08d 100644 --- a/briar-core/src/net/sf/briar/protocol/ProtocolReaderFactoryImpl.java +++ b/briar-core/src/net/sf/briar/protocol/ProtocolReaderFactoryImpl.java @@ -10,26 +10,24 @@ import net.sf.briar.api.serial.ReaderFactory; import net.sf.briar.api.serial.StructReader; import com.google.inject.Inject; -import com.google.inject.Provider; -// FIXME: See whether these providers can be got rid of class ProtocolReaderFactoryImpl implements ProtocolReaderFactory { private final ReaderFactory readerFactory; - private final Provider<StructReader<UnverifiedMessage>> messageProvider; - private final Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider; + private final StructReader<UnverifiedMessage> messageReader; + private final StructReader<SubscriptionUpdate> subscriptionUpdateReader; @Inject ProtocolReaderFactoryImpl(ReaderFactory readerFactory, - Provider<StructReader<UnverifiedMessage>> messageProvider, - Provider<StructReader<SubscriptionUpdate>> subscriptionUpdateProvider) { + StructReader<UnverifiedMessage> messageReader, + StructReader<SubscriptionUpdate> subscriptionUpdateReader) { this.readerFactory = readerFactory; - this.messageProvider = messageProvider; - this.subscriptionUpdateProvider = subscriptionUpdateProvider; + this.messageReader = messageReader; + this.subscriptionUpdateReader = subscriptionUpdateReader; } public ProtocolReader createProtocolReader(InputStream in) { - return new ProtocolReaderImpl(readerFactory, messageProvider.get(), - subscriptionUpdateProvider.get(), in); + return new ProtocolReaderImpl(readerFactory, messageReader, + subscriptionUpdateReader, in); } } diff --git a/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java b/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java index ab31510cf7037245763eccc3466947a1be913ff0..0a50becb93fa1aa6f44926cd7a4789f5efe74b38 100644 --- a/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/duplex/DuplexConnection.java @@ -27,10 +27,10 @@ import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.event.ContactRemovedEvent; import net.sf.briar.api.db.event.DatabaseEvent; import net.sf.briar.api.db.event.DatabaseListener; -import net.sf.briar.api.db.event.MessageAddedEvent; -import net.sf.briar.api.db.event.MessageReceivedEvent; import net.sf.briar.api.db.event.LocalSubscriptionsUpdatedEvent; import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; +import net.sf.briar.api.db.event.MessageAddedEvent; +import net.sf.briar.api.db.event.MessageReceivedEvent; import net.sf.briar.api.plugins.duplex.DuplexTransportConnection; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Message; @@ -42,7 +42,11 @@ import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriter; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.Request; +import net.sf.briar.api.protocol.RetentionAck; +import net.sf.briar.api.protocol.RetentionUpdate; +import net.sf.briar.api.protocol.SubscriptionAck; import net.sf.briar.api.protocol.SubscriptionUpdate; +import net.sf.briar.api.protocol.TransportAck; import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedMessage; @@ -55,7 +59,6 @@ import net.sf.briar.api.transport.ConnectionWriter; import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.util.ByteUtils; -// FIXME: Read and write subscription and transport acks abstract class DuplexConnection implements DatabaseListener { private static final Logger LOG = @@ -208,9 +211,13 @@ abstract class DuplexConnection implements DatabaseListener { OutputStream out = createConnectionWriter().getOutputStream(); writer = protoWriterFactory.createProtocolWriter(out, transport.shouldFlush()); - // Send the initial packets: transports, subs, acks, offer + // Send the initial packets: updates, acks, offer + dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportUpdate()); + dbExecutor.execute(new GenerateSubscriptionAck()); dbExecutor.execute(new GenerateSubscriptionUpdate()); + dbExecutor.execute(new GenerateRetentionAck()); + dbExecutor.execute(new GenerateRetentionUpdate()); dbExecutor.execute(new GenerateAcks()); dbExecutor.execute(new GenerateOffer()); // Main loop @@ -519,6 +526,105 @@ abstract class DuplexConnection implements DatabaseListener { } } + // This task runs on a database thread + private class GenerateRetentionAck implements Runnable { + + public void run() { + try { + RetentionAck a = db.generateRetentionAck(contactId); + if(a != null) writerTasks.add(new WriteRetentionAck(a)); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This tasks runs on the writer thread + private class WriteRetentionAck implements Runnable { + + private final RetentionAck ack; + + private WriteRetentionAck(RetentionAck ack) { + this.ack = ack; + } + + public void run() { + assert writer != null; + try { + writer.writeRetentionAck(ack); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + dispose(true, true); + } + } + } + + // This task runs on a database thread + private class GenerateRetentionUpdate implements Runnable { + + public void run() { + try { + RetentionUpdate u = db.generateRetentionUpdate(contactId); + if(u != null) writerTasks.add(new WriteRetentionUpdate(u)); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on the writer thread + private class WriteRetentionUpdate implements Runnable { + + private final RetentionUpdate update; + + private WriteRetentionUpdate(RetentionUpdate update) { + this.update = update; + } + + public void run() { + assert writer != null; + try { + writer.writeRetentionUpdate(update); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + dispose(true, true); + } + } + } + + // This task runs on a database thread + private class GenerateSubscriptionAck implements Runnable { + + public void run() { + try { + SubscriptionAck a = db.generateSubscriptionAck(contactId); + if(a != null) writerTasks.add(new WriteSubscriptionAck(a)); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This tasks runs on the writer thread + private class WriteSubscriptionAck implements Runnable { + + private final SubscriptionAck ack; + + private WriteSubscriptionAck(SubscriptionAck ack) { + this.ack = ack; + } + + public void run() { + assert writer != null; + try { + writer.writeSubscriptionAck(ack); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + dispose(true, true); + } + } + } + // This task runs on a database thread private class GenerateSubscriptionUpdate implements Runnable { @@ -552,6 +658,40 @@ abstract class DuplexConnection implements DatabaseListener { } } + // This task runs on a database thread + private class GenerateTransportAcks implements Runnable { + + public void run() { + try { + Collection<TransportAck> acks = + db.generateTransportAcks(contactId); + if(acks != null) writerTasks.add(new WriteTransportAcks(acks)); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This tasks runs on the writer thread + private class WriteTransportAcks implements Runnable { + + private final Collection<TransportAck> acks; + + private WriteTransportAcks(Collection<TransportAck> acks) { + this.acks = acks; + } + + public void run() { + assert writer != null; + try { + for(TransportAck a : acks) writer.writeTransportAck(a); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + dispose(true, true); + } + } + } + // This task runs on a database thread private class GenerateTransportUpdate implements Runnable { diff --git a/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java b/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java index b876c578c4f0c6cc505032e1c8b424c7a6089694..f922a56fbb24c54cddcfa117e269e33307f672b0 100644 --- a/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/simplex/IncomingSimplexConnection.java @@ -19,7 +19,11 @@ import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageVerifier; import net.sf.briar.api.protocol.ProtocolReader; import net.sf.briar.api.protocol.ProtocolReaderFactory; +import net.sf.briar.api.protocol.RetentionAck; +import net.sf.briar.api.protocol.RetentionUpdate; +import net.sf.briar.api.protocol.SubscriptionAck; import net.sf.briar.api.protocol.SubscriptionUpdate; +import net.sf.briar.api.protocol.TransportAck; import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedMessage; @@ -30,7 +34,6 @@ import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionRegistry; import net.sf.briar.util.ByteUtils; -// FIXME: Read subscription and transport acks class IncomingSimplexConnection { private static final Logger LOG = @@ -82,9 +85,21 @@ class IncomingSimplexConnection { } else if(reader.hasMessage()) { UnverifiedMessage m = reader.readMessage(); verificationExecutor.execute(new VerifyMessage(m)); + } else if(reader.hasRetentionAck()) { + RetentionAck a = reader.readRetentionAck(); + dbExecutor.execute(new ReceiveRetentionAck(a)); + } else if(reader.hasRetentionUpdate()) { + RetentionUpdate u = reader.readRetentionUpdate(); + dbExecutor.execute(new ReceiveRetentionUpdate(u)); + } else if(reader.hasSubscriptionAck()) { + SubscriptionAck a = reader.readSubscriptionAck(); + dbExecutor.execute(new ReceiveSubscriptionAck(a)); } else if(reader.hasSubscriptionUpdate()) { SubscriptionUpdate u = reader.readSubscriptionUpdate(); dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); + } else if(reader.hasTransportAck()) { + TransportAck a = reader.readTransportAck(); + dbExecutor.execute(new ReceiveTransportAck(a)); } else if(reader.hasTransportUpdate()) { TransportUpdate u = reader.readTransportUpdate(); dbExecutor.execute(new ReceiveTransportUpdate(u)); @@ -162,6 +177,57 @@ class IncomingSimplexConnection { } } + private class ReceiveRetentionAck implements Runnable { + + private final RetentionAck ack; + + private ReceiveRetentionAck(RetentionAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveRetentionAck(contactId, ack); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + private class ReceiveRetentionUpdate implements Runnable { + + private final RetentionUpdate update; + + private ReceiveRetentionUpdate(RetentionUpdate update) { + this.update = update; + } + + public void run() { + try { + db.receiveRetentionUpdate(contactId, update); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + private class ReceiveSubscriptionAck implements Runnable { + + private final SubscriptionAck ack; + + private ReceiveSubscriptionAck(SubscriptionAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveSubscriptionAck(contactId, ack); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + private class ReceiveSubscriptionUpdate implements Runnable { private final SubscriptionUpdate update; @@ -179,6 +245,23 @@ class IncomingSimplexConnection { } } + private class ReceiveTransportAck implements Runnable { + + private final TransportAck ack; + + private ReceiveTransportAck(TransportAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveTransportAck(contactId, ack); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + private class ReceiveTransportUpdate implements Runnable { private final TransportUpdate update; diff --git a/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java b/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java index cccfa22dd76a9baba5949901797b4b05113c09b6..e32bf287ff7cc47e3dd365ca17b1ca33edabbc35 100644 --- a/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnection.java @@ -16,7 +16,11 @@ import net.sf.briar.api.plugins.simplex.SimplexTransportWriter; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.ProtocolWriter; import net.sf.briar.api.protocol.ProtocolWriterFactory; +import net.sf.briar.api.protocol.RetentionAck; +import net.sf.briar.api.protocol.RetentionUpdate; +import net.sf.briar.api.protocol.SubscriptionAck; import net.sf.briar.api.protocol.SubscriptionUpdate; +import net.sf.briar.api.protocol.TransportAck; import net.sf.briar.api.protocol.TransportId; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.transport.ConnectionContext; @@ -25,7 +29,6 @@ import net.sf.briar.api.transport.ConnectionWriter; import net.sf.briar.api.transport.ConnectionWriterFactory; import net.sf.briar.util.ByteUtils; -// FIXME: Write subscription and transport acks class OutgoingSimplexConnection { private static final Logger LOG = @@ -55,6 +58,8 @@ class OutgoingSimplexConnection { transportId = ctx.getTransportId(); } + // FIXME: Write each packet to a buffer, check for capacity before writing + // it to the connection (except raw messages, which are already serialised) void write() { connRegistry.registerConnection(contactId, transportId); try { @@ -62,22 +67,33 @@ class OutgoingSimplexConnection { transport.getOutputStream(), transport.getCapacity(), ctx, false, true); OutputStream out = conn.getOutputStream(); + if(conn.getRemainingCapacity() < MAX_PACKET_LENGTH) + throw new EOFException(); ProtocolWriter writer = protoFactory.createProtocolWriter(out, transport.shouldFlush()); - // There should be enough space for a packet - long capacity = conn.getRemainingCapacity(); - if(capacity < MAX_PACKET_LENGTH) throw new EOFException(); - // Write transport updates. FIXME: Check for space - Collection<TransportUpdate> updates = + // Send the initial packets: updates and acks + Collection<TransportAck> transportAcks = + db.generateTransportAcks(contactId); + if(transportAcks != null) { + for(TransportAck ta : transportAcks) + writer.writeTransportAck(ta); + } + Collection<TransportUpdate> transportUpdates = db.generateTransportUpdates(contactId); - if(updates != null) { - for(TransportUpdate u : updates) writer.writeTransportUpdate(u); + if(transportUpdates != null) { + for(TransportUpdate tu : transportUpdates) + writer.writeTransportUpdate(tu); } - // Write a subscription update. FIXME: Check for space - SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId); - if(u != null) writer.writeSubscriptionUpdate(u); + SubscriptionAck sa = db.generateSubscriptionAck(contactId); + if(sa != null) writer.writeSubscriptionAck(sa); + SubscriptionUpdate su = db.generateSubscriptionUpdate(contactId); + if(su != null) writer.writeSubscriptionUpdate(su); + RetentionAck ra = db.generateRetentionAck(contactId); + if(ra != null) writer.writeRetentionAck(ra); + RetentionUpdate ru = db.generateRetentionUpdate(contactId); + if(ru != null) writer.writeRetentionUpdate(ru); // Write acks until you can't write acks no more - capacity = conn.getRemainingCapacity(); + long capacity = conn.getRemainingCapacity(); int maxMessages = writer.getMaxMessagesForAck(capacity); Ack a = db.generateAck(contactId, maxMessages); while(a != null) { diff --git a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java index 51d8b76511cca3b22708695f4128973e39d26840..8f74e5cd1f76fc632b7ced1abfd5f0b397b22bec 100644 --- a/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java +++ b/briar-tests/src/net/sf/briar/db/DatabaseComponentTest.java @@ -41,7 +41,6 @@ import org.jmock.Mockery; import org.junit.Test; // FIXME: Replace allowing() with oneOf() to tighten up tests - public abstract class DatabaseComponentTest extends BriarTestCase { protected final Object txn = new Object(); diff --git a/briar-tests/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnectionTest.java b/briar-tests/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnectionTest.java index ff79a44373be2ea1b1cd7a9bd56c3f8c0dff8feb..7098c6d90e08a8d205d9464980497b6d0aa59103 100644 --- a/briar-tests/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnectionTest.java +++ b/briar-tests/src/net/sf/briar/protocol/simplex/OutgoingSimplexConnectionTest.java @@ -108,12 +108,24 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase { OutgoingSimplexConnection connection = new OutgoingSimplexConnection(db, connRegistry, connFactory, protoFactory, ctx, transport); context.checking(new Expectations() {{ - // No transports to send + // No transport acks to send + oneOf(db).generateTransportAcks(contactId); + will(returnValue(null)); + // No transport updates to send oneOf(db).generateTransportUpdates(contactId); will(returnValue(null)); - // No subscriptions to send + // No subscription ack to send + oneOf(db).generateSubscriptionAck(contactId); + will(returnValue(null)); + // No subscription update to send oneOf(db).generateSubscriptionUpdate(contactId); will(returnValue(null)); + // No retention ack to send + oneOf(db).generateRetentionAck(contactId); + will(returnValue(null)); + // No retention update to send + oneOf(db).generateRetentionUpdate(contactId); + will(returnValue(null)); // No acks to send oneOf(db).generateAck(with(contactId), with(any(int.class))); will(returnValue(null)); @@ -141,12 +153,24 @@ public class OutgoingSimplexConnectionTest extends BriarTestCase { connRegistry, connFactory, protoFactory, ctx, transport); final byte[] raw = new byte[1234]; context.checking(new Expectations() {{ - // No transports to send + // No transport acks to send + oneOf(db).generateTransportAcks(contactId); + will(returnValue(null)); + // No transport updates to send oneOf(db).generateTransportUpdates(contactId); will(returnValue(null)); - // No subscriptions to send + // No subscription ack to send + oneOf(db).generateSubscriptionAck(contactId); + will(returnValue(null)); + // No subscription update to send oneOf(db).generateSubscriptionUpdate(contactId); will(returnValue(null)); + // No retention ack to send + oneOf(db).generateRetentionAck(contactId); + will(returnValue(null)); + // No retention update to send + oneOf(db).generateRetentionUpdate(contactId); + will(returnValue(null)); // One ack to send oneOf(db).generateAck(with(contactId), with(any(int.class))); will(returnValue(new Ack(Arrays.asList(messageId))));