From 5320737d4956f675d148b4dbbd9d01775c990406 Mon Sep 17 00:00:00 2001
From: akwizgran <michael@briarproject.org>
Date: Sat, 28 Apr 2018 23:38:41 +0100
Subject: [PATCH] Send end of stream marker when sync session finishes.

---
 .../bramble/api/sync/SyncSessionFactory.java  |  6 ++---
 .../bramble/api/transport/StreamWriter.java   | 19 ++++++++++++++
 .../api/transport/StreamWriterFactory.java    |  4 +--
 .../contact/ContactExchangeTaskImpl.java      |  9 ++++---
 .../bramble/plugin/ConnectionManagerImpl.java |  6 ++---
 .../bramble/sync/DuplexOutgoingSession.java   |  8 ++++--
 .../bramble/sync/IncomingSession.java         |  6 ++++-
 .../bramble/sync/SimplexOutgoingSession.java  |  9 ++++---
 .../bramble/sync/SyncSessionFactoryImpl.java  | 11 +++++---
 .../transport/StreamWriterFactoryImpl.java    |  5 ++--
 .../bramble/transport/StreamWriterImpl.java   | 14 ++++++++++-
 .../sync/SimplexOutgoingSessionTest.java      | 18 ++++++++-----
 .../bramble/sync/SyncIntegrationTest.java     |  8 +++---
 .../SimplexMessagingIntegrationTest.java      |  7 +++---
 .../briar/test/BriarIntegrationTest.java      |  6 +++--
 .../briar/test/TestStreamWriter.java          | 25 +++++++++++++++++++
 16 files changed, 120 insertions(+), 41 deletions(-)
 create mode 100644 bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java
 create mode 100644 briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java

diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java
index 19b38cb8a3..216f293985 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSessionFactory.java
@@ -2,9 +2,9 @@ package org.briarproject.bramble.api.sync;
 
 import org.briarproject.bramble.api.contact.ContactId;
 import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
+import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.InputStream;
-import java.io.OutputStream;
 
 @NotNullByDefault
 public interface SyncSessionFactory {
@@ -12,8 +12,8 @@ public interface SyncSessionFactory {
 	SyncSession createIncomingSession(ContactId c, InputStream in);
 
 	SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
-			OutputStream out);
+			StreamWriter streamWriter);
 
 	SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
-			int maxIdleTime, OutputStream out);
+			int maxIdleTime, StreamWriter streamWriter);
 }
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java
new file mode 100644
index 0000000000..9230270860
--- /dev/null
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriter.java
@@ -0,0 +1,19 @@
+package org.briarproject.bramble.api.transport;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An interface for writing data to a transport connection. Data will be
+ * encrypted and authenticated before being written to the connection.
+ */
+public interface StreamWriter {
+
+	OutputStream getOutputStream();
+
+	/**
+	 * Sends the end of stream marker, informing the recipient that no more
+	 * data will be sent. The connection is flushed but not closed.
+	 */
+	void sendEndOfStream() throws IOException;
+}
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java
index 3277acee08..1ddde0c0a2 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/transport/StreamWriterFactory.java
@@ -12,12 +12,12 @@ public interface StreamWriterFactory {
 	 * Creates an {@link OutputStream OutputStream} for writing to a
 	 * transport stream
 	 */
-	OutputStream createStreamWriter(OutputStream out, StreamContext ctx);
+	StreamWriter createStreamWriter(OutputStream out, StreamContext ctx);
 
 	/**
 	 * Creates an {@link OutputStream OutputStream} for writing to a contact
 	 * exchange stream.
 	 */
-	OutputStream createContactExchangeStreamWriter(OutputStream out,
+	StreamWriter createContactExchangeStreamWriter(OutputStream out,
 			SecretKey headerKey);
 }
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java
index b2f552d088..0a4adb9932 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/contact/ContactExchangeTaskImpl.java
@@ -30,6 +30,7 @@ import org.briarproject.bramble.api.record.RecordWriter;
 import org.briarproject.bramble.api.record.RecordWriterFactory;
 import org.briarproject.bramble.api.system.Clock;
 import org.briarproject.bramble.api.transport.StreamReaderFactory;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.api.transport.StreamWriterFactory;
 
 import java.io.EOFException;
@@ -152,11 +153,11 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
 				recordReaderFactory.createRecordReader(streamReader);
 
 		// Create the writers
-		OutputStream streamWriter =
+		StreamWriter streamWriter =
 				streamWriterFactory.createContactExchangeStreamWriter(out,
 						alice ? aliceHeaderKey : bobHeaderKey);
 		RecordWriter recordWriter =
-				recordWriterFactory.createRecordWriter(streamWriter);
+				recordWriterFactory.createRecordWriter(streamWriter.getOutputStream());
 
 		// Derive the nonces to be signed
 		byte[] aliceNonce = crypto.mac(ALICE_NONCE_LABEL, masterSecret,
@@ -184,8 +185,8 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
 						localSignature, localTimestamp);
 				recordWriter.flush();
 			}
