From 5345db0b6ba0fd57703368a74ddac99224e6daa3 Mon Sep 17 00:00:00 2001 From: Torsten Grote <t@grobox.de> Date: Fri, 16 Dec 2016 10:15:59 -0200 Subject: [PATCH] Address review comments --- .../bramble/api/sync/RecordReader.java | 1 + .../bramble/api/sync/SyncConstants.java | 14 ++--- .../keyagreement/KeyAgreementTransport.java | 50 +++++++--------- .../bramble/sync/DuplexOutgoingSession.java | 16 ++--- .../bramble/sync/IncomingSession.java | 7 ++- .../bramble/sync/RecordReaderFactoryImpl.java | 6 +- .../bramble/sync/RecordReaderImpl.java | 60 ++++++++++++------- .../bramble/sync/SimplexOutgoingSession.java | 10 ++-- .../briarproject/bramble/sync/SyncModule.java | 6 +- 9 files changed, 92 insertions(+), 78 deletions(-) diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordReader.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordReader.java index 25eadaaea1..f291690859 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordReader.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/RecordReader.java @@ -24,4 +24,5 @@ public interface RecordReader { boolean hasRequest() throws IOException; Request readRequest() throws IOException; + } diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java index e1df2a7531..80f48da666 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncConstants.java @@ -17,15 +17,10 @@ public interface SyncConstants { /** * The maximum length of the record payload in bytes. */ - int MAX_RECORD_PAYLOAD_LENGTH = 32 * 1024; // 32 KiB + int MAX_RECORD_PAYLOAD_LENGTH = 48 * 1024; // 48 KiB /** The maximum length of a group descriptor in bytes. */ - int MAX_GROUP_DESCRIPTOR_LENGTH = 32 * 1024; // 32 KiB - - /** - * The maximum length of a message in bytes. - */ - int MAX_MESSAGE_LENGTH = MAX_RECORD_PAYLOAD_LENGTH - RECORD_HEADER_LENGTH; + int MAX_GROUP_DESCRIPTOR_LENGTH = 16 * 1024; // 16 KiB /** * The length of the message header in bytes. @@ -37,6 +32,11 @@ public interface SyncConstants { */ int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB + /** + * The maximum length of a message in bytes. + */ + int MAX_MESSAGE_LENGTH = MESSAGE_HEADER_LENGTH + MAX_MESSAGE_BODY_LENGTH; + /** * The maximum number of message IDs in an ack, offer or request record. */ diff --git a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTransport.java b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTransport.java index 8fa8adcda8..d735145ac2 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTransport.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/keyagreement/KeyAgreementTransport.java @@ -96,37 +96,33 @@ class KeyAgreementTransport { } private byte[] readRecord(byte expectedType) throws AbortException { - byte[] header = readHeader(); - int len = ByteUtils.readUint16(header, - RECORD_HEADER_PAYLOAD_LENGTH_OFFSET); - if (header[0] != PROTOCOL_VERSION) { - // ignore record with unknown protocol version and try next + while (true) { + byte[] header = readHeader(); + int len = ByteUtils.readUint16(header, + RECORD_HEADER_PAYLOAD_LENGTH_OFFSET); + if (header[0] != PROTOCOL_VERSION) { + throw new AbortException(false); + } + byte type = header[1]; + if (type == ABORT) throw new AbortException(true); + if (type != expectedType) { + if (type != KEY && type != CONFIRM) { + // ignore unrecognised record and try next + try { + readData(len); + } catch (IOException e) { + throw new AbortException(e); + } + continue; + } else { + throw new AbortException(false); + } + } try { - readData(len); + return readData(len); } catch (IOException e) { throw new AbortException(e); } - return readRecord(expectedType); - } - byte type = header[1]; - if (type == ABORT) throw new AbortException(true); - if (type != expectedType) { - if (type != KEY && type != CONFIRM) { - // ignore unrecognised record and try next - try { - readData(len); - } catch (IOException e) { - throw new AbortException(e); - } - return readRecord(expectedType); - } else { - throw new AbortException(false); - } - } - try { - return readData(len); - } catch (IOException e) { - throw new AbortException(e); } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index bd6fb0f1a7..7be248c5d6 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -42,14 +42,14 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD /** * An outgoing {@link SyncSession} suitable for duplex transports. The session * offers messages before sending them, keeps its output stream open when there - * are no packets to send, and reacts to events that make packets available to + * are no records to send, and reacts to events that make records available to * send. */ @ThreadSafe @NotNullByDefault class DuplexOutgoingSession implements SyncSession, EventListener { - // Check for retransmittable packets once every 60 seconds + // Check for retransmittable records once every 60 seconds private static final int RETX_QUERY_INTERVAL = 60 * 1000; private static final Logger LOG = Logger.getLogger(DuplexOutgoingSession.class.getName()); @@ -91,7 +91,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { public void run() throws IOException { eventBus.addListener(this); try { - // Start a query for each type of packet + // Start a query for each type of record dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateOffer()); @@ -100,10 +100,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener { long nextKeepalive = now + maxIdleTime; long nextRetxQuery = now + RETX_QUERY_INTERVAL; boolean dataToFlush = true; - // Write packets until interrupted + // Write records until interrupted try { while (!interrupted) { - // Work out how long we should wait for a packet + // Work out how long we should wait for a record now = clock.currentTimeMillis(); long wait = Math.min(nextKeepalive, nextRetxQuery) - now; if (wait < 0) wait = 0; @@ -113,13 +113,13 @@ class DuplexOutgoingSession implements SyncSession, EventListener { dataToFlush = false; nextKeepalive = now + maxIdleTime; } - // Wait for a packet + // Wait for a record ThrowingRunnable<IOException> task = writerTasks.poll(wait, MILLISECONDS); if (task == null) { now = clock.currentTimeMillis(); if (now >= nextRetxQuery) { - // Check for retransmittable packets + // Check for retransmittable records dbExecutor.execute(new GenerateBatch()); dbExecutor.execute(new GenerateOffer()); nextRetxQuery = now + RETX_QUERY_INTERVAL; @@ -139,7 +139,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { } if (dataToFlush) recordWriter.flush(); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for a packet to write"); + LOG.info("Interrupted while waiting for a record to write"); Thread.currentThread().interrupt(); } } finally { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java index e099664678..249d1f2c6b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java @@ -1,5 +1,6 @@ package org.briarproject.bramble.sync; +import org.briarproject.bramble.api.FormatException; import org.briarproject.bramble.api.contact.ContactId; import org.briarproject.bramble.api.contact.event.ContactRemovedEvent; import org.briarproject.bramble.api.db.DatabaseComponent; @@ -60,7 +61,7 @@ class IncomingSession implements SyncSession, EventListener { public void run() throws IOException { eventBus.addListener(this); try { - // Read packets until interrupted or EOF + // Read records until interrupted or EOF while (!interrupted && !recordReader.eof()) { if (recordReader.hasAck()) { Ack a = recordReader.readAck(); @@ -74,8 +75,10 @@ class IncomingSession implements SyncSession, EventListener { } else if (recordReader.hasRequest()) { Request r = recordReader.readRequest(); dbExecutor.execute(new ReceiveRequest(r)); + } else { + // unknown records are ignored in RecordReader#eof() + throw new FormatException(); } - // unknown records are ignored } } finally { eventBus.removeListener(this); diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderFactoryImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderFactoryImpl.java index 43c1d26233..fb43f9459c 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderFactoryImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderFactoryImpl.java @@ -1,6 +1,5 @@ package org.briarproject.bramble.sync; -import org.briarproject.bramble.api.crypto.CryptoComponent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.MessageFactory; import org.briarproject.bramble.api.sync.RecordReader; @@ -15,13 +14,10 @@ import javax.inject.Inject; @NotNullByDefault class RecordReaderFactoryImpl implements RecordReaderFactory { - private final CryptoComponent crypto; private final MessageFactory messageFactory; @Inject - RecordReaderFactoryImpl(CryptoComponent crypto, - MessageFactory messageFactory) { - this.crypto = crypto; + RecordReaderFactoryImpl(MessageFactory messageFactory) { this.messageFactory = messageFactory; } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderImpl.java index 03e67e26d1..5fdb18b1bf 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/RecordReaderImpl.java @@ -51,32 +51,49 @@ class RecordReaderImpl implements RecordReader { private void readRecord() throws IOException { if (state != State.BUFFER_EMPTY) throw new IllegalStateException(); - // Read the header - int offset = 0; - while (offset < RECORD_HEADER_LENGTH) { - int read = in.read(header, offset, RECORD_HEADER_LENGTH - offset); - if (read == -1) { - if (offset > 0) throw new FormatException(); - state = State.EOF; + while (true) { + // Read the header + int offset = 0; + while (offset < RECORD_HEADER_LENGTH) { + int read = + in.read(header, offset, RECORD_HEADER_LENGTH - offset); + if (read == -1) { + if (offset > 0) throw new FormatException(); + state = State.EOF; + return; + } + offset += read; + } + // Check the protocol version + if (header[0] != PROTOCOL_VERSION) throw new FormatException(); + // Read the payload length + payloadLength = ByteUtils.readUint16(header, 2); + if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH) + throw new FormatException(); + // Read the payload + offset = 0; + while (offset < payloadLength) { + int read = in.read(payload, offset, payloadLength - offset); + if (read == -1) throw new FormatException(); + offset += read; + } + state = State.BUFFER_FULL; + // Return if this is a known record type + if (header[1] == ACK || header[1] == MESSAGE || + header[1] == OFFER || header[1] == REQUEST) { return; } - offset += read; } - // Check the protocol version - if (header[0] != PROTOCOL_VERSION) throw new FormatException(); - // Read the payload length - payloadLength = ByteUtils.readUint16(header, 2); - if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH) throw new FormatException(); - // Read the payload - offset = 0; - while (offset < payloadLength) { - int read = in.read(payload, offset, payloadLength - offset); - if (read == -1) throw new FormatException(); - offset += read; - } - state = State.BUFFER_FULL; } + /** + * The return value indicates whether there's another record available + * or whether we've reached the end of the input stream. + * If a record is available, + * it's been read into the buffer by the time eof() returns, + * so the method that called eof() can access the record from the buffer, + * for example to check its type or extract its payload. + */ @Override public boolean eof() throws IOException { if (state == State.BUFFER_EMPTY) readRecord(); @@ -153,4 +170,5 @@ class RecordReaderImpl implements RecordReader { if (!hasRequest()) throw new FormatException(); return new Request(readMessageIds()); } + } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 0af9eea38b..a9246b9748 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -34,7 +34,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD /** * An outgoing {@link SyncSession} suitable for simplex transports. The session * sends messages without offering them first, and closes its output stream - * when there are no more packets to send. + * when there are no more records to send. */ @ThreadSafe @NotNullByDefault @@ -70,7 +70,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.contactId = contactId; this.maxLatency = maxLatency; this.recordWriter = recordWriter; - outstandingQueries = new AtomicInteger(2); // One per type of packet + outstandingQueries = new AtomicInteger(2); // One per type of record writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>(); } @@ -79,10 +79,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener { public void run() throws IOException { eventBus.addListener(this); try { - // Start a query for each type of packet + // Start a query for each type of record dbExecutor.execute(new GenerateAck()); dbExecutor.execute(new GenerateBatch()); - // Write packets until interrupted or no more packets to write + // Write records until interrupted or no more records to write try { while (!interrupted) { ThrowingRunnable<IOException> task = writerTasks.take(); @@ -91,7 +91,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } recordWriter.flush(); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for a packet to write"); + LOG.info("Interrupted while waiting for a record to write"); Thread.currentThread().interrupt(); } } finally { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java index bb28a18eb8..09aaa199b1 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SyncModule.java @@ -40,9 +40,9 @@ public class SyncModule { } @Provides - RecordReaderFactory provideRecordReaderFactory(CryptoComponent crypto, - MessageFactory messageFactory) { - return new RecordReaderFactoryImpl(crypto, messageFactory); + RecordReaderFactory provideRecordReaderFactory( + RecordReaderFactoryImpl recordReaderFactory) { + return recordReaderFactory; } @Provides -- GitLab