Address review comments

parent 501980d8
......@@ -24,4 +24,5 @@ public interface RecordReader {
boolean hasRequest() throws IOException;
Request readRequest() throws IOException;
}
......@@ -17,15 +17,10 @@ public interface SyncConstants {
/**
* The maximum length of the record payload in bytes.
*/
int MAX_RECORD_PAYLOAD_LENGTH = 32 * 1024; // 32 KiB
int MAX_RECORD_PAYLOAD_LENGTH = 48 * 1024; // 48 KiB
/** The maximum length of a group descriptor in bytes. */
int MAX_GROUP_DESCRIPTOR_LENGTH = 32 * 1024; // 32 KiB
/**
* The maximum length of a message in bytes.
*/
int MAX_MESSAGE_LENGTH = MAX_RECORD_PAYLOAD_LENGTH - RECORD_HEADER_LENGTH;
int MAX_GROUP_DESCRIPTOR_LENGTH = 16 * 1024; // 16 KiB
/**
* The length of the message header in bytes.
......@@ -37,6 +32,11 @@ public interface SyncConstants {
*/
int MAX_MESSAGE_BODY_LENGTH = 32 * 1024; // 32 KiB
/**
* The maximum length of a message in bytes.
*/
int MAX_MESSAGE_LENGTH = MESSAGE_HEADER_LENGTH + MAX_MESSAGE_BODY_LENGTH;
/**
* The maximum number of message IDs in an ack, offer or request record.
*/
......
......@@ -96,37 +96,33 @@ class KeyAgreementTransport {
}
private byte[] readRecord(byte expectedType) throws AbortException {
byte[] header = readHeader();
int len = ByteUtils.readUint16(header,
RECORD_HEADER_PAYLOAD_LENGTH_OFFSET);
if (header[0] != PROTOCOL_VERSION) {
// ignore record with unknown protocol version and try next
while (true) {
byte[] header = readHeader();
int len = ByteUtils.readUint16(header,
RECORD_HEADER_PAYLOAD_LENGTH_OFFSET);
if (header[0] != PROTOCOL_VERSION) {
throw new AbortException(false);
}
byte type = header[1];
if (type == ABORT) throw new AbortException(true);
if (type != expectedType) {
if (type != KEY && type != CONFIRM) {
// ignore unrecognised record and try next
try {
readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
continue;
} else {
throw new AbortException(false);
}
}
try {
readData(len);
return readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
return readRecord(expectedType);
}
byte type = header[1];
if (type == ABORT) throw new AbortException(true);
if (type != expectedType) {
if (type != KEY && type != CONFIRM) {
// ignore unrecognised record and try next
try {
readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
return readRecord(expectedType);
} else {
throw new AbortException(false);
}
}
try {
return readData(len);
} catch (IOException e) {
throw new AbortException(e);
}
}
......
......@@ -42,14 +42,14 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD
/**
* An outgoing {@link SyncSession} suitable for duplex transports. The session
* offers messages before sending them, keeps its output stream open when there
* are no packets to send, and reacts to events that make packets available to
* are no records to send, and reacts to events that make records available to
* send.
*/
@ThreadSafe
@NotNullByDefault
class DuplexOutgoingSession implements SyncSession, EventListener {
// Check for retransmittable packets once every 60 seconds
// Check for retransmittable records once every 60 seconds
private static final int RETX_QUERY_INTERVAL = 60 * 1000;
private static final Logger LOG =
Logger.getLogger(DuplexOutgoingSession.class.getName());
......@@ -91,7 +91,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet
// Start a query for each type of record
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
......@@ -100,10 +100,10 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
long nextKeepalive = now + maxIdleTime;
long nextRetxQuery = now + RETX_QUERY_INTERVAL;
boolean dataToFlush = true;
// Write packets until interrupted
// Write records until interrupted
try {
while (!interrupted) {
// Work out how long we should wait for a packet
// Work out how long we should wait for a record
now = clock.currentTimeMillis();
long wait = Math.min(nextKeepalive, nextRetxQuery) - now;
if (wait < 0) wait = 0;
......@@ -113,13 +113,13 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dataToFlush = false;
nextKeepalive = now + maxIdleTime;
}
// Wait for a packet
// Wait for a record
ThrowingRunnable<IOException> task = writerTasks.poll(wait,
MILLISECONDS);
if (task == null) {
now = clock.currentTimeMillis();
if (now >= nextRetxQuery) {
// Check for retransmittable packets
// Check for retransmittable records
dbExecutor.execute(new GenerateBatch());
dbExecutor.execute(new GenerateOffer());
nextRetxQuery = now + RETX_QUERY_INTERVAL;
......@@ -139,7 +139,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
}
if (dataToFlush) recordWriter.flush();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt();
}
} finally {
......
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.FormatException;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactRemovedEvent;
import org.briarproject.bramble.api.db.DatabaseComponent;
......@@ -60,7 +61,7 @@ class IncomingSession implements SyncSession, EventListener {
public void run() throws IOException {
eventBus.addListener(this);
try {
// Read packets until interrupted or EOF
// Read records until interrupted or EOF
while (!interrupted && !recordReader.eof()) {
if (recordReader.hasAck()) {
Ack a = recordReader.readAck();
......@@ -74,8 +75,10 @@ class IncomingSession implements SyncSession, EventListener {
} else if (recordReader.hasRequest()) {
Request r = recordReader.readRequest();
dbExecutor.execute(new ReceiveRequest(r));
} else {
// unknown records are ignored in RecordReader#eof()
throw new FormatException();
}
// unknown records are ignored
}
} finally {
eventBus.removeListener(this);
......
package org.briarproject.bramble.sync;
import org.briarproject.bramble.api.crypto.CryptoComponent;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.sync.MessageFactory;
import org.briarproject.bramble.api.sync.RecordReader;
......@@ -15,13 +14,10 @@ import javax.inject.Inject;
@NotNullByDefault
class RecordReaderFactoryImpl implements RecordReaderFactory {
private final CryptoComponent crypto;
private final MessageFactory messageFactory;
@Inject
RecordReaderFactoryImpl(CryptoComponent crypto,
MessageFactory messageFactory) {
this.crypto = crypto;
RecordReaderFactoryImpl(MessageFactory messageFactory) {
this.messageFactory = messageFactory;
}
......
......@@ -51,32 +51,49 @@ class RecordReaderImpl implements RecordReader {
private void readRecord() throws IOException {
if (state != State.BUFFER_EMPTY) throw new IllegalStateException();
// Read the header
int offset = 0;
while (offset < RECORD_HEADER_LENGTH) {
int read = in.read(header, offset, RECORD_HEADER_LENGTH - offset);
if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
while (true) {
// Read the header
int offset = 0;
while (offset < RECORD_HEADER_LENGTH) {
int read =
in.read(header, offset, RECORD_HEADER_LENGTH - offset);
if (read == -1) {
if (offset > 0) throw new FormatException();
state = State.EOF;
return;
}
offset += read;
}
// Check the protocol version
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH)
throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
// Return if this is a known record type
if (header[1] == ACK || header[1] == MESSAGE ||
header[1] == OFFER || header[1] == REQUEST) {
return;
}
offset += read;
}
// Check the protocol version
if (header[0] != PROTOCOL_VERSION) throw new FormatException();
// Read the payload length
payloadLength = ByteUtils.readUint16(header, 2);
if (payloadLength > MAX_RECORD_PAYLOAD_LENGTH) throw new FormatException();
// Read the payload
offset = 0;
while (offset < payloadLength) {
int read = in.read(payload, offset, payloadLength - offset);
if (read == -1) throw new FormatException();
offset += read;
}
state = State.BUFFER_FULL;
}
/**
* The return value indicates whether there's another record available
* or whether we've reached the end of the input stream.
* If a record is available,
* it's been read into the buffer by the time eof() returns,
* so the method that called eof() can access the record from the buffer,
* for example to check its type or extract its payload.
*/
@Override
public boolean eof() throws IOException {
if (state == State.BUFFER_EMPTY) readRecord();
......@@ -153,4 +170,5 @@ class RecordReaderImpl implements RecordReader {
if (!hasRequest()) throw new FormatException();
return new Request(readMessageIds());
}
}
......@@ -34,7 +34,7 @@ import static org.briarproject.bramble.api.sync.SyncConstants.MAX_RECORD_PAYLOAD
/**
* An outgoing {@link SyncSession} suitable for simplex transports. The session
* sends messages without offering them first, and closes its output stream
* when there are no more packets to send.
* when there are no more records to send.
*/
@ThreadSafe
@NotNullByDefault
......@@ -70,7 +70,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
this.contactId = contactId;
this.maxLatency = maxLatency;
this.recordWriter = recordWriter;
outstandingQueries = new AtomicInteger(2); // One per type of packet
outstandingQueries = new AtomicInteger(2); // One per type of record
writerTasks = new LinkedBlockingQueue<ThrowingRunnable<IOException>>();
}
......@@ -79,10 +79,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
public void run() throws IOException {
eventBus.addListener(this);
try {
// Start a query for each type of packet
// Start a query for each type of record
dbExecutor.execute(new GenerateAck());
dbExecutor.execute(new GenerateBatch());
// Write packets until interrupted or no more packets to write
// Write records until interrupted or no more records to write
try {
while (!interrupted) {
ThrowingRunnable<IOException> task = writerTasks.take();
......@@ -91,7 +91,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
}
recordWriter.flush();
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a packet to write");
LOG.info("Interrupted while waiting for a record to write");
Thread.currentThread().interrupt();
}
} finally {
......
......@@ -40,9 +40,9 @@ public class SyncModule {
}
@Provides
RecordReaderFactory provideRecordReaderFactory(CryptoComponent crypto,
MessageFactory messageFactory) {
return new RecordReaderFactoryImpl(crypto, messageFactory);
RecordReaderFactory provideRecordReaderFactory(
RecordReaderFactoryImpl recordReaderFactory) {
return recordReaderFactory;
}
@Provides
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment