From 4199300e7da3c7443f9e63101b989a3c9468f825 Mon Sep 17 00:00:00 2001
From: bontric <benjohnwie@gmail.com>
Date: Wed, 12 Sep 2018 18:30:19 +0200
Subject: [PATCH] Add "End of stream handling" and keepalive handling to
 MailboxSyncRequestWriter

---
 .../mailbox/MailboxSyncRequestWriter.java     | 29 +++++++++++++++++--
 .../MailboxProtocolIntegrationTest.java       | 17 ++++++-----
 2 files changed, 36 insertions(+), 10 deletions(-)

diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java
index eb59e9350..647b03c68 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java
@@ -7,11 +7,13 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
 
 	private MailboxProtocol mailboxProtocol;
 	private final ByteArrayOutputStream bufferOS = new ByteArrayOutputStream();
+	private boolean endOfStream = false;
 
 	public MailboxSyncRequestWriter(MailboxProtocol mailboxProtocol) {
 		this.mailboxProtocol = mailboxProtocol;
@@ -23,8 +25,18 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
 	}
 
 	@Override
-	public void sendEndOfStream() throws IOException {
-		flush();
+	public synchronized void sendEndOfStream() throws IOException {
+		if (endOfStream)
+			throw new IOException("End of stream was already written");
+
+		endOfStream = true;
+
+		MailboxRequestSync req = new MailboxRequestSync(new byte[] {}, true);
+		try {
+			mailboxProtocol.writeRequest(req);
+		} catch (InterruptedException e) {
+			throw new IOException(e.toString());
+		}
 	}
 
 	@Override
@@ -34,9 +46,20 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
 
 	@Override
 	public synchronized void flush() throws IOException {
+		if (endOfStream)
+			throw new IOException("End of stream already written");
+
+		// If a sync session flushes without data to write it indicates
+		// a keep alive
+		if (bufferOS.size() == 0){
+			mailboxProtocol.sendKeepAlive();
+			return;
+		}
+
 		byte[] syncStream = bufferOS.toByteArray();
+		bufferOS.reset();
 
-		MailboxRequestSync req = new MailboxRequestSync(syncStream);
+		MailboxRequestSync req = new MailboxRequestSync(syncStream, false);
 		try {
 			mailboxProtocol.writeRequest(req);
 		} catch (InterruptedException e) {
diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java
index 38a061187..7a5420289 100644
--- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java
+++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java
@@ -37,12 +37,12 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
 	BdfWriterFactory bdfWriterFactory;
 
 
-	private  BdfWriter bdfWriter;
-	private  BdfReader bdfReader;
-	private  PipedOutputStream pipedOS;
-	private  PipedInputStream pipedIS;
+	private BdfWriter bdfWriter;
+	private BdfReader bdfReader;
+	private PipedOutputStream pipedOS;
+	private PipedInputStream pipedIS;
 
-	private  MailboxProtocol mailboxProtocol;
+	private MailboxProtocol mailboxProtocol;
 
 
 	public MailboxProtocolIntegrationTest() {
@@ -77,7 +77,8 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
 	public void testProtocolTerminationOnClosedConnection()
 			throws IOException, InterruptedException {
 		ioExecutor.execute(mailboxProtocol);
-		MailboxRequest req = new MailboxRequestStore(new ContactId(123), "test".getBytes());
+		MailboxRequest req =
+				new MailboxRequestStore(new ContactId(123), "test".getBytes());
 		mailboxProtocol.writeRequest(req);
 
 		// read request to ensure writing thread is running
@@ -174,13 +175,15 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase {
 	}
 
 	private void syncRequest() throws IOException, InterruptedException {
-		MailboxRequestSync req = new MailboxRequestSync("Test".getBytes());
+		MailboxRequestSync req =
+				new MailboxRequestSync("Test".getBytes(), false);
 		mailboxProtocol.writeRequest(req);
 		MailboxRequestSync recvReq =
 				(MailboxRequestSync) mailboxProtocol.getNextRequest();
 		assertEquals(req.getId(), recvReq.getId());
 		assertEquals(req.getType(), recvReq.getType());
 		assertEquals("Test", new String(recvReq.getSyncStream()));
+		assertEquals(false, recvReq.isEndOfStream());
 	}
 
 
-- 
GitLab