-			// Close the outgoing stream
-			recordWriter.close();
+			// Send EOF on the outgoing stream
+			streamWriter.sendEndOfStream();
 			// Skip any remaining records from the incoming stream
 			try {
 				while (true) recordReader.readRecord();
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
index 071068ab5c..cc188a3eb9 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
@@ -14,12 +14,12 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory;
 import org.briarproject.bramble.api.transport.KeyManager;
 import org.briarproject.bramble.api.transport.StreamContext;
 import org.briarproject.bramble.api.transport.StreamReaderFactory;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.api.transport.StreamWriterFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.concurrent.Executor;
 import java.util.logging.Logger;
 
@@ -101,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager {
 
 	private SyncSession createSimplexOutgoingSession(StreamContext ctx,
 			TransportConnectionWriter w) throws IOException {
-		OutputStream streamWriter = streamWriterFactory.createStreamWriter(
+		StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
 				w.getOutputStream(), ctx);
 		return syncSessionFactory.createSimplexOutgoingSession(
 				ctx.getContactId(), w.getMaxLatency(), streamWriter);
@@ -109,7 +109,7 @@ class ConnectionManagerImpl implements ConnectionManager {
 
 	private SyncSession createDuplexOutgoingSession(StreamContext ctx,
 			TransportConnectionWriter w) throws IOException {
-		OutputStream streamWriter = streamWriterFactory.createStreamWriter(
+		StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
 				w.getOutputStream(), ctx);
 		return syncSessionFactory.createDuplexOutgoingSession(
 				ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
index c2b54f5a97..1461b49740 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
@@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
 import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
 import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
 import org.briarproject.bramble.api.system.Clock;
+import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -67,6 +68,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
 	private final Clock clock;
 	private final ContactId contactId;
 	private final int maxLatency, maxIdleTime;
+	private final StreamWriter streamWriter;
 	private final SyncRecordWriter recordWriter;
 	private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
 
@@ -81,7 +83,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
 
 	DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
 			EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
-			int maxIdleTime, SyncRecordWriter recordWriter) {
+			int maxIdleTime, StreamWriter streamWriter,
+			SyncRecordWriter recordWriter) {
 		this.db = db;
 		this.dbExecutor = dbExecutor;
 		this.eventBus = eventBus;
@@ -89,6 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
 		this.contactId = contactId;
 		this.maxLatency = maxLatency;
 		this.maxIdleTime = maxIdleTime;
+		this.streamWriter = streamWriter;
 		this.recordWriter = recordWriter;
 		writerTasks = new LinkedBlockingQueue<>();
 	}
@@ -149,7 +153,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
 						dataToFlush = true;
 					}
 				}
-				if (dataToFlush) recordWriter.flush();
+				streamWriter.sendEndOfStream();
 			} catch (InterruptedException e) {
 				LOG.info("Interrupted while waiting for a record to write");
 				Thread.currentThread().interrupt();
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
index 2605764b25..9de8d8b8bb 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
@@ -63,7 +63,11 @@ class IncomingSession implements SyncSession, EventListener {
 		eventBus.addListener(this);
 		try {
 			// Read records until interrupted or EOF
-			while (!interrupted && !recordReader.eof()) {
+			while (!interrupted) {
+				if (recordReader.eof()) {
+					LOG.info("End of stream");
+					return;
+				}
 				if (recordReader.hasAck()) {
 					Ack a = recordReader.readAck();
 					dbExecutor.execute(new ReceiveAck(a));
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
index b0645dc512..ae9bbb2619 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
 import org.briarproject.bramble.api.sync.Ack;
 import org.briarproject.bramble.api.sync.SyncRecordWriter;
 import org.briarproject.bramble.api.sync.SyncSession;
+import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -51,6 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 	private final EventBus eventBus;
 	private final ContactId contactId;
 	private final int maxLatency;
+	private final StreamWriter streamWriter;
 	private final SyncRecordWriter recordWriter;
 	private final AtomicInteger outstandingQueries;
 	private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
@@ -58,13 +60,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 	private volatile boolean interrupted = false;
 
 	SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
-			EventBus eventBus, ContactId contactId,
-			int maxLatency, SyncRecordWriter recordWriter) {
+			EventBus eventBus, ContactId contactId, int maxLatency,
+			StreamWriter streamWriter, SyncRecordWriter recordWriter) {
 		this.db = db;
 		this.dbExecutor = dbExecutor;
 		this.eventBus = eventBus;
 		this.contactId = contactId;
 		this.maxLatency = maxLatency;
+		this.streamWriter = streamWriter;
 		this.recordWriter = recordWriter;
 		outstandingQueries = new AtomicInteger(2); // One per type of record
 		writerTasks = new LinkedBlockingQueue<>();
@@ -85,7 +88,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 					if (task == CLOSE) break;
 					task.run();
 				}
-				recordWriter.flush();
+				streamWriter.sendEndOfStream();
 			} catch (InterruptedException e) {
 				LOG.info("Interrupted while waiting for a record to write");
 				Thread.currentThread().interrupt();
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
index 8459f6eaf9..d35e1164ba 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncSessionFactoryImpl.java
@@ -12,6 +12,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
 import org.briarproject.bramble.api.sync.SyncSession;
 import org.briarproject.bramble.api.sync.SyncSessionFactory;
 import org.briarproject.bramble.api.system.Clock;
+import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -53,19 +54,21 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
 
 	@Override
 	public SyncSession createSimplexOutgoingSession(ContactId c,
-			int maxLatency, OutputStream out) {
+			int maxLatency, StreamWriter streamWriter) {
+		OutputStream out = streamWriter.getOutputStream();
 		SyncRecordWriter recordWriter =
 				recordWriterFactory.createRecordWriter(out);
 		return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
-				maxLatency, recordWriter);
+				maxLatency, streamWriter, recordWriter);
 	}
 
 	@Override
 	public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
-			int maxIdleTime, OutputStream out) {
+			int maxIdleTime, StreamWriter streamWriter) {
+		OutputStream out = streamWriter.getOutputStream();
 		SyncRecordWriter recordWriter =
 				recordWriterFactory.createRecordWriter(out);
 		return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
-				maxLatency, maxIdleTime, recordWriter);
+				maxLatency, maxIdleTime, streamWriter, recordWriter);
 	}
 }
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java
index b443a20e3d..3410e84465 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterFactoryImpl.java
@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.crypto.SecretKey;
 import org.briarproject.bramble.api.crypto.StreamEncrypterFactory;
 import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
 import org.briarproject.bramble.api.transport.StreamContext;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.api.transport.StreamWriterFactory;
 
 import java.io.OutputStream;
@@ -23,14 +24,14 @@ class StreamWriterFactoryImpl implements StreamWriterFactory {
 	}
 
 	@Override
-	public OutputStream createStreamWriter(OutputStream out,
+	public StreamWriter createStreamWriter(OutputStream out,
 			StreamContext ctx) {
 		return new StreamWriterImpl(
 				streamEncrypterFactory.createStreamEncrypter(out, ctx));
 	}
 
 	@Override
-	public OutputStream createContactExchangeStreamWriter(OutputStream out,
+	public StreamWriter createContactExchangeStreamWriter(OutputStream out,
 			SecretKey headerKey) {
 		return new StreamWriterImpl(
 				streamEncrypterFactory.createContactExchangeStreamDecrypter(out,
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java
index 142ca19ff7..2a2279e35e 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/transport/StreamWriterImpl.java
@@ -2,6 +2,7 @@ package org.briarproject.bramble.transport;
 
 import org.briarproject.bramble.api.crypto.StreamEncrypter;
 import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
+import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -17,7 +18,7 @@ import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYL
  */
 @NotThreadSafe
 @NotNullByDefault
-class StreamWriterImpl extends OutputStream {
+class StreamWriterImpl extends OutputStream implements StreamWriter {
 
 	private final StreamEncrypter encrypter;
 	private final byte[] payload;
@@ -29,6 +30,17 @@ class StreamWriterImpl extends OutputStream {
 		payload = new byte[MAX_PAYLOAD_LENGTH];
 	}
 
+	@Override
+	public OutputStream getOutputStream() {
+		return this;
+	}
+
+	@Override
+	public void sendEndOfStream() throws IOException {
+		writeFrame(true);
+		encrypter.flush();
+	}
+
 	@Override
 	public void close() throws IOException {
 		writeFrame(true);
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java
index 3e001dae86..580b970064 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SimplexOutgoingSessionTest.java
@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.event.EventBus;
 import org.briarproject.bramble.api.sync.Ack;
 import org.briarproject.bramble.api.sync.MessageId;
 import org.briarproject.bramble.api.sync.SyncRecordWriter;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.test.BrambleTestCase;
 import org.briarproject.bramble.test.ImmediateExecutor;
 import org.briarproject.bramble.test.TestUtils;
@@ -20,6 +21,7 @@ import java.util.concurrent.Executor;
 
 import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
 
+// TODO: Convert to BrambleMockTestCase
 public class SimplexOutgoingSessionTest extends BrambleTestCase {
 
 	private final Mockery context;
@@ -29,6 +31,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 	private final ContactId contactId;
 	private final MessageId messageId;
 	private final int maxLatency;
+	private final StreamWriter streamWriter;
 	private final SyncRecordWriter recordWriter;
 
 	public SimplexOutgoingSessionTest() {
@@ -36,6 +39,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 		db = context.mock(DatabaseComponent.class);
 		dbExecutor = new ImmediateExecutor();
 		eventBus = context.mock(EventBus.class);
+		streamWriter = context.mock(StreamWriter.class);
 		recordWriter = context.mock(SyncRecordWriter.class);
 		contactId = new ContactId(234);
 		messageId = new MessageId(TestUtils.getRandomId());
@@ -45,7 +49,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 	@Test
 	public void testNothingToSend() throws Exception {
 		SimplexOutgoingSession session = new SimplexOutgoingSession(db,
-				dbExecutor, eventBus, contactId, maxLatency, recordWriter);
+				dbExecutor, eventBus, contactId, maxLatency, streamWriter,
+				recordWriter);
 		Transaction noAckTxn = new Transaction(null, false);
 		Transaction noMsgTxn = new Transaction(null, false);
 
@@ -67,8 +72,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 			will(returnValue(null));
 			oneOf(db).commitTransaction(noMsgTxn);
 			oneOf(db).endTransaction(noMsgTxn);
-			// Flush the output stream
-			oneOf(recordWriter).flush();
+			// Send the end of stream marker
+			oneOf(streamWriter).sendEndOfStream();
 			// Remove listener
 			oneOf(eventBus).removeListener(session);
 		}});
@@ -83,7 +88,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 		Ack ack = new Ack(Collections.singletonList(messageId));
 		byte[] raw = new byte[1234];
 		SimplexOutgoingSession session = new SimplexOutgoingSession(db,
-				dbExecutor, eventBus, contactId, maxLatency, recordWriter);
+				dbExecutor, eventBus, contactId, maxLatency, streamWriter,
+				recordWriter);
 		Transaction ackTxn = new Transaction(null, false);
 		Transaction noAckTxn = new Transaction(null, false);
 		Transaction msgTxn = new Transaction(null, false);
@@ -124,8 +130,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
 			will(returnValue(null));
 			oneOf(db).commitTransaction(noMsgTxn);
 			oneOf(db).endTransaction(noMsgTxn);
-			// Flush the output stream
-			oneOf(recordWriter).flush();
+			// Send the end of stream marker
+			oneOf(streamWriter).sendEndOfStream();
 			// Remove listener
 			oneOf(eventBus).removeListener(session);
 		}});
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java
index 1504da9eb4..d3f1eb905e 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/sync/SyncIntegrationTest.java
@@ -19,6 +19,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
 import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
 import org.briarproject.bramble.api.transport.StreamContext;
 import org.briarproject.bramble.api.transport.StreamReaderFactory;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.api.transport.StreamWriterFactory;
 import org.briarproject.bramble.test.BrambleTestCase;
 import org.briarproject.bramble.test.TestUtils;
@@ -27,7 +28,6 @@ import org.junit.Test;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collection;
 
@@ -102,18 +102,18 @@ public class SyncIntegrationTest extends BrambleTestCase {
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
 		StreamContext ctx = new StreamContext(contactId, transportId, tagKey,
 				headerKey, streamNumber);
-		OutputStream streamWriter = streamWriterFactory.createStreamWriter(out,
+		StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
 				ctx);
 		SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(
-				streamWriter);
+				streamWriter.getOutputStream());
 
 		recordWriter.writeAck(new Ack(messageIds));
 		recordWriter.writeMessage(message.getRaw());
 		recordWriter.writeMessage(message1.getRaw());
 		recordWriter.writeOffer(new Offer(messageIds));
 		recordWriter.writeRequest(new Request(messageIds));
-		recordWriter.flush();
 
+		streamWriter.sendEndOfStream();
 		return out.toByteArray();
 	}
 
diff --git a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java
index 879b763dae..1007afe5e0 100644
--- a/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java
+++ b/briar-core/src/test/java/org/briarproject/briar/messaging/SimplexMessagingIntegrationTest.java
@@ -16,6 +16,7 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory;
 import org.briarproject.bramble.api.transport.KeyManager;
 import org.briarproject.bramble.api.transport.StreamContext;
 import org.briarproject.bramble.api.transport.StreamReaderFactory;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.api.transport.StreamWriterFactory;
 import org.briarproject.bramble.contact.ContactModule;
 import org.briarproject.bramble.identity.IdentityModule;
@@ -39,7 +40,6 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.InputStream;
-import java.io.OutputStream;
 
 import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
 import static org.briarproject.bramble.test.TestPluginConfigModule.MAX_LATENCY;
@@ -69,7 +69,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
 		alice = DaggerSimplexMessagingIntegrationTestComponent.builder()
 				.testDatabaseModule(new TestDatabaseModule(aliceDir)).build();
 		injectEagerSingletons(alice);
-		alice.inject(new SystemModule.EagerSingletons());
 		bob = DaggerSimplexMessagingIntegrationTestComponent.builder()
 				.testDatabaseModule(new TestDatabaseModule(bobDir)).build();
 		injectEagerSingletons(bob);
@@ -159,7 +158,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
 		// Create a stream writer
 		StreamWriterFactory streamWriterFactory =
 				device.getStreamWriterFactory();
-		OutputStream streamWriter =
+		StreamWriter streamWriter =
 				streamWriterFactory.createStreamWriter(out, ctx);
 		// Create an outgoing sync session
 		SyncSessionFactory syncSessionFactory = device.getSyncSessionFactory();
@@ -167,7 +166,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
 				contactId, MAX_LATENCY, streamWriter);
 		// Write whatever needs to be written
 		session.run();
-		streamWriter.close();
+		streamWriter.sendEndOfStream();
 		// Return the contents of the stream
 		return out.toByteArray();
 	}
diff --git a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java
index 7175cdb776..481a574869 100644
--- a/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java
+++ b/briar-core/src/test/java/org/briarproject/briar/test/BriarIntegrationTest.java
@@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.SyncSession;
 import org.briarproject.bramble.api.sync.SyncSessionFactory;
 import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
 import org.briarproject.bramble.api.system.Clock;
+import org.briarproject.bramble.api.transport.StreamWriter;
 import org.briarproject.bramble.contact.ContactModule;
 import org.briarproject.bramble.crypto.CryptoExecutorModule;
 import org.briarproject.bramble.identity.IdentityModule;
@@ -340,9 +341,10 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
 		LOG.info("TEST: Sending message from " + from + " to " + to);
 
 		ByteArrayOutputStream out = new ByteArrayOutputStream();
+		StreamWriter streamWriter = new TestStreamWriter(out);
 		// Create an outgoing sync session
-		SyncSession sessionFrom =
-				fromSync.createSimplexOutgoingSession(toId, MAX_LATENCY, out);
+		SyncSession sessionFrom = fromSync.createSimplexOutgoingSession(toId,
+				MAX_LATENCY, streamWriter);
 		// Write whatever needs to be written
 		sessionFrom.run();
 		out.close();
diff --git a/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java b/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java
new file mode 100644
index 0000000000..ba95dbb9c4
--- /dev/null
+++ b/briar-core/src/test/java/org/briarproject/briar/test/TestStreamWriter.java
@@ -0,0 +1,25 @@
+package org.briarproject.briar.test;
+
+import org.briarproject.bramble.api.transport.StreamWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+class TestStreamWriter implements StreamWriter {
+
+	private final OutputStream out;
+
+	TestStreamWriter(OutputStream out) {
+		this.out = out;
+	}
+
+	@Override
+	public OutputStream getOutputStream() {
+		return out;
+	}
+
+	@Override
+	public void sendEndOfStream() throws IOException {
+		out.flush();
+	}
+}
-- 
GitLab