diff --git a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java index eb59e93508cbac3aaf6d78bb2542aae0e151d498..647b03c683af14748c4bc86d9f535586f3e36600 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/mailbox/MailboxSyncRequestWriter.java @@ -7,11 +7,13 @@ import org.briarproject.bramble.mailbox.protocol.MailboxRequestSync; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { private MailboxProtocol mailboxProtocol; private final ByteArrayOutputStream bufferOS = new ByteArrayOutputStream(); + private boolean endOfStream = false; public MailboxSyncRequestWriter(MailboxProtocol mailboxProtocol) { this.mailboxProtocol = mailboxProtocol; @@ -23,8 +25,18 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { } @Override - public void sendEndOfStream() throws IOException { - flush(); + public synchronized void sendEndOfStream() throws IOException { + if (endOfStream) + throw new IOException("End of stream was already written"); + + endOfStream = true; + + MailboxRequestSync req = new MailboxRequestSync(new byte[] {}, true); + try { + mailboxProtocol.writeRequest(req); + } catch (InterruptedException e) { + throw new IOException(e.toString()); + } } @Override @@ -34,9 +46,20 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { @Override public synchronized void flush() throws IOException { + if (endOfStream) + throw new IOException("End of stream already written"); + + // If a sync session flushes without data to write it indicates + // a keep alive + if (bufferOS.size() == 0){ + mailboxProtocol.sendKeepAlive(); + return; + } + byte[] syncStream = bufferOS.toByteArray(); + bufferOS.reset(); - MailboxRequestSync req = new MailboxRequestSync(syncStream); + MailboxRequestSync req = new MailboxRequestSync(syncStream, false); try { mailboxProtocol.writeRequest(req); } catch (InterruptedException e) { diff --git a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java index 38a061187aadb42112250d8ffb0c7170c5f4f1a4..7a5420289f917dc25da8d6467a5d1182e156adfb 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/mailbox/protocol/MailboxProtocolIntegrationTest.java @@ -37,12 +37,12 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { BdfWriterFactory bdfWriterFactory; - private BdfWriter bdfWriter; - private BdfReader bdfReader; - private PipedOutputStream pipedOS; - private PipedInputStream pipedIS; + private BdfWriter bdfWriter; + private BdfReader bdfReader; + private PipedOutputStream pipedOS; + private PipedInputStream pipedIS; - private MailboxProtocol mailboxProtocol; + private MailboxProtocol mailboxProtocol; public MailboxProtocolIntegrationTest() { @@ -77,7 +77,8 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { public void testProtocolTerminationOnClosedConnection() throws IOException, InterruptedException { ioExecutor.execute(mailboxProtocol); - MailboxRequest req = new MailboxRequestStore(new ContactId(123), "test".getBytes()); + MailboxRequest req = + new MailboxRequestStore(new ContactId(123), "test".getBytes()); mailboxProtocol.writeRequest(req); // read request to ensure writing thread is running @@ -174,13 +175,15 @@ public class MailboxProtocolIntegrationTest extends BrambleTestCase { } private void syncRequest() throws IOException, InterruptedException { - MailboxRequestSync req = new MailboxRequestSync("Test".getBytes()); + MailboxRequestSync req = + new MailboxRequestSync("Test".getBytes(), false); mailboxProtocol.writeRequest(req); MailboxRequestSync recvReq = (MailboxRequestSync) mailboxProtocol.getNextRequest(); assertEquals(req.getId(), recvReq.getId()); assertEquals(req.getType(), recvReq.getType()); assertEquals("Test", new String(recvReq.getSyncStream())); + assertEquals(false, recvReq.isEndOfStream()); }