Verified Commit 5320737d authored by akwizgran's avatar akwizgran

Send end of stream marker when sync session finishes.

parent 46cd7e37
......@@ -2,9 +2,9 @@ package org.briarproject.bramble.api.sync;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream;
import java.io.OutputStream;
@NotNullByDefault
public interface SyncSessionFactory {
......@@ -12,8 +12,8 @@ public interface SyncSessionFactory {
SyncSession createIncomingSession(ContactId c, InputStream in);
SyncSession createSimplexOutgoingSession(ContactId c, int maxLatency,
OutputStream out);
StreamWriter streamWriter);
SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, OutputStream out);
int maxIdleTime, StreamWriter streamWriter);
}
package org.briarproject.bramble.api.transport;
import java.io.IOException;
import java.io.OutputStream;
/**
* An interface for writing data to a transport connection. Data will be
* encrypted and authenticated before being written to the connection.
*/
public interface StreamWriter {
OutputStream getOutputStream();
/**
* Sends the end of stream marker, informing the recipient that no more
* data will be sent. The connection is flushed but not closed.
*/
void sendEndOfStream() throws IOException;
}
......@@ -12,12 +12,12 @@ public interface StreamWriterFactory {
* Creates an {@link OutputStream OutputStream} for writing to a
* transport stream
*/
OutputStream createStreamWriter(OutputStream out, StreamContext ctx);
StreamWriter createStreamWriter(OutputStream out, StreamContext ctx);
/**
* Creates an {@link OutputStream OutputStream} for writing to a contact
* exchange stream.
*/
OutputStream createContactExchangeStreamWriter(OutputStream out,
StreamWriter createContactExchangeStreamWriter(OutputStream out,
SecretKey headerKey);
}
......@@ -30,6 +30,7 @@ import org.briarproject.bramble.api.record.RecordWriter;
import org.briarproject.bramble.api.record.RecordWriterFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.EOFException;
......@@ -152,11 +153,11 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
recordReaderFactory.createRecordReader(streamReader);
// Create the writers
OutputStream streamWriter =
StreamWriter streamWriter =
streamWriterFactory.createContactExchangeStreamWriter(out,
alice ? aliceHeaderKey : bobHeaderKey);
RecordWriter recordWriter =
recordWriterFactory.createRecordWriter(streamWriter);
recordWriterFactory.createRecordWriter(streamWriter.getOutputStream());
// Derive the nonces to be signed
byte[] aliceNonce = crypto.mac(ALICE_NONCE_LABEL, masterSecret,
......@@ -184,8 +185,8 @@ class ContactExchangeTaskImpl extends Thread implements ContactExchangeTask {
localSignature, localTimestamp);
recordWriter.flush();
}
// Close the outgoing stream
recordWriter.close();
// Send EOF on the outgoing stream
streamWriter.sendEndOfStream();
// Skip any remaining records from the incoming stream
try {
while (true) recordReader.readRecord();
......
......@@ -14,12 +14,12 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
......@@ -101,7 +101,7 @@ class ConnectionManagerImpl implements ConnectionManager {
private SyncSession createSimplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
return syncSessionFactory.createSimplexOutgoingSession(
ctx.getContactId(), w.getMaxLatency(), streamWriter);
......@@ -109,7 +109,7 @@ class ConnectionManagerImpl implements ConnectionManager {
private SyncSession createDuplexOutgoingSession(StreamContext ctx,
TransportConnectionWriter w) throws IOException {
OutputStream streamWriter = streamWriterFactory.createStreamWriter(
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(
w.getOutputStream(), ctx);
return syncSessionFactory.createDuplexOutgoingSession(
ctx.getContactId(), w.getMaxLatency(), w.getMaxIdleTime(),
......
......@@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.event.MessageSharedEvent;
import org.briarproject.bramble.api.sync.event.MessageToAckEvent;
import org.briarproject.bramble.api.sync.event.MessageToRequestEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
import java.util.Collection;
......@@ -67,6 +68,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
private final Clock clock;
private final ContactId contactId;
private final int maxLatency, maxIdleTime;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
......@@ -81,7 +83,8 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
DuplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, Clock clock, ContactId contactId, int maxLatency,
int maxIdleTime, SyncRecordWriter recordWriter) {
int maxIdleTime, StreamWriter streamWriter,
SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
......@@ -89,6 +92,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
this.contactId = contactId;
this.maxLatency = maxLatency;
this.maxIdleTime = maxIdleTime;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
writerTasks = new LinkedBlockingQueue<>();
}
......@@ -149,7 +153,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dataToFlush = true;
}
}
if (dataToFlush) recordWriter.flush();
streamWriter.sendEndOfStream();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt();
......
......@@ -63,7 +63,11 @@ class IncomingSession implements SyncSession, EventListener {
eventBus.addListener(this);
try {
// Read records until interrupted or EOF
while (!interrupted && !recordReader.eof()) {
while (!interrupted) {
if (recordReader.eof()) {
LOG.info("End of stream");
return;
}
if (recordReader.hasAck()) {
Ack a = recordReader.readAck();
dbExecutor.execute(new ReceiveAck(a));
......
......@@ -15,6 +15,7 @@ import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
import java.util.Collection;
......@@ -51,6 +52,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private final EventBus eventBus;
private final ContactId contactId;
private final int maxLatency;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
private final AtomicInteger outstandingQueries;
private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
......@@ -58,13 +60,14 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
private volatile boolean interrupted = false;
SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
EventBus eventBus, ContactId contactId,
int maxLatency, SyncRecordWriter recordWriter) {
EventBus eventBus, ContactId contactId, int maxLatency,
StreamWriter streamWriter, SyncRecordWriter recordWriter) {
this.db = db;
this.dbExecutor = dbExecutor;
this.eventBus = eventBus;
this.contactId = contactId;
this.maxLatency = maxLatency;
this.streamWriter = streamWriter;
this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of record
writerTasks = new LinkedBlockingQueue<>();
......@@ -85,7 +88,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
if (task == CLOSE) break;
task.run();
}
recordWriter.flush();
streamWriter.sendEndOfStream();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt();
......
......@@ -12,6 +12,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.InputStream;
import java.io.OutputStream;
......@@ -53,19 +54,21 @@ class SyncSessionFactoryImpl implements SyncSessionFactory {
@Override
public SyncSession createSimplexOutgoingSession(ContactId c,
int maxLatency, OutputStream out) {
int maxLatency, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new SimplexOutgoingSession(db, dbExecutor, eventBus, c,
maxLatency, recordWriter);
maxLatency, streamWriter, recordWriter);
}
@Override
public SyncSession createDuplexOutgoingSession(ContactId c, int maxLatency,
int maxIdleTime, OutputStream out) {
int maxIdleTime, StreamWriter streamWriter) {
OutputStream out = streamWriter.getOutputStream();
SyncRecordWriter recordWriter =
recordWriterFactory.createRecordWriter(out);
return new DuplexOutgoingSession(db, dbExecutor, eventBus, clock, c,
maxLatency, maxIdleTime, recordWriter);
maxLatency, maxIdleTime, streamWriter, recordWriter);
}
}
......@@ -4,6 +4,7 @@ import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.crypto.StreamEncrypterFactory;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import java.io.OutputStream;
......@@ -23,14 +24,14 @@ class StreamWriterFactoryImpl implements StreamWriterFactory {
}
@Override
public OutputStream createStreamWriter(OutputStream out,
public StreamWriter createStreamWriter(OutputStream out,
StreamContext ctx) {
return new StreamWriterImpl(
streamEncrypterFactory.createStreamEncrypter(out, ctx));
}
@Override
public OutputStream createContactExchangeStreamWriter(OutputStream out,
public StreamWriter createContactExchangeStreamWriter(OutputStream out,
SecretKey headerKey) {
return new StreamWriterImpl(
streamEncrypterFactory.createContactExchangeStreamDecrypter(out,
......
......@@ -2,6 +2,7 @@ package org.briarproject.bramble.transport;
import org.briarproject.bramble.api.crypto.StreamEncrypter;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
import java.io.OutputStream;
......@@ -17,7 +18,7 @@ import static org.briarproject.bramble.api.transport.TransportConstants.MAX_PAYL
*/
@NotThreadSafe
@NotNullByDefault
class StreamWriterImpl extends OutputStream {
class StreamWriterImpl extends OutputStream implements StreamWriter {
private final StreamEncrypter encrypter;
private final byte[] payload;
......@@ -29,6 +30,17 @@ class StreamWriterImpl extends OutputStream {
payload = new byte[MAX_PAYLOAD_LENGTH];
}
@Override
public OutputStream getOutputStream() {
return this;
}
@Override
public void sendEndOfStream() throws IOException {
writeFrame(true);
encrypter.flush();
}
@Override
public void close() throws IOException {
writeFrame(true);
......
......@@ -7,6 +7,7 @@ import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.sync.Ack;
import org.briarproject.bramble.api.sync.MessageId;
import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.TestUtils;
......@@ -20,6 +21,7 @@ import java.util.concurrent.Executor;
import static org.briarproject.bramble.api.sync.SyncConstants.MAX_MESSAGE_IDS;
// TODO: Convert to BrambleMockTestCase
public class SimplexOutgoingSessionTest extends BrambleTestCase {
private final Mockery context;
......@@ -29,6 +31,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
private final ContactId contactId;
private final MessageId messageId;
private final int maxLatency;
private final StreamWriter streamWriter;
private final SyncRecordWriter recordWriter;
public SimplexOutgoingSessionTest() {
......@@ -36,6 +39,7 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
db = context.mock(DatabaseComponent.class);
dbExecutor = new ImmediateExecutor();
eventBus = context.mock(EventBus.class);
streamWriter = context.mock(StreamWriter.class);
recordWriter = context.mock(SyncRecordWriter.class);
contactId = new ContactId(234);
messageId = new MessageId(TestUtils.getRandomId());
......@@ -45,7 +49,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
@Test
public void testNothingToSend() throws Exception {
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, recordWriter);
dbExecutor, eventBus, contactId, maxLatency, streamWriter,
recordWriter);
Transaction noAckTxn = new Transaction(null, false);
Transaction noMsgTxn = new Transaction(null, false);
......@@ -67,8 +72,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(null));
oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(recordWriter).flush();
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
......@@ -83,7 +88,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
Ack ack = new Ack(Collections.singletonList(messageId));
byte[] raw = new byte[1234];
SimplexOutgoingSession session = new SimplexOutgoingSession(db,
dbExecutor, eventBus, contactId, maxLatency, recordWriter);
dbExecutor, eventBus, contactId, maxLatency, streamWriter,
recordWriter);
Transaction ackTxn = new Transaction(null, false);
Transaction noAckTxn = new Transaction(null, false);
Transaction msgTxn = new Transaction(null, false);
......@@ -124,8 +130,8 @@ public class SimplexOutgoingSessionTest extends BrambleTestCase {
will(returnValue(null));
oneOf(db).commitTransaction(noMsgTxn);
oneOf(db).endTransaction(noMsgTxn);
// Flush the output stream
oneOf(recordWriter).flush();
// Send the end of stream marker
oneOf(streamWriter).sendEndOfStream();
// Remove listener
oneOf(eventBus).removeListener(session);
}});
......
......@@ -19,6 +19,7 @@ import org.briarproject.bramble.api.sync.SyncRecordWriter;
import org.briarproject.bramble.api.sync.SyncRecordWriterFactory;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.TestUtils;
......@@ -27,7 +28,6 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
......@@ -102,18 +102,18 @@ public class SyncIntegrationTest extends BrambleTestCase {
ByteArrayOutputStream out = new ByteArrayOutputStream();
StreamContext ctx = new StreamContext(contactId, transportId, tagKey,
headerKey, streamNumber);
OutputStream streamWriter = streamWriterFactory.createStreamWriter(out,
StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out,
ctx);
SyncRecordWriter recordWriter = recordWriterFactory.createRecordWriter(
streamWriter);
streamWriter.getOutputStream());
recordWriter.writeAck(new Ack(messageIds));
recordWriter.writeMessage(message.getRaw());
recordWriter.writeMessage(message1.getRaw());
recordWriter.writeOffer(new Offer(messageIds));
recordWriter.writeRequest(new Request(messageIds));
recordWriter.flush();
streamWriter.sendEndOfStream();
return out.toByteArray();
}
......
......@@ -16,6 +16,7 @@ import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.transport.KeyManager;
import org.briarproject.bramble.api.transport.StreamContext;
import org.briarproject.bramble.api.transport.StreamReaderFactory;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.api.transport.StreamWriterFactory;
import org.briarproject.bramble.contact.ContactModule;
import org.briarproject.bramble.identity.IdentityModule;
......@@ -39,7 +40,6 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import static org.briarproject.bramble.api.transport.TransportConstants.TAG_LENGTH;
import static org.briarproject.bramble.test.TestPluginConfigModule.MAX_LATENCY;
......@@ -69,7 +69,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
alice = DaggerSimplexMessagingIntegrationTestComponent.builder()
.testDatabaseModule(new TestDatabaseModule(aliceDir)).build();
injectEagerSingletons(alice);
alice.inject(new SystemModule.EagerSingletons());
bob = DaggerSimplexMessagingIntegrationTestComponent.builder()
.testDatabaseModule(new TestDatabaseModule(bobDir)).build();
injectEagerSingletons(bob);
......@@ -159,7 +158,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
// Create a stream writer
StreamWriterFactory streamWriterFactory =
device.getStreamWriterFactory();
OutputStream streamWriter =
StreamWriter streamWriter =
streamWriterFactory.createStreamWriter(out, ctx);
// Create an outgoing sync session
SyncSessionFactory syncSessionFactory = device.getSyncSessionFactory();
......@@ -167,7 +166,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase {
contactId, MAX_LATENCY, streamWriter);
// Write whatever needs to be written
session.run();
streamWriter.close();
streamWriter.sendEndOfStream();
// Return the contents of the stream
return out.toByteArray();
}
......
......@@ -23,6 +23,7 @@ import org.briarproject.bramble.api.sync.SyncSession;
import org.briarproject.bramble.api.sync.SyncSessionFactory;
import org.briarproject.bramble.api.sync.event.MessageStateChangedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.transport.StreamWriter;
import org.briarproject.bramble.contact.ContactModule;
import org.briarproject.bramble.crypto.CryptoExecutorModule;
import org.briarproject.bramble.identity.IdentityModule;
......@@ -340,9 +341,10 @@ public abstract class BriarIntegrationTest<C extends BriarIntegrationTestCompone
LOG.info("TEST: Sending message from " + from + " to " + to);
ByteArrayOutputStream out = new ByteArrayOutputStream();
StreamWriter streamWriter = new TestStreamWriter(out);
// Create an outgoing sync session
SyncSession sessionFrom =
fromSync.createSimplexOutgoingSession(toId, MAX_LATENCY, out);
SyncSession sessionFrom = fromSync.createSimplexOutgoingSession(toId,
MAX_LATENCY, streamWriter);
// Write whatever needs to be written
sessionFrom.run();
out.close();
......
package org.briarproject.briar.test;
import org.briarproject.bramble.api.transport.StreamWriter;
import java.io.IOException;
import java.io.OutputStream;
class TestStreamWriter implements StreamWriter {
private final OutputStream out;
TestStreamWriter(OutputStream out) {
this.out = out;
}
@Override
public OutputStream getOutputStream() {
return out;
}
@Override
public void sendEndOfStream() throws IOException {
out.flush();
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment