diff --git a/briar-api/src/org/briarproject/api/messaging/MessagingSession.java b/briar-api/src/org/briarproject/api/messaging/MessagingSession.java index 786922c8165b25d5196e6e0ae571104bf3437a19..c4d8d398908e608bef5fd0ceb90e5344033d8aec 100644 --- a/briar-api/src/org/briarproject/api/messaging/MessagingSession.java +++ b/briar-api/src/org/briarproject/api/messaging/MessagingSession.java @@ -6,14 +6,13 @@ public interface MessagingSession { /** * Runs the session. This method returns when there are no more packets to - * send or when the {@link #interrupt()} method has been called. + * send or receive, or when the {@link #interrupt()} method has been called. */ void run() throws IOException; /** * Interrupts the session, causing the {@link #run()} method to return at - * the next opportunity or throw an {@link java.io.IOException IOException} - * if it cannot return cleanly. + * the next opportunity. */ void interrupt(); } diff --git a/briar-api/src/org/briarproject/api/messaging/MessagingSessionFactory.java b/briar-api/src/org/briarproject/api/messaging/MessagingSessionFactory.java index 24f74f04406d00ad217ad4eb1d7c45ec6c36c5fa..c5ceca842c7afbc01945b5080afc7f5ed01ab63f 100644 --- a/briar-api/src/org/briarproject/api/messaging/MessagingSessionFactory.java +++ b/briar-api/src/org/briarproject/api/messaging/MessagingSessionFactory.java @@ -8,8 +8,9 @@ import org.briarproject.api.TransportId; public interface MessagingSessionFactory { - MessagingSession createIncomingSession(ContactId c, InputStream in); + MessagingSession createIncomingSession(ContactId c, TransportId t, + InputStream in); MessagingSession createOutgoingSession(ContactId c, TransportId t, - long maxLatency, OutputStream out, boolean duplex); + long maxLatency, boolean duplex, OutputStream out); } diff --git a/briar-core/src/org/briarproject/messaging/ReactiveOutgoingSession.java b/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java similarity index 95% rename from briar-core/src/org/briarproject/messaging/ReactiveOutgoingSession.java rename to briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java index 9f233275a119b81ecd7fd106bd847edc24249407..ae8265cd6e316f500f025644251e8d2f7ec9ff1c 100644 --- a/briar-core/src/org/briarproject/messaging/ReactiveOutgoingSession.java +++ b/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java @@ -46,13 +46,15 @@ import org.briarproject.api.messaging.TransportUpdate; /** * An outgoing {@link org.briarproject.api.messaging.MessagingSession - * MessagingSession} that keeps its output stream open and reacts to events - * that make packets available to send. + * MessagingSession} suitable for duplex transports. The session offers + * messages before sending them, keeps its output stream open when there are no + * more packets to send, and reacts to events that make packets available to + * send. */ -class ReactiveOutgoingSession implements MessagingSession, EventListener { +class DuplexOutgoingSession implements MessagingSession, EventListener { private static final Logger LOG = - Logger.getLogger(ReactiveOutgoingSession.class.getName()); + Logger.getLogger(DuplexOutgoingSession.class.getName()); private static final ThrowingRunnable<IOException> CLOSE = new ThrowingRunnable<IOException>() { @@ -62,35 +64,33 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener { private final DatabaseComponent db; private final Executor dbExecutor; private final EventBus eventBus; - private final PacketWriterFactory packetWriterFactory; private final ContactId contactId; private final TransportId transportId; private final long maxLatency; private final OutputStream out; + private final PacketWriter packetWriter; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; - private volatile PacketWriter packetWriter = null; private volatile boolean interrupted = false; - ReactiveOutgoingSession(DatabaseComponent db, Executor dbExecutor, + DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, PacketWriterFactory packetWriterFactory, ContactId contactId, TransportId transportId, long maxLatency, OutputStream out) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; - this.packetWriterFactory = packetWriterFactory; this.contactId = contactId; this.transportId = transportId; this.maxLatency = maxLatency; this.out = out; + packetWriter = packetWriterFactory.createPacketWriter(out); writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); } public void run() throws IOException { eventBus.addListener(this); try { - packetWriter = packetWriterFactory.createPacketWriter(out); // Start a query for each type of packet, in order of urgency dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportUpdates()); @@ -110,6 +110,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener { task.run(); if(writerTasks.isEmpty()) out.flush(); } + out.flush(); out.close(); } catch(InterruptedException e) { LOG.info("Interrupted while waiting for a packet to write"); @@ -128,10 +129,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener { public void eventOccurred(Event e) { if(e instanceof ContactRemovedEvent) { ContactRemovedEvent c = (ContactRemovedEvent) e; - if(contactId.equals(c.getContactId())) { - LOG.info("Contact removed, closing"); - interrupt(); - } + if(c.getContactId().equals(contactId)) interrupt(); } else if(e instanceof MessageAddedEvent) { dbExecutor.execute(new GenerateOffer()); } else if(e instanceof MessageExpiredEvent) { @@ -173,10 +171,7 @@ class ReactiveOutgoingSession implements MessagingSession, EventListener { dbExecutor.execute(new GenerateTransportAcks()); } else if(e instanceof TransportRemovedEvent) { TransportRemovedEvent t = (TransportRemovedEvent) e; - if(t.getTransportId().equals(transportId)) { - LOG.info("Transport removed, closing"); - interrupt(); - } + if(t.getTransportId().equals(transportId)) interrupt(); } } diff --git a/briar-core/src/org/briarproject/messaging/IncomingSession.java b/briar-core/src/org/briarproject/messaging/IncomingSession.java index ee427b9a750bcf27e59cb60c42d088aa2037adc4..ff5c7e9b1589b962ed31146afa2c209d56415ef2 100644 --- a/briar-core/src/org/briarproject/messaging/IncomingSession.java +++ b/briar-core/src/org/briarproject/messaging/IncomingSession.java @@ -10,8 +10,14 @@ import java.util.logging.Logger; import org.briarproject.api.ContactId; import org.briarproject.api.FormatException; +import org.briarproject.api.TransportId; import org.briarproject.api.db.DatabaseComponent; import org.briarproject.api.db.DbException; +import org.briarproject.api.event.ContactRemovedEvent; +import org.briarproject.api.event.Event; +import org.briarproject.api.event.EventBus; +import org.briarproject.api.event.EventListener; +import org.briarproject.api.event.TransportRemovedEvent; import org.briarproject.api.messaging.Ack; import org.briarproject.api.messaging.Message; import org.briarproject.api.messaging.MessageVerifier; @@ -30,66 +36,75 @@ import org.briarproject.api.messaging.UnverifiedMessage; * An incoming {@link org.briarproject.api.messaging.MessagingSession * MessagingSession}. */ -class IncomingSession implements MessagingSession { +class IncomingSession implements MessagingSession, EventListener { private static final Logger LOG = Logger.getLogger(IncomingSession.class.getName()); private final DatabaseComponent db; private final Executor dbExecutor, cryptoExecutor; + private final EventBus eventBus; private final MessageVerifier messageVerifier; - private final PacketReaderFactory packetReaderFactory; private final ContactId contactId; + private final TransportId transportId; private final InputStream in; + private final PacketReader packetReader; private volatile boolean interrupted = false; IncomingSession(DatabaseComponent db, Executor dbExecutor, - Executor cryptoExecutor, MessageVerifier messageVerifier, + Executor cryptoExecutor, EventBus eventBus, + MessageVerifier messageVerifier, PacketReaderFactory packetReaderFactory, ContactId contactId, - InputStream in) { + TransportId transportId, InputStream in) { this.db = db; this.dbExecutor = dbExecutor; this.cryptoExecutor = cryptoExecutor; + this.eventBus = eventBus; this.messageVerifier = messageVerifier; - this.packetReaderFactory = packetReaderFactory; this.contactId = contactId; + this.transportId = transportId; this.in = in; + packetReader = packetReaderFactory.createPacketReader(in); } public void run() throws IOException { - PacketReader packetReader = packetReaderFactory.createPacketReader(in); - // Read packets until interrupted or EOF - while(!interrupted && !packetReader.eof()) { - if(packetReader.hasAck()) { - Ack a = packetReader.readAck(); - dbExecutor.execute(new ReceiveAck(a)); - } else if(packetReader.hasMessage()) { - UnverifiedMessage m = packetReader.readMessage(); - cryptoExecutor.execute(new VerifyMessage(m)); - } else if(packetReader.hasRetentionAck()) { - RetentionAck a = packetReader.readRetentionAck(); - dbExecutor.execute(new ReceiveRetentionAck(a)); - } else if(packetReader.hasRetentionUpdate()) { - RetentionUpdate u = packetReader.readRetentionUpdate(); - dbExecutor.execute(new ReceiveRetentionUpdate(u)); - } else if(packetReader.hasSubscriptionAck()) { - SubscriptionAck a = packetReader.readSubscriptionAck(); - dbExecutor.execute(new ReceiveSubscriptionAck(a)); - } else if(packetReader.hasSubscriptionUpdate()) { - SubscriptionUpdate u = packetReader.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); - } else if(packetReader.hasTransportAck()) { - TransportAck a = packetReader.readTransportAck(); - dbExecutor.execute(new ReceiveTransportAck(a)); - } else if(packetReader.hasTransportUpdate()) { - TransportUpdate u = packetReader.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate(u)); - } else { - throw new FormatException(); + eventBus.addListener(this); + try { + // Read packets until interrupted or EOF + while(!interrupted && !packetReader.eof()) { + if(packetReader.hasAck()) { + Ack a = packetReader.readAck(); + dbExecutor.execute(new ReceiveAck(a)); + } else if(packetReader.hasMessage()) { + UnverifiedMessage m = packetReader.readMessage(); + cryptoExecutor.execute(new VerifyMessage(m)); + } else if(packetReader.hasRetentionAck()) { + RetentionAck a = packetReader.readRetentionAck(); + dbExecutor.execute(new ReceiveRetentionAck(a)); + } else if(packetReader.hasRetentionUpdate()) { + RetentionUpdate u = packetReader.readRetentionUpdate(); + dbExecutor.execute(new ReceiveRetentionUpdate(u)); + } else if(packetReader.hasSubscriptionAck()) { + SubscriptionAck a = packetReader.readSubscriptionAck(); + dbExecutor.execute(new ReceiveSubscriptionAck(a)); + } else if(packetReader.hasSubscriptionUpdate()) { + SubscriptionUpdate u = packetReader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); + } else if(packetReader.hasTransportAck()) { + TransportAck a = packetReader.readTransportAck(); + dbExecutor.execute(new ReceiveTransportAck(a)); + } else if(packetReader.hasTransportUpdate()) { + TransportUpdate u = packetReader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(u)); + } else { + throw new FormatException(); + } } + in.close(); + } finally { + eventBus.removeListener(this); } - in.close(); } public void interrupt() { @@ -98,6 +113,16 @@ class IncomingSession implements MessagingSession { interrupted = true; } + public void eventOccurred(Event e) { + if(e instanceof ContactRemovedEvent) { + ContactRemovedEvent c = (ContactRemovedEvent) e; + if(c.getContactId().equals(contactId)) interrupt(); + } else if(e instanceof TransportRemovedEvent) { + TransportRemovedEvent t = (TransportRemovedEvent) e; + if(t.getTransportId().equals(transportId)) interrupt(); + } + } + private class ReceiveAck implements Runnable { private final Ack ack; diff --git a/briar-core/src/org/briarproject/messaging/MessagingSessionFactoryImpl.java b/briar-core/src/org/briarproject/messaging/MessagingSessionFactoryImpl.java index 20813c27d4aec3ade5150825027b589ab9af235f..04895580ccdd61b606970c60e30c672c5ffbfdb5 100644 --- a/briar-core/src/org/briarproject/messaging/MessagingSessionFactoryImpl.java +++ b/briar-core/src/org/briarproject/messaging/MessagingSessionFactoryImpl.java @@ -43,16 +43,17 @@ class MessagingSessionFactoryImpl implements MessagingSessionFactory { this.packetWriterFactory = packetWriterFactory; } - public MessagingSession createIncomingSession(ContactId c, InputStream in) { - return new IncomingSession(db, dbExecutor, cryptoExecutor, - messageVerifier, packetReaderFactory, c, in); + public MessagingSession createIncomingSession(ContactId c, TransportId t, + InputStream in) { + return new IncomingSession(db, dbExecutor, cryptoExecutor, eventBus, + messageVerifier, packetReaderFactory, c, t, in); } public MessagingSession createOutgoingSession(ContactId c, TransportId t, - long maxLatency, OutputStream out, boolean duplex) { - if(duplex) return new ReactiveOutgoingSession(db, dbExecutor, eventBus, + long maxLatency, boolean duplex, OutputStream out) { + if(duplex) return new DuplexOutgoingSession(db, dbExecutor, eventBus, + packetWriterFactory, c, t, maxLatency, out); + else return new SimplexOutgoingSession(db, dbExecutor, eventBus, packetWriterFactory, c, t, maxLatency, out); - else return new SinglePassOutgoingSession(db, dbExecutor, - packetWriterFactory, c, maxLatency, out); } } diff --git a/briar-core/src/org/briarproject/messaging/SinglePassOutgoingSession.java b/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java similarity index 81% rename from briar-core/src/org/briarproject/messaging/SinglePassOutgoingSession.java rename to briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java index c0b5bba59a53055597ab6515f7f876adfbf40fb1..4f4d0acd91d91fc504832ce4bc821ee3c44bb797 100644 --- a/briar-core/src/org/briarproject/messaging/SinglePassOutgoingSession.java +++ b/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java @@ -14,8 +14,14 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; import org.briarproject.api.ContactId; +import org.briarproject.api.TransportId; import org.briarproject.api.db.DatabaseComponent; import org.briarproject.api.db.DbException; +import org.briarproject.api.event.ContactRemovedEvent; +import org.briarproject.api.event.Event; +import org.briarproject.api.event.EventBus; +import org.briarproject.api.event.EventListener; +import org.briarproject.api.event.TransportRemovedEvent; import org.briarproject.api.messaging.Ack; import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.PacketWriter; @@ -29,13 +35,14 @@ import org.briarproject.api.messaging.TransportUpdate; /** * An outgoing {@link org.briarproject.api.messaging.MessagingSession - * MessagingSession} that closes its output stream when no more packets are - * available to send. + * MessagingSession} suitable for simplex transports. The session sends + * messages without offering them, and closes its output stream when there are + * no more packets to send. */ -class SinglePassOutgoingSession implements MessagingSession { +class SimplexOutgoingSession implements MessagingSession, EventListener { private static final Logger LOG = - Logger.getLogger(SinglePassOutgoingSession.class.getName()); + Logger.getLogger(SimplexOutgoingSession.class.getName()); private static final ThrowingRunnable<IOException> CLOSE = new ThrowingRunnable<IOException>() { @@ -44,52 +51,60 @@ class SinglePassOutgoingSession implements MessagingSession { private final DatabaseComponent db; private final Executor dbExecutor; - private final PacketWriterFactory packetWriterFactory; + private final EventBus eventBus; private final ContactId contactId; + private final TransportId transportId; private final long maxLatency; private final OutputStream out; + private final PacketWriter packetWriter; private final AtomicInteger outstandingQueries; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; - private volatile PacketWriter packetWriter = null; private volatile boolean interrupted = false; - SinglePassOutgoingSession(DatabaseComponent db, Executor dbExecutor, - PacketWriterFactory packetWriterFactory, ContactId contactId, - long maxLatency, OutputStream out) { + SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, + EventBus eventBus, PacketWriterFactory packetWriterFactory, + ContactId contactId, TransportId transportId, long maxLatency, + OutputStream out) { this.db = db; this.dbExecutor = dbExecutor; - this.packetWriterFactory = packetWriterFactory; + this.eventBus = eventBus; this.contactId = contactId; + this.transportId = transportId; this.maxLatency = maxLatency; this.out = out; + packetWriter = packetWriterFactory.createPacketWriter(out); outstandingQueries = new AtomicInteger(8); // One per type of packet writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); } public void run() throws IOException { - packetWriter = packetWriterFactory.createPacketWriter(out); - // Start a query for each type of packet, in order of urgency - dbExecutor.execute(new GenerateTransportAcks()); - dbExecutor.execute(new GenerateTransportUpdates()); - dbExecutor.execute(new GenerateSubscriptionAck()); - dbExecutor.execute(new GenerateSubscriptionUpdate()); - dbExecutor.execute(new GenerateRetentionAck()); - dbExecutor.execute(new GenerateRetentionUpdate()); - dbExecutor.execute(new GenerateAck()); - dbExecutor.execute(new GenerateBatch()); - // Write packets until interrupted or there are no more packets to write + eventBus.addListener(this); try { - while(!interrupted) { - ThrowingRunnable<IOException> task = writerTasks.take(); - if(task == CLOSE) break; - task.run(); + // Start a query for each type of packet, in order of urgency + dbExecutor.execute(new GenerateTransportAcks()); + dbExecutor.execute(new GenerateTransportUpdates()); + dbExecutor.execute(new GenerateSubscriptionAck()); + dbExecutor.execute(new GenerateSubscriptionUpdate()); + dbExecutor.execute(new GenerateRetentionAck()); + dbExecutor.execute(new GenerateRetentionUpdate()); + dbExecutor.execute(new GenerateAck()); + dbExecutor.execute(new GenerateBatch()); + // Write packets until interrupted or no more packets to write + try { + while(!interrupted) { + ThrowingRunnable<IOException> task = writerTasks.take(); + if(task == CLOSE) break; + task.run(); + } + out.flush(); + out.close(); + } catch(InterruptedException e) { + LOG.info("Interrupted while waiting for a packet to write"); + Thread.currentThread().interrupt(); } - out.flush(); - out.close(); - } catch(InterruptedException e) { - LOG.info("Interrupted while waiting for a packet to write"); - Thread.currentThread().interrupt(); + } finally { + eventBus.removeListener(this); } } @@ -102,6 +117,16 @@ class SinglePassOutgoingSession implements MessagingSession { if(outstandingQueries.decrementAndGet() == 0) writerTasks.add(CLOSE); } + public void eventOccurred(Event e) { + if(e instanceof ContactRemovedEvent) { + ContactRemovedEvent c = (ContactRemovedEvent) e; + if(c.getContactId().equals(contactId)) interrupt(); + } else if(e instanceof TransportRemovedEvent) { + TransportRemovedEvent t = (TransportRemovedEvent) e; + if(t.getTransportId().equals(transportId)) interrupt(); + } + } + // This task runs on the database thread private class GenerateAck implements Runnable { diff --git a/briar-core/src/org/briarproject/plugins/ConnectionManagerImpl.java b/briar-core/src/org/briarproject/plugins/ConnectionManagerImpl.java index 63544f4fd14e16a95ba9e06cbc55069427c0b878..2c8dba00f8fe5d6ebed00fef2bef84522e4de783 100644 --- a/briar-core/src/org/briarproject/plugins/ConnectionManagerImpl.java +++ b/briar-core/src/org/briarproject/plugins/ConnectionManagerImpl.java @@ -100,7 +100,7 @@ class ConnectionManagerImpl implements ConnectionManager { StreamReader streamReader = streamReaderFactory.createStreamReader(in, r.getMaxFrameLength(), ctx); return messagingSessionFactory.createIncomingSession(ctx.getContactId(), - streamReader.getInputStream()); + ctx.getTransportId(), streamReader.getInputStream()); } private MessagingSession createOutgoingSession(StreamContext ctx, @@ -110,7 +110,7 @@ class ConnectionManagerImpl implements ConnectionManager { w.getMaxFrameLength(), ctx); return messagingSessionFactory.createOutgoingSession(ctx.getContactId(), ctx.getTransportId(), w.getMaxLatency(), - streamWriter.getOutputStream(), duplex); + duplex, streamWriter.getOutputStream()); } private class DispatchIncomingSimplexConnection implements Runnable { diff --git a/briar-tests/build.xml b/briar-tests/build.xml index d5662702c2f3be8c6f39d1ab6ce93282bee4ca3e..2ab788bba3b4ecc03415ed0f4aabe0343e183b1f 100644 --- a/briar-tests/build.xml +++ b/briar-tests/build.xml @@ -112,7 +112,7 @@ <test name='org.briarproject.messaging.ConsumersTest'/> <test name='org.briarproject.messaging.PacketReaderImplTest'/> <test name='org.briarproject.messaging.SimplexMessagingIntegrationTest'/> - <test name='org.briarproject.messaging.SinglePassOutgoingSessionTest'/> + <test name='org.briarproject.messaging.SimplexOutgoingSessionTest'/> <test name='org.briarproject.plugins.ConnectionRegistryImplTest'/> <test name='org.briarproject.plugins.PluginManagerImplTest'/> <test name='org.briarproject.plugins.file.LinuxRemovableDriveFinderTest'/> diff --git a/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java b/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java index e89d86edde01769db08470ae2139e3ef4799a64a..957a46b6ff478044413492c831abfa80c757a76d 100644 --- a/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java @@ -149,11 +149,13 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, MAX_FRAME_LENGTH, ctx); // Create an outgoing messaging session + EventBus eventBus = alice.getInstance(EventBus.class); PacketWriterFactory packetWriterFactory = alice.getInstance(PacketWriterFactory.class); - MessagingSession session = new SinglePassOutgoingSession(db, - new ImmediateExecutor(), packetWriterFactory, contactId, - Long.MAX_VALUE, streamWriter.getOutputStream()); + MessagingSession session = new SimplexOutgoingSession(db, + new ImmediateExecutor(), eventBus, packetWriterFactory, + contactId, transportId, Long.MAX_VALUE, + streamWriter.getOutputStream()); // Write whatever needs to be written session.run(); // Clean up @@ -207,13 +209,14 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { StreamReader streamReader = streamReaderFactory.createStreamReader(in, MAX_FRAME_LENGTH, ctx); // Create an incoming messaging session + EventBus eventBus = bob.getInstance(EventBus.class); MessageVerifier messageVerifier = bob.getInstance(MessageVerifier.class); PacketReaderFactory packetReaderFactory = bob.getInstance(PacketReaderFactory.class); MessagingSession session = new IncomingSession(db, - new ImmediateExecutor(), new ImmediateExecutor(), - messageVerifier, packetReaderFactory, contactId, + new ImmediateExecutor(), new ImmediateExecutor(), eventBus, + messageVerifier, packetReaderFactory, contactId, transportId, streamReader.getInputStream()); // No messages should have been added yet assertFalse(listener.messageAdded); diff --git a/briar-tests/src/org/briarproject/messaging/SinglePassOutgoingSessionTest.java b/briar-tests/src/org/briarproject/messaging/SimplexOutgoingSessionTest.java similarity index 79% rename from briar-tests/src/org/briarproject/messaging/SinglePassOutgoingSessionTest.java rename to briar-tests/src/org/briarproject/messaging/SimplexOutgoingSessionTest.java index ca4ea3db20aa34c3a3b2734dff5f195f466d4646..f9fa68c6fdd0efef239201b32a131848093983de 100644 --- a/briar-tests/src/org/briarproject/messaging/SinglePassOutgoingSessionTest.java +++ b/briar-tests/src/org/briarproject/messaging/SimplexOutgoingSessionTest.java @@ -4,22 +4,18 @@ import java.io.ByteArrayOutputStream; import java.util.Arrays; import java.util.Random; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import org.briarproject.BriarTestCase; -import org.briarproject.TestLifecycleModule; -import org.briarproject.TestSystemModule; import org.briarproject.TestUtils; import org.briarproject.api.ContactId; +import org.briarproject.api.TransportId; import org.briarproject.api.UniqueId; import org.briarproject.api.db.DatabaseComponent; -import org.briarproject.api.db.DatabaseExecutor; +import org.briarproject.api.event.EventBus; import org.briarproject.api.messaging.Ack; import org.briarproject.api.messaging.MessageId; -import org.briarproject.api.messaging.MessagingSession; import org.briarproject.api.messaging.PacketWriterFactory; -import org.briarproject.crypto.CryptoModule; -import org.briarproject.event.EventModule; +import org.briarproject.plugins.ImmediateExecutor; import org.briarproject.serial.SerialModule; import org.jmock.Expectations; import org.jmock.Mockery; @@ -30,36 +26,36 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Module; -public class SinglePassOutgoingSessionTest extends BriarTestCase { +public class SimplexOutgoingSessionTest extends BriarTestCase { // FIXME: This is an integration test, not a unit test private final Mockery context; private final DatabaseComponent db; private final Executor dbExecutor; + private final EventBus eventBus; private final PacketWriterFactory packetWriterFactory; private final ContactId contactId; + private final TransportId transportId; private final MessageId messageId; private final byte[] secret; - public SinglePassOutgoingSessionTest() { + public SimplexOutgoingSessionTest() { context = new Mockery(); db = context.mock(DatabaseComponent.class); - dbExecutor = Executors.newSingleThreadExecutor(); + dbExecutor = new ImmediateExecutor(); Module testModule = new AbstractModule() { @Override public void configure() { - bind(DatabaseComponent.class).toInstance(db); - bind(Executor.class).annotatedWith( - DatabaseExecutor.class).toInstance(dbExecutor); + bind(PacketWriterFactory.class).to( + PacketWriterFactoryImpl.class); } }; - Injector i = Guice.createInjector(testModule, - new TestLifecycleModule(), new TestSystemModule(), - new CryptoModule(), new EventModule(), new MessagingModule(), - new SerialModule()); + Injector i = Guice.createInjector(testModule, new SerialModule()); + eventBus = context.mock(EventBus.class); packetWriterFactory = i.getInstance(PacketWriterFactory.class); contactId = new ContactId(234); + transportId = new TransportId("id"); messageId = new MessageId(TestUtils.getRandomId()); secret = new byte[32]; new Random().nextBytes(secret); @@ -68,9 +64,12 @@ public class SinglePassOutgoingSessionTest extends BriarTestCase { @Test public void testNothingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); - MessagingSession session = new SinglePassOutgoingSession(db, dbExecutor, - packetWriterFactory, contactId, Long.MAX_VALUE, out); + final SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, packetWriterFactory, contactId, + transportId, Long.MAX_VALUE, out); context.checking(new Expectations() {{ + // Add listener + oneOf(eventBus).addListener(session); // No transport acks to send oneOf(db).generateTransportAcks(contactId); will(returnValue(null)); @@ -99,6 +98,8 @@ public class SinglePassOutgoingSessionTest extends BriarTestCase { oneOf(db).generateBatch(with(contactId), with(any(int.class)), with(any(long.class))); will(returnValue(null)); + // Remove listener + oneOf(eventBus).removeListener(session); }}); session.run(); // Nothing should have been written @@ -109,10 +110,13 @@ public class SinglePassOutgoingSessionTest extends BriarTestCase { @Test public void testSomethingToSend() throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); - MessagingSession session = new SinglePassOutgoingSession(db, dbExecutor, - packetWriterFactory, contactId, Long.MAX_VALUE, out); + final SimplexOutgoingSession session = new SimplexOutgoingSession(db, + dbExecutor, eventBus, packetWriterFactory, contactId, + transportId, Long.MAX_VALUE, out); final byte[] raw = new byte[1234]; context.checking(new Expectations() {{ + // Add listener + oneOf(eventBus).addListener(session); // No transport acks to send oneOf(db).generateTransportAcks(contactId); will(returnValue(null)); @@ -148,6 +152,8 @@ public class SinglePassOutgoingSessionTest extends BriarTestCase { oneOf(db).generateBatch(with(contactId), with(any(int.class)), with(any(long.class))); will(returnValue(null)); + // Remove listener + oneOf(eventBus).removeListener(session); }}); session.run(); // Something should have been written