diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java index 19b38cb8a348ad5140e18a0261bd665fea1fbb3d..216f293985c7c80375e9fe66e2207c50329c6b59 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java @@ -2,9 +2,9 @@ package org.briarproject.bramble.api.sync; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.transport.StreamWriter; import java.io.InputStream; -import java.io.OutputStream; @NotNullByDefault public interface SyncSessionFactory { @@ -12,8 +12,8 @@ public interface SyncSessionFactory { SyncSession createIncomingSession(ContactId c, InputStream in); SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency, - OutputStream out); + StreamWriter streamWriter); SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, OutputStream out); + int maxIdleTime, StreamWriter streamWriter); } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..923027086008063ad35dc2d2acf0378d6e0afda4 --- /dev/null +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java @@ -0,0 +1,19 @@ +package org.briarproject.bramble.api.transport; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * An interface for writing data to a transport connection. Data will be + * encrypted and authenticated before being written to the connection. + */ +public interface StreamWriter { + + OutputStream getOutputStream(); + + /** + * Sends the end of stream marker, informing the recipient that no more + * data will be sent. The connection is flushed but not closed. + */ + void sendEndOfStream() throws IOException; +} diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java index 3277acee08fcad4b7f34bdd216e3ead3cb6e1f8c..1ddde0c0a2600973c6e18bd27c9598b9a3501321 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java @@ -12,12 +12,12 @@ public interface StreamWriterFactory { * Creates an {@link OutputStream OutputStream} for writing to a * transport stream */ - OutputStream createStreamWriter(OutputStream out, StreamContext ctx); + StreamWriter createStreamWriter(OutputStream out, StreamContext ctx); /** * Creates an {@link OutputStream OutputStream} for writing to a contact * exchange stream. */ - OutputStream createContactExchangeStreamWriter(OutputStream out, + StreamWriter createContactExchangeStreamWriter(OutputStream out, SecretKey headerKey); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java index b2f552d088da6531dd397e407254c3a09bff8628..0a4adb9932ba4e081d5cf790ac292026d60b01bb 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java @@ -30,6 +30,7 @@ import org.briarproject.bramble.api.record.RecordWriter; import org.briarproject.bramble.api.record.RecordWriterFactory; import org.briarproject.bramble.api.system.Clock; import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.EOFException; @@ -152,11 +153,11 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask { recordReaderFactory.createRecordReader(streamReader); // Create the writers - OutputStream streamWriter = + StreamWriter streamWriter = streamWriterFactory.createContactExchangeStreamWriter(out, alice ? aliceHeaderKey : bobHeaderKey); RecordWriter recordWriter = - recordWriterFactory.createRecordWriter(streamWriter); + recordWriterFactory.createRecordWriter(streamWriter.getOutputStream()); // Derive the nonces to be signed byte[] aliceNonce = crypto.mac(ALICE_NONCE_LABEL, masterSecret, @@ -184,8 +185,8 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask { localSignature, localTimestamp); recordWriter.flush(); } - // Close the outgoing stream - recordWriter.close(); + // Send EOF on the outgoing stream + streamWriter.sendEndOfStream(); // Skip any remaining records from the incoming stream try { while (true) recordReader.readRecord(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 071068ab5cff4e99cb6df455ce202c5c9a3e2fef..99574a872665475932f4c3e6d7bbbacea0568a1b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -14,12 +14,12 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamContext; import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.concurrent.Executor; import java.util.logging.Logger; @@ -101,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager { private SyncSession createSimplexOutgoingSession(StreamContext ctx, TransportConnectionWriter w) throws IOException { - OutputStream streamWriter = streamWriterFactory.createStreamWriter( + StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); return syncSessionFactory.createSimplexOutgoingSession( ctx.getContactId(), w.getMaxLatency(), streamWriter); @@ -109,7 +109,7 @@ class ConnectionManagerImpl implements ConnectionManager { private SyncSession createDuplexOutgoingSession(StreamContext ctx, TransportConnectionWriter w) throws IOException { - OutputStream streamWriter = streamWriterFactory.createStreamWriter( + StreamWriter streamWriter = streamWriterFactory.createStreamWriter( w.getOutputStream(), ctx); return syncSessionFactory.createDuplexOutgoingSession( ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(), @@ -300,8 +300,8 @@ class ConnectionManagerImpl implements ConnectionManager { } private void disposeReader(boolean exception, boolean recognised) { - if (exception && outgoingSession != null) - outgoingSession.interrupt(); + // Interrupt the outgoing session so it finishes cleanly + if (outgoingSession != null) outgoingSession.interrupt(); try { reader.dispose(exception, recognised); } catch (IOException e) { @@ -310,6 +310,8 @@ class ConnectionManagerImpl implements ConnectionManager { } private void disposeWriter(boolean exception) { + // Interrupt the incoming session if an exception occurred, + // otherwise wait for the end of stream marker if (exception && incomingSession != null) incomingSession.interrupt(); try { @@ -407,8 +409,8 @@ class ConnectionManagerImpl implements ConnectionManager { } private void disposeReader(boolean exception, boolean recognised) { - if (exception && outgoingSession != null) - outgoingSession.interrupt(); + // Interrupt the outgoing session so it finishes cleanly + if (outgoingSession != null) outgoingSession.interrupt(); try { reader.dispose(exception, recognised); } catch (IOException e) { @@ -417,6 +419,8 @@ class ConnectionManagerImpl implements ConnectionManager { } private void disposeWriter(boolean exception) { + // Interrupt the incoming session if an exception occurred, + // otherwise wait for the end of stream marker if (exception && incomingSession != null) incomingSession.interrupt(); try { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionRegistryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionRegistryImpl.java index c12abae552fecc97a6e9fa21ad552159a37e39cb..2e4571040107af06e18a965cd5f3108977b73207 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionRegistryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionRegistryImpl.java @@ -106,7 +106,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry { if (m == null) return Collections.emptyList(); List<ContactId> ids = new ArrayList<>(m.keySet()); if (LOG.isLoggable(INFO)) - LOG.info(ids.size() + " contacts connected"); + LOG.info(ids.size() + " contacts connected: " + t); return ids; } finally { lock.unlock(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index c2b54f5a9722fe2e8378f0421df6f45e16a7a805..1461b497404ede447974c1fbec747dc3dc23525e 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.event.MessageSharedEvent; import org.briarproject.bramble.api.sync.event.MessageToAckEvent; import org.briarproject.bramble.api.sync.event.MessageToRequestEvent; import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; import java.util.Collection; @@ -67,6 +68,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { private final Clock clock; private final ContactId contactId; private final int maxLatency, maxIdleTime; + private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; @@ -81,7 +83,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener { DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, EventBus eventBus, Clock clock, ContactId contactId, int maxLatency, - int maxIdleTime, SyncRecordWriter recordWriter) { + int maxIdleTime, StreamWriter streamWriter, + SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; @@ -89,6 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.maxLatency = maxLatency; this.maxIdleTime = maxIdleTime; + this.streamWriter = streamWriter; this.recordWriter = recordWriter; writerTasks = new LinkedBlockingQueue<>(); } @@ -149,7 +153,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { dataToFlush = true; } } - if (dataToFlush) recordWriter.flush(); + streamWriter.sendEndOfStream(); } catch (InterruptedException e) { LOG.info("Interrupted while waiting for a record to write"); Thread.currentThread().interrupt(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java index 2605764b258524f870e09333292538e284bba0c2..9de8d8b8bb44171e8782a5b5c1b2a52dea7a2652 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java @@ -63,7 +63,11 @@ class IncomingSession implements SyncSession, EventListener { eventBus.addListener(this); try { // Read records until interrupted or EOF - while (!interrupted && !recordReader.eof()) { + while (!interrupted) { + if (recordReader.eof()) { + LOG.info("End of stream"); + return; + } if (recordReader.hasAck()) { Ack a = recordReader.readAck(); dbExecutor.execute(new ReceiveAck(a)); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index b0645dc5125880882a7777cbfb188c23deb364a5..ae9bbb2619d32af94c5fe03736d218dee2bf694c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -15,6 +15,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; +import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; import java.util.Collection; @@ -51,6 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final EventBus eventBus; private final ContactId contactId; private final int maxLatency; + private final StreamWriter streamWriter; private final SyncRecordWriter recordWriter; private final AtomicInteger outstandingQueries; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; @@ -58,13 +60,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private volatile boolean interrupted = false; SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, - EventBus eventBus, ContactId contactId, - int maxLatency, SyncRecordWriter recordWriter) { + EventBus eventBus, ContactId contactId, int maxLatency, + StreamWriter streamWriter, SyncRecordWriter recordWriter) { this.db = db; this.dbExecutor = dbExecutor; this.eventBus = eventBus; this.contactId = contactId; this.maxLatency = maxLatency; + this.streamWriter = streamWriter; this.recordWriter = recordWriter; outstandingQueries = new AtomicInteger(2); // One per type of record writerTasks = new LinkedBlockingQueue<>(); @@ -85,7 +88,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { if (task == CLOSE) break; task.run(); } - recordWriter.flush(); + streamWriter.sendEndOfStream(); } catch (InterruptedException e) { LOG.info("Interrupted while waiting for a record to write"); Thread.currentThread().interrupt(); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java index 8459f6eaf975143dde4d7a3e1d34504e75b2a093..d35e1164baedb3a5a27bae31827ddee2f0bcba4f 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java @@ -12,6 +12,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriterFactory; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.transport.StreamWriter; import java.io.InputStream; import java.io.OutputStream; @@ -53,19 +54,21 @@ class SyncSessionFactoryImpl implements SyncSessionFactory { @Override public SyncSession createSimplexOutgoingSession(ContactId c, - int maxLatency, OutputStream out) { + int maxLatency, StreamWriter streamWriter) { + OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new SimplexOutgoingSession(db, dbExecutor, eventBus, c, - maxLatency, recordWriter); + maxLatency, streamWriter, recordWriter); } @Override public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency, - int maxIdleTime, OutputStream out) { + int maxIdleTime, StreamWriter streamWriter) { + OutputStream out = streamWriter.getOutputStream(); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(out); return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c, - maxLatency, maxIdleTime, recordWriter); + maxLatency, maxIdleTime, streamWriter, recordWriter); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java index b443a20e3d0ce31efc93b85465b3e31c339465a7..3410e84465f8fa3d52749c6a675beb3382bb2d55 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java @@ -4,6 +4,7 @@ import org.briarproject.bramble.api.crypto.SecretKey; import org.briarproject.bramble.api.crypto.StreamEncrypterFactory; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.transport.StreamContext; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; import java.io.OutputStream; @@ -23,14 +24,14 @@ class StreamWriterFactoryImpl implements StreamWriterFactory { } @Override - public OutputStream createStreamWriter(OutputStream out, + public StreamWriter createStreamWriter(OutputStream out, StreamContext ctx) { return new StreamWriterImpl( streamEncrypterFactory.createStreamEncrypter(out, ctx)); } @Override - public OutputStream createContactExchangeStreamWriter(OutputStream out, + public StreamWriter createContactExchangeStreamWriter(OutputStream out, SecretKey headerKey) { return new StreamWriterImpl( streamEncrypterFactory.createContactExchangeStreamDecrypter(out, diff --git a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java index 142ca19ff798ec7e5c4fe0c46eba5d55e7f152ff..2a2279e35e060bbd4ffc8c51680216569a1a8c35 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java @@ -2,6 +2,7 @@ package org.briarproject.bramble.transport; import org.briarproject.bramble.api.crypto.StreamEncrypter; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; +import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; import java.io.OutputStream; @@ -17,7 +18,7 @@ import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYL */ @NotThreadSafe @NotNullByDefault -class StreamWriterImpl extends OutputStream { +class StreamWriterImpl extends OutputStream implements StreamWriter { private final StreamEncrypter encrypter; private final byte[] payload; @@ -29,6 +30,17 @@ class StreamWriterImpl extends OutputStream { payload = new byte[MAX_PAYLOAD_LENGTH]; } + @Override + public OutputStream getOutputStream() { + return this; + } + + @Override + public void sendEndOfStream() throws IOException { + writeFrame(true); + encrypter.flush(); + } + @Override public void close() throws IOException { writeFrame(true); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java index 3e001dae86a05af999b35eea36325b32b2286148..4aa5cd36b26f319220c3c2b38dfab52deecc4128 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java @@ -7,11 +7,10 @@ import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.sync.Ack; import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.SyncRecordWriter; -import org.briarproject.bramble.test.BrambleTestCase; +import org.briarproject.bramble.api.transport.StreamWriter; +import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.ImmediateExecutor; -import org.briarproject.bramble.test.TestUtils; import org.jmock.Expectations; -import org.jmock.Mockery; import org.junit.Test; import java.util.Arrays; @@ -19,33 +18,27 @@ import java.util.Collections; import java.util.concurrent.Executor; import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS; +import static org.briarproject.bramble.test.TestUtils.getRandomId; -public class SimplexOutgoingSessionTest extends BrambleTestCase { +public class SimplexOutgoingSessionTest extends BrambleMockTestCase { - private final Mockery context; - private final DatabaseComponent db; - private final Executor dbExecutor; - private final EventBus eventBus; - private final ContactId contactId; - private final MessageId messageId; - private final int maxLatency; - private final SyncRecordWriter recordWriter; + private static final int MAX_LATENCY = Integer.MAX_VALUE; - public SimplexOutgoingSessionTest() { - context = new Mockery(); - db = context.mock(DatabaseComponent.class); - dbExecutor = new ImmediateExecutor(); - eventBus = context.mock(EventBus.class); - recordWriter = context.mock(SyncRecordWriter.class); - contactId = new ContactId(234); - messageId = new MessageId(TestUtils.getRandomId()); - maxLatency = Integer.MAX_VALUE; - } + private final DatabaseComponent db = context.mock(DatabaseComponent.class); + private final EventBus eventBus = context.mock(EventBus.class); + private final StreamWriter streamWriter = context.mock(StreamWriter.class); + private final SyncRecordWriter recordWriter = + context.mock(SyncRecordWriter.class); + + private final Executor dbExecutor = new ImmediateExecutor(); + private final ContactId contactId = new ContactId(234); + private final MessageId messageId = new MessageId(getRandomId()); @Test public void testNothingToSend() throws Exception { SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, maxLatency, recordWriter); + dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, + recordWriter); Transaction noAckTxn = new Transaction(null, false); Transaction noMsgTxn = new Transaction(null, false); @@ -63,19 +56,17 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase { oneOf(db).startTransaction(false); will(returnValue(noMsgTxn)); oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(maxLatency)); + with(any(int.class)), with(MAX_LATENCY)); will(returnValue(null)); oneOf(db).commitTransaction(noMsgTxn); oneOf(db).endTransaction(noMsgTxn); - // Flush the output stream - oneOf(recordWriter).flush(); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); // Remove listener oneOf(eventBus).removeListener(session); }}); session.run(); - - context.assertIsSatisfied(); } @Test @@ -83,7 +74,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase { Ack ack = new Ack(Collections.singletonList(messageId)); byte[] raw = new byte[1234]; SimplexOutgoingSession session = new SimplexOutgoingSession(db, - dbExecutor, eventBus, contactId, maxLatency, recordWriter); + dbExecutor, eventBus, contactId, MAX_LATENCY, streamWriter, + recordWriter); Transaction ackTxn = new Transaction(null, false); Transaction noAckTxn = new Transaction(null, false); Transaction msgTxn = new Transaction(null, false); @@ -104,7 +96,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase { oneOf(db).startTransaction(false); will(returnValue(msgTxn)); oneOf(db).generateBatch(with(msgTxn), with(contactId), - with(any(int.class)), with(maxLatency)); + with(any(int.class)), with(MAX_LATENCY)); will(returnValue(Arrays.asList(raw))); oneOf(db).commitTransaction(msgTxn); oneOf(db).endTransaction(msgTxn); @@ -120,18 +112,16 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase { oneOf(db).startTransaction(false); will(returnValue(noMsgTxn)); oneOf(db).generateBatch(with(noMsgTxn), with(contactId), - with(any(int.class)), with(maxLatency)); + with(any(int.class)), with(MAX_LATENCY)); will(returnValue(null)); oneOf(db).commitTransaction(noMsgTxn); oneOf(db).endTransaction(noMsgTxn); - // Flush the output stream - oneOf(recordWriter).flush(); + // Send the end of stream marker + oneOf(streamWriter).sendEndOfStream(); // Remove listener oneOf(eventBus).removeListener(session); }}); session.run(); - - context.assertIsSatisfied(); } } diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java index 1504da9eb48cfcd038d724c04c4fa5398e8d954e..d3f1eb905efb06781a2ce3dbd81a86f83ccb1329 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java @@ -19,6 +19,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncRecordWriterFactory; import org.briarproject.bramble.api.transport.StreamContext; import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.TestUtils; @@ -27,7 +28,6 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.InputStream; -import java.io.OutputStream; import java.util.Arrays; import java.util.Collection; @@ -102,18 +102,18 @@ public class SyncIntegrationTest extends BrambleTestCase { ByteArrayOutputStream out = new ByteArrayOutputStream(); StreamContext ctx = new StreamContext(contactId, transportId, tagKey, headerKey, streamNumber); - OutputStream streamWriter = streamWriterFactory.createStreamWriter(out, + StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, ctx); SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter( - streamWriter); + streamWriter.getOutputStream()); recordWriter.writeAck(new Ack(messageIds)); recordWriter.writeMessage(message.getRaw()); recordWriter.writeMessage(message1.getRaw()); recordWriter.writeOffer(new Offer(messageIds)); recordWriter.writeRequest(new Request(messageIds)); - recordWriter.flush(); + streamWriter.sendEndOfStream(); return out.toByteArray(); } diff --git a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java index 879b763daec59a93251164b0cfbdaa151b3cde66..1007afe5e08866139119b5d3b3d129d5c1d8d434 100644 --- a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java @@ -16,6 +16,7 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.transport.KeyManager; import org.briarproject.bramble.api.transport.StreamContext; import org.briarproject.bramble.api.transport.StreamReaderFactory; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.api.transport.StreamWriterFactory; import org.briarproject.bramble.contact.ContactModule; import org.briarproject.bramble.identity.IdentityModule; @@ -39,7 +40,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.InputStream; -import java.io.OutputStream; import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH; import static org.briarproject.bramble.test.TestPluginConfigModule.MAX_LATENCY; @@ -69,7 +69,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { alice = DaggerSimplexMessagingIntegrationTestComponent.builder() .testDatabaseModule(new TestDatabaseModule(aliceDir)).build(); injectEagerSingletons(alice); - alice.inject(new SystemModule.EagerSingletons()); bob = DaggerSimplexMessagingIntegrationTestComponent.builder() .testDatabaseModule(new TestDatabaseModule(bobDir)).build(); injectEagerSingletons(bob); @@ -159,7 +158,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { // Create a stream writer StreamWriterFactory streamWriterFactory = device.getStreamWriterFactory(); - OutputStream streamWriter = + StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, ctx); // Create an outgoing sync session SyncSessionFactory syncSessionFactory = device.getSyncSessionFactory(); @@ -167,7 +166,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { contactId, MAX_LATENCY, streamWriter); // Write whatever needs to be written session.run(); - streamWriter.close(); + streamWriter.sendEndOfStream(); // Return the contents of the stream return out.toByteArray(); } diff --git a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java index 7175cdb776d4cde3fe896e014e8110afd827d654..481a574869a56d0f455a7f6673b7bf1f56fb5f32 100644 --- a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java +++ b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java @@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.sync.SyncSessionFactory; import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent; import org.briarproject.bramble.api.system.Clock; +import org.briarproject.bramble.api.transport.StreamWriter; import org.briarproject.bramble.contact.ContactModule; import org.briarproject.bramble.crypto.CryptoExecutorModule; import org.briarproject.bramble.identity.IdentityModule; @@ -340,9 +341,10 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone LOG.info("TEST: Sending message from " + from + " to " + to); ByteArrayOutputStream out = new ByteArrayOutputStream(); + StreamWriter streamWriter = new TestStreamWriter(out); // Create an outgoing sync session - SyncSession sessionFrom = - fromSync.createSimplexOutgoingSession(toId, MAX_LATENCY, out); + SyncSession sessionFrom = fromSync.createSimplexOutgoingSession(toId, + MAX_LATENCY, streamWriter); // Write whatever needs to be written sessionFrom.run(); out.close(); diff --git a/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java b/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..ba95dbb9c4bd0ab80ed10041f128724681a28066 --- /dev/null +++ b/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java @@ -0,0 +1,25 @@ +package org.briarproject.briar.test; + +import org.briarproject.bramble.api.transport.StreamWriter; + +import java.io.IOException; +import java.io.OutputStream; + +class TestStreamWriter implements StreamWriter { + + private final OutputStream out; + + TestStreamWriter(OutputStream out) { + this.out = out; + } + + @Override + public OutputStream getOutputStream() { + return out; + } + + @Override + public void sendEndOfStream() throws IOException { + out.flush(); + } +}