From a0c015352de2312ccf6a79453e83e06806aac301 Mon Sep 17 00:00:00 2001 From: akwizgran <michael@briarproject.org> Date: Mon, 26 Nov 2012 23:54:17 +0000 Subject: [PATCH] Reliability layer for the modem plugin (untested). --- src/net/sf/briar/plugins/modem/Ack.java | 26 +++ src/net/sf/briar/plugins/modem/Crc32.java | 28 +++ src/net/sf/briar/plugins/modem/Data.java | 27 +++ src/net/sf/briar/plugins/modem/Frame.java | 59 +++++ src/net/sf/briar/plugins/modem/ModemImpl.java | 138 ++--------- .../sf/briar/plugins/modem/ReadHandler.java | 8 + src/net/sf/briar/plugins/modem/Receiver.java | 138 +++++++++++ .../plugins/modem/ReceiverInputStream.java | 59 +++++ .../briar/plugins/modem/ReliabilityLayer.java | 52 +++++ src/net/sf/briar/plugins/modem/Sender.java | 221 ++++++++++++++++++ .../plugins/modem/SenderOutputStream.java | 78 +++++++ .../sf/briar/plugins/modem/SlipDecoder.java | 81 +++++++ .../sf/briar/plugins/modem/SlipEncoder.java | 39 ++++ .../sf/briar/plugins/modem/WriteHandler.java | 8 + src/net/sf/briar/util/ByteUtils.java | 20 ++ 15 files changed, 866 insertions(+), 116 deletions(-) create mode 100644 src/net/sf/briar/plugins/modem/Ack.java create mode 100644 src/net/sf/briar/plugins/modem/Crc32.java create mode 100644 src/net/sf/briar/plugins/modem/Data.java create mode 100644 src/net/sf/briar/plugins/modem/Frame.java create mode 100644 src/net/sf/briar/plugins/modem/ReadHandler.java create mode 100644 src/net/sf/briar/plugins/modem/Receiver.java create mode 100644 src/net/sf/briar/plugins/modem/ReceiverInputStream.java create mode 100644 src/net/sf/briar/plugins/modem/ReliabilityLayer.java create mode 100644 src/net/sf/briar/plugins/modem/Sender.java create mode 100644 src/net/sf/briar/plugins/modem/SenderOutputStream.java create mode 100644 src/net/sf/briar/plugins/modem/SlipDecoder.java create mode 100644 src/net/sf/briar/plugins/modem/SlipEncoder.java create mode 100644 src/net/sf/briar/plugins/modem/WriteHandler.java diff --git a/src/net/sf/briar/plugins/modem/Ack.java b/src/net/sf/briar/plugins/modem/Ack.java new file mode 100644 index 0000000000..436d335052 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Ack.java @@ -0,0 +1,26 @@ +package net.sf.briar.plugins.modem; + +import net.sf.briar.util.ByteUtils; + +class Ack extends Frame { + + static final int LENGTH = 12; + + Ack() { + super(new byte[LENGTH], LENGTH); + b[0] = (byte) Frame.ACK_FLAG; + } + + Ack(byte[] b) { + super(b, LENGTH); + b[0] = (byte) Frame.ACK_FLAG; + } + + int getWindowSize() { + return ByteUtils.readUint24(b, 5); + } + + void setWindowSize(int windowSize) { + ByteUtils.writeUint24(windowSize, b, 5); + } +} diff --git a/src/net/sf/briar/plugins/modem/Crc32.java b/src/net/sf/briar/plugins/modem/Crc32.java new file mode 100644 index 0000000000..9cd9d2d81c --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Crc32.java @@ -0,0 +1,28 @@ +package net.sf.briar.plugins.modem; + +class Crc32 { + + private static final long[] TABLE = new long[256]; + + static { + for(int i = 0; i < 256; i++) { + long c = i; + for(int j = 0; j < 8; j++) { + if((c & 1) != 0) c = 0xedb88320L ^ (c >> 1); + else c >>= 1; + } + TABLE[i] = c; + } + } + + static long update(long c, byte[] b, int off, int len) { + for(int i = off; i < off + len; i++) { + c = TABLE[(int) ((c ^ b[i]) & 0xff)] ^ (c >> 8); + } + return c; + } + + static long crc(byte[] b, int off, int len) { + return update(0xffffffffL, b, off, len) ^ 0xffffffffL; + } +} diff --git a/src/net/sf/briar/plugins/modem/Data.java b/src/net/sf/briar/plugins/modem/Data.java new file mode 100644 index 0000000000..58d42d4622 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Data.java @@ -0,0 +1,27 @@ +package net.sf.briar.plugins.modem; + +class Data extends Frame { + + static final int HEADER_LENGTH = 5, FOOTER_LENGTH = 4; + static final int MIN_LENGTH = HEADER_LENGTH + FOOTER_LENGTH; + static final int MAX_PAYLOAD_LENGTH = 1024; + static final int MAX_LENGTH = MIN_LENGTH + MAX_PAYLOAD_LENGTH; + + Data(byte[] b, int length) { + super(b, length); + if(length < MIN_LENGTH || length > MAX_LENGTH) + throw new IllegalArgumentException(); + } + + boolean isLastFrame() { + return b[0] == Frame.FIN_FLAG; + } + + void setLastFrame(boolean lastFrame) { + if(lastFrame) b[0] = (byte) Frame.FIN_FLAG; + } + + int getPayloadLength() { + return length - MIN_LENGTH; + } +} diff --git a/src/net/sf/briar/plugins/modem/Frame.java b/src/net/sf/briar/plugins/modem/Frame.java new file mode 100644 index 0000000000..aa7e20432c --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Frame.java @@ -0,0 +1,59 @@ +package net.sf.briar.plugins.modem; + +import net.sf.briar.util.ByteUtils; + +abstract class Frame { + + static final byte ACK_FLAG = (byte) 128, FIN_FLAG = 64; + + protected final byte[] b; + protected final int length; + + Frame(byte[] b, int length) { + this.b = b; + this.length = length; + } + + byte[] getBuffer() { + return b; + } + + int getLength() { + return length; + } + + long getChecksum() { + return ByteUtils.readUint32(b, length - 4); + } + + void setChecksum(long checksum) { + ByteUtils.writeUint32(checksum, b, length - 4); + } + + long calculateChecksum() { + return Crc32.crc(b, 0, length - 4); + } + + long getSequenceNumber() { + return ByteUtils.readUint32(b, 1); + } + + void setSequenceNumber(long sequenceNumber) { + ByteUtils.writeUint32(sequenceNumber, b, 1); + } + + @Override + public int hashCode() { + return (int) getSequenceNumber(); + } + + @Override + public boolean equals(Object o) { + if(o instanceof Frame) { + Frame f = (Frame) o; + if(b[0] != f.b[0]) return false; + return getSequenceNumber() == f.getSequenceNumber(); + } + return false; + } +} diff --git a/src/net/sf/briar/plugins/modem/ModemImpl.java b/src/net/sf/briar/plugins/modem/ModemImpl.java index 0a2a5a1041..e913583132 100644 --- a/src/net/sf/briar/plugins/modem/ModemImpl.java +++ b/src/net/sf/briar/plugins/modem/ModemImpl.java @@ -8,9 +8,7 @@ import static jssc.SerialPort.PURGE_TXCLEAR; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -20,7 +18,7 @@ import jssc.SerialPortEvent; import jssc.SerialPortEventListener; import jssc.SerialPortException; -class ModemImpl implements Modem, SerialPortEventListener { +class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { private static final Logger LOG = Logger.getLogger(ModemImpl.class.getName()); @@ -40,8 +38,7 @@ class ModemImpl implements Modem, SerialPortEventListener { private int lineLen = 0; - private volatile ModemInputStream inputStream = null; - private volatile ModemOutputStream outputStream = null; + private volatile ReliabilityLayer reliabilityLayer = null; ModemImpl(Executor executor, Callback callback, String portName) { this.executor = executor; @@ -80,8 +77,6 @@ class ModemImpl implements Modem, SerialPortEventListener { if(!initialised.get()) initialised.wait(OK_TIMEOUT); } } catch(InterruptedException e) { - if(LOG.isLoggable(WARNING)) - LOG.warning("Interrupted while initialising modem"); tryToClose(port); Thread.currentThread().interrupt(); throw new IOException("Interrupted while initialising modem"); @@ -108,8 +103,6 @@ class ModemImpl implements Modem, SerialPortEventListener { if(!connected.get()) connected.wait(CONNECT_TIMEOUT); } } catch(InterruptedException e) { - if(LOG.isLoggable(WARNING)) - LOG.warning("Interrupted while connecting outgoing call"); tryToClose(port); Thread.currentThread().interrupt(); throw new IOException("Interrupted while connecting outgoing call"); @@ -120,11 +113,11 @@ class ModemImpl implements Modem, SerialPortEventListener { } public InputStream getInputStream() { - return inputStream; + return reliabilityLayer.getInputStream(); } public OutputStream getOutputStream() { - return outputStream; + return reliabilityLayer.getOutputStream(); } public void hangUp() throws IOException { @@ -135,18 +128,30 @@ class ModemImpl implements Modem, SerialPortEventListener { tryToClose(port); throw new IOException(e.toString()); } - inputStream.closed = true; - inputStream.received.add(new byte[0]); // Poison pill - outputStream.closed = true; + reliabilityLayer.invalidate(); connected.set(false); offHook.release(); } + public void handleWrite(byte[] b, int length) throws IOException { + if(length < b.length) { + byte[] copy = new byte[length]; + System.arraycopy(b, 0, copy, 0, length); + b = copy; + } + try { + port.writeBytes(b); + } catch(SerialPortException e) { + tryToClose(port); + throw new IOException(e.toString()); + } + } + public void serialEvent(SerialPortEvent ev) { try { if(ev.isRXCHAR()) { byte[] b = port.readBytes(); - if(connected.get()) inputStream.received.add(b); + if(connected.get()) reliabilityLayer.handleRead(b, b.length); else handleText(b); } else if(ev.isDSR() && ev.getEventValue() == 0) { if(LOG.isLoggable(INFO)) LOG.info("Remote end hung up"); @@ -171,8 +176,7 @@ class ModemImpl implements Modem, SerialPortEventListener { lineLen = 0; if(LOG.isLoggable(INFO)) LOG.info("Modem status: " + s); if(s.startsWith("CONNECT")) { - inputStream = new ModemInputStream(); - outputStream = new ModemOutputStream(); + reliabilityLayer = new ReliabilityLayer(this); synchronized(connected) { if(connected.getAndSet(true)) throw new IOException("Connected twice"); @@ -183,7 +187,7 @@ class ModemImpl implements Modem, SerialPortEventListener { if(off < b.length) { byte[] data = new byte[b.length - off]; System.arraycopy(b, off, data, 0, data.length); - inputStream.received.add(data); + reliabilityLayer.handleRead(data, data.length); } return; } else if(s.equals("OK")) { @@ -228,8 +232,6 @@ class ModemImpl implements Modem, SerialPortEventListener { if(!connected.get()) connected.wait(CONNECT_TIMEOUT); } } catch(InterruptedException e) { - if(LOG.isLoggable(WARNING)) - LOG.warning("Interrupted while connecting incoming call"); tryToClose(port); Thread.currentThread().interrupt(); throw new IOException("Interrupted while connecting incoming call"); @@ -245,100 +247,4 @@ class ModemImpl implements Modem, SerialPortEventListener { if(LOG.isLoggable(WARNING)) LOG.warning(e.toString()); } } - - private class ModemInputStream extends InputStream { - - private final BlockingQueue<byte[]> received; - - private byte[] buf = null; - private int offset = 0; - - private volatile boolean closed = false; - - private ModemInputStream() { - this.received = new LinkedBlockingQueue<byte[]>(); - } - - @Override - public int read() throws IOException { - if(closed) throw new IOException("Connection closed"); - getBufferIfNecessary(); - return buf[offset++]; - } - - @Override - public int read(byte[] b) throws IOException { - if(closed) throw new IOException("Connection closed"); - getBufferIfNecessary(); - int len = Math.min(b.length, buf.length - offset); - System.arraycopy(buf, offset, b, 0, len); - offset += len; - return len; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - if(closed) throw new IOException("Connection closed"); - getBufferIfNecessary(); - len = Math.min(len, buf.length - offset); - System.arraycopy(buf, offset, b, off, len); - offset += len; - return len; - } - - private void getBufferIfNecessary() throws IOException { - if(buf == null || offset == buf.length) { - try { - buf = received.take(); - } catch(InterruptedException e) { - if(LOG.isLoggable(WARNING)) - LOG.warning("Interrupted while reading"); - tryToClose(port); - Thread.currentThread().interrupt(); - throw new IOException(e.toString()); - } - if(buf.length == 0) throw new IOException("Connection closed"); - offset = 0; - } - } - } - - private class ModemOutputStream extends OutputStream { - - private volatile boolean closed = false; - - @Override - public void write(int b) throws IOException { - if(closed) throw new IOException("Connection closed"); - try { - port.writeByte((byte) b); - } catch(SerialPortException e) { - tryToClose(port); - throw new IOException(e.toString()); - } - } - - @Override - public void write(byte[] b) throws IOException { - if(closed) throw new IOException("Connection closed"); - try { - port.writeBytes(b); - } catch(SerialPortException e) { - tryToClose(port); - throw new IOException(e.toString()); - } - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - if(closed) throw new IOException("Connection closed"); - if(len < b.length) { - byte[] copy = new byte[len]; - System.arraycopy(b, off, copy, 0, len); - write(copy); - } else { - write(b); - } - } - } } diff --git a/src/net/sf/briar/plugins/modem/ReadHandler.java b/src/net/sf/briar/plugins/modem/ReadHandler.java new file mode 100644 index 0000000000..46b19329dc --- /dev/null +++ b/src/net/sf/briar/plugins/modem/ReadHandler.java @@ -0,0 +1,8 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; + +interface ReadHandler { + + void handleRead(byte[] b, int length) throws IOException; +} diff --git a/src/net/sf/briar/plugins/modem/Receiver.java b/src/net/sf/briar/plugins/modem/Receiver.java new file mode 100644 index 0000000000..e48dea8857 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Receiver.java @@ -0,0 +1,138 @@ +package net.sf.briar.plugins.modem; + +import static java.util.logging.Level.FINE; + +import java.io.IOException; +import java.util.Comparator; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.logging.Logger; + +class Receiver implements ReadHandler { + + private static final Logger LOG = + Logger.getLogger(Receiver.class.getName()); + + private static final int MAX_WINDOW_SIZE = 8 * Data.MAX_PAYLOAD_LENGTH; + + private final Sender sender; + private final SortedSet<Data> dataFrames; // Locking: this + + private int windowSize = MAX_WINDOW_SIZE; // Locking: this + private long finalSequenceNumber = Long.MAX_VALUE; + private long nextSequenceNumber = 1L; + + Receiver(Sender sender) { + this.sender = sender; + dataFrames = new TreeSet<Data>(new SequenceNumberComparator()); + } + + synchronized Data read() throws IOException, InterruptedException { + while(true) { + if(dataFrames.isEmpty()) { + if(LOG.isLoggable(FINE)) LOG.fine("Waiting for a data frame"); + wait(); + } else { + Data d = dataFrames.first(); + if(d.getSequenceNumber() == nextSequenceNumber) { + if(LOG.isLoggable(FINE)) + LOG.fine("Reading #" + d.getSequenceNumber()); + dataFrames.remove(d); + // Update the window + windowSize += d.getPayloadLength(); + if(LOG.isLoggable(FINE)) + LOG.fine("Window at receiver " + windowSize); + sender.sendAck(0L, windowSize); + nextSequenceNumber++; + return d; + } else { + if(LOG.isLoggable(FINE)) + LOG.fine("Waiting for #" + nextSequenceNumber); + wait(); + } + } + } + } + + public void handleRead(byte[] b, int length) throws IOException { + if(length < Data.MIN_LENGTH || length > Data.MAX_LENGTH) { + if(LOG.isLoggable(FINE)) + LOG.fine("Ignoring frame with invalid length"); + return; + } + switch(b[0]) { + case 0: + case Frame.FIN_FLAG: + handleData(b, length); + break; + case Frame.ACK_FLAG: + sender.handleAck(b, length); + break; + default: + if(LOG.isLoggable(FINE)) LOG.fine("Ignoring unknown frame type"); + return; + } + } + + private synchronized void handleData(byte[] b, int length) + throws IOException { + Data d = new Data(b, length); + int payloadLength = d.getPayloadLength(); + if(payloadLength > windowSize) { + if(LOG.isLoggable(FINE)) LOG.fine("No space in the window"); + return; + } + if(d.getChecksum() != d.calculateChecksum()) { + if(LOG.isLoggable(FINE)) + LOG.fine("Incorrect checksum on data frame"); + return; + } + long sequenceNumber = d.getSequenceNumber(); + if(sequenceNumber == 0L) { + if(LOG.isLoggable(FINE)) LOG.fine("Window probe"); + } else if(sequenceNumber < nextSequenceNumber) { + if(LOG.isLoggable(FINE)) LOG.fine("Duplicate data frame"); + } else if(d.isLastFrame()) { + finalSequenceNumber = sequenceNumber; + Iterator<Data> it = dataFrames.iterator(); + while(it.hasNext()) { + Data d1 = it.next(); + if(d1.getSequenceNumber() >= finalSequenceNumber) { + if(LOG.isLoggable(FINE)) + LOG.fine("Received data frame after FIN"); + it.remove(); + } + } + if(LOG.isLoggable(FINE)) LOG.fine("Received #" + sequenceNumber); + if(dataFrames.add(d)) { + windowSize -= payloadLength; + if(LOG.isLoggable(FINE)) + LOG.fine("Window at receiver " + windowSize); + notifyAll(); + } + } else if(sequenceNumber < finalSequenceNumber) { + if(LOG.isLoggable(FINE)) LOG.fine("Received #" + sequenceNumber); + if(dataFrames.add(d)) { + windowSize -= payloadLength; + if(LOG.isLoggable(FINE)) + LOG.fine("Window at receiver " + windowSize); + notifyAll(); + } + } else { + if(LOG.isLoggable(FINE)) LOG.fine("Received data frame after FIN"); + } + // Acknowledge the data frame even if it's a duplicate + sender.sendAck(sequenceNumber, windowSize); + } + + private static class SequenceNumberComparator implements Comparator<Data> { + + public int compare(Data d1, Data d2) { + long s1 = d1.getSequenceNumber(), s2 = d2.getSequenceNumber(); + if(s1 < s2) return -1; + if(s1 > s2) return 1; + return 0; + } + } +} diff --git a/src/net/sf/briar/plugins/modem/ReceiverInputStream.java b/src/net/sf/briar/plugins/modem/ReceiverInputStream.java new file mode 100644 index 0000000000..11e707acdf --- /dev/null +++ b/src/net/sf/briar/plugins/modem/ReceiverInputStream.java @@ -0,0 +1,59 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; +import java.io.InputStream; + +class ReceiverInputStream extends InputStream { + + private final Receiver receiver; + + private Data data = null; + private int offset = 0, length = 0; + + ReceiverInputStream(Receiver receiver) { + this.receiver = receiver; + } + + @Override + public int read() throws IOException { + if(length == -1) return -1; + while(length == 0) if(!receive()) return -1; + int b = data.getBuffer()[offset] & 0xff; + offset++; + length--; + return b; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if(length == -1) return -1; + while(length == 0) if(!receive()) return -1; + len = Math.min(len, length); + System.arraycopy(data.getBuffer(), offset, b, off, len); + offset += len; + length -= len; + return len; + } + + private boolean receive() throws IOException { + assert length == 0; + if(data != null && data.isLastFrame()) { + length = -1; + return false; + } + try { + data = receiver.read(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while reading"); + } + offset = Data.HEADER_LENGTH; + length = data.getLength() - Data.MIN_LENGTH; + return true; + } +} diff --git a/src/net/sf/briar/plugins/modem/ReliabilityLayer.java b/src/net/sf/briar/plugins/modem/ReliabilityLayer.java new file mode 100644 index 0000000000..3c965b37e8 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/ReliabilityLayer.java @@ -0,0 +1,52 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +class ReliabilityLayer implements ReadHandler, WriteHandler { + + // Write side + private final WriteHandler writeHandler; + private final SlipEncoder encoder; + private final Sender sender; + private final SenderOutputStream outputStream; + // Read side + private final SlipDecoder decoder; + private final Receiver receiver; + private final ReceiverInputStream inputStream; + + private volatile boolean valid = true; + + ReliabilityLayer(WriteHandler writeHandler) { + this.writeHandler = writeHandler; + encoder = new SlipEncoder(this); + sender = new Sender(encoder); + outputStream = new SenderOutputStream(sender); + receiver = new Receiver(sender); + decoder = new SlipDecoder(receiver); + inputStream = new ReceiverInputStream(receiver); + } + + InputStream getInputStream() { + return inputStream; + } + + OutputStream getOutputStream() { + return outputStream; + } + + void invalidate() { + valid = false; + } + + public void handleRead(byte[] b, int length) throws IOException { + if(!valid) throw new IOException("Connection closed"); + decoder.handleRead(b, length); + } + + public void handleWrite(byte[] b, int length) throws IOException { + if(!valid) throw new IOException("Connection closed"); + writeHandler.handleWrite(b, length); + } +} diff --git a/src/net/sf/briar/plugins/modem/Sender.java b/src/net/sf/briar/plugins/modem/Sender.java new file mode 100644 index 0000000000..8fabb35ebf --- /dev/null +++ b/src/net/sf/briar/plugins/modem/Sender.java @@ -0,0 +1,221 @@ +package net.sf.briar.plugins.modem; + +import static java.util.logging.Level.FINE; +import static java.util.logging.Level.WARNING; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.logging.Logger; + +class Sender { + + private static final Logger LOG = + Logger.getLogger(Sender.class.getName()); + + // All times are in milliseconds + private static final int MIN_TIMEOUT = 1000; + private static final int MAX_TIMEOUT = 60 * 1000; + private static final int INITIAL_RTT = 0; + private static final int INITIAL_RTT_VAR = 3 * 1000; + + private final WriteHandler writeHandler; + private final LinkedList<Outstanding> outstanding; // Locking: this + + private int outstandingBytes = 0; // Locking: this + private int windowSize = Data.MAX_PAYLOAD_LENGTH; // Locking: this + private int rtt = INITIAL_RTT, rttVar = INITIAL_RTT_VAR; // Locking: this + private int timeout = rtt + (rttVar << 2); // Locking: this + private long lastWindowUpdateOrProbe = Long.MAX_VALUE; // Locking: this + private boolean dataWaiting = false; // Locking: this + + Sender(WriteHandler writeHandler) { + this.writeHandler = writeHandler; + outstanding = new LinkedList<Outstanding>(); + } + + void sendAck(long sequenceNumber, int windowSize) throws IOException { + Ack a = new Ack(); + a.setSequenceNumber(sequenceNumber); + a.setWindowSize(windowSize); + a.setChecksum(a.calculateChecksum()); + if(sequenceNumber == 0L) { + if(LOG.isLoggable(FINE)) LOG.fine("Sending window update"); + } else { + if(LOG.isLoggable(FINE)) + LOG.fine("Acknowledging #" + sequenceNumber); + } + writeHandler.handleWrite(a.getBuffer(), a.getLength()); + } + + void handleAck(byte[] b, int length) { + if(length != Ack.LENGTH) { + if(LOG.isLoggable(FINE)) + LOG.fine("Ignoring ack frame with wrong length"); + return; + } + Ack a = new Ack(b); + if(a.getChecksum() != a.calculateChecksum()) { + if(LOG.isLoggable(FINE)) + LOG.fine("Incorrect checksum on ack frame"); + return; + } + long sequenceNumber = a.getSequenceNumber(); + long now = System.currentTimeMillis(); + Outstanding fastRetransmit = null; + synchronized(this) { + // Remove the acked data frame if it's outstanding + int foundIndex = -1; + Iterator<Outstanding> it = outstanding.iterator(); + for(int i = 0; it.hasNext(); i++) { + Outstanding o = it.next(); + if(o.data.getSequenceNumber() == sequenceNumber) { + if(LOG.isLoggable(FINE)) + LOG.fine("#" + sequenceNumber + " acknowledged"); + it.remove(); + outstandingBytes -= o.data.getPayloadLength(); + foundIndex = i; + // Update the round-trip time and retransmission timer + if(!o.retransmitted) { + int sample = (int) (now - o.lastTransmitted); + int error = sample - rtt; + rtt += (error >> 3); + rttVar += (Math.abs(error) - rttVar) >> 2; + timeout = rtt + (rttVar << 2); + if(timeout < MIN_TIMEOUT) timeout = MIN_TIMEOUT; + else if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; + if(LOG.isLoggable(FINE)) + LOG.fine("RTT " + rtt + ", timeout " + timeout); + } + break; + } + } + // If any older data frames are outstanding, retransmit the oldest + if(foundIndex > 0) { + fastRetransmit = outstanding.poll(); + if(LOG.isLoggable(FINE)) { + LOG.fine("Fast retransmitting #" + + fastRetransmit.data.getSequenceNumber()); + } + fastRetransmit.lastTransmitted = now; + fastRetransmit.retransmitted = true; + outstanding.add(fastRetransmit); + } + // Update the window + lastWindowUpdateOrProbe = now; + int oldWindowSize = windowSize; + windowSize = a.getWindowSize(); + if(LOG.isLoggable(FINE)) LOG.fine("Window at sender " + windowSize); + // If space has become available, notify any waiting writers + if(windowSize > oldWindowSize || foundIndex != -1) notifyAll(); + } + // Fast retransmission + if(fastRetransmit != null) { + Data d = fastRetransmit.data; + try { + writeHandler.handleWrite(d.getBuffer(), d.getLength()); + } catch(IOException e) { + // FIXME: Do something more meaningful + if(LOG.isLoggable(WARNING)) LOG.warning(e.toString()); + } + } + } + + void tick() { + long now = System.currentTimeMillis(); + List<Outstanding> retransmit = null; + boolean sendProbe = false; + synchronized(this) { + if(outstanding.isEmpty()) { + if(dataWaiting && now - lastWindowUpdateOrProbe > timeout) { + if(LOG.isLoggable(FINE)) LOG.fine("Sending window probe"); + sendProbe = true; + timeout <<= 1; + if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; + if(LOG.isLoggable(FINE)) + LOG.fine("Increasing timeout to " + timeout); + } + } else { + Iterator<Outstanding> it = outstanding.iterator(); + while(it.hasNext()) { + Outstanding o = it.next(); + if(now - o.lastTransmitted > timeout) { + if(LOG.isLoggable(FINE)) { + LOG.fine("Retransmitting #" + + o.data.getSequenceNumber()); + } + it.remove(); + if(retransmit == null) + retransmit = new ArrayList<Outstanding>(); + retransmit.add(o); + timeout <<= 1; + if(timeout > MAX_TIMEOUT) timeout = MAX_TIMEOUT; + if(LOG.isLoggable(FINE)) + LOG.fine("Increasing timeout to " + timeout); + } + } + if(retransmit != null) { + for(Outstanding o : retransmit) { + o.lastTransmitted = now; + o.retransmitted = true; + outstanding.add(o); + } + } + } + } + try { + // Send a window probe if necessary + if(sendProbe) { + byte[] buf = new byte[Data.MIN_LENGTH]; + Data probe = new Data(buf, Data.MIN_LENGTH); + probe.setChecksum(probe.calculateChecksum()); + writeHandler.handleWrite(buf, Data.MIN_LENGTH); + } + // Retransmit any lost data frames + if(retransmit != null) { + for(Outstanding o : retransmit) { + Data d = o.data; + writeHandler.handleWrite(d.getBuffer(), d.getLength()); + } + } + } catch(IOException e) { + // FIXME: Do something more meaningful + if(LOG.isLoggable(WARNING)) LOG.warning(e.toString()); + return; + } + } + + void write(Data d) throws IOException, InterruptedException { + int payloadLength = d.getPayloadLength(); + synchronized(this) { + while(outstandingBytes + payloadLength >= windowSize) { + if(LOG.isLoggable(FINE)) + LOG.fine("Waiting for space in the window"); + dataWaiting = true; + wait(); + } + outstanding.add(new Outstanding(d)); + outstandingBytes += payloadLength; + dataWaiting = false; + } + if(LOG.isLoggable(FINE)) + LOG.fine("Transmitting #" + d.getSequenceNumber()); + writeHandler.handleWrite(d.getBuffer(), d.getLength()); + } + + private static class Outstanding { + + private final Data data; + + private volatile long lastTransmitted; + private volatile boolean retransmitted; + + private Outstanding(Data data) { + this.data = data; + lastTransmitted = System.currentTimeMillis(); + retransmitted = false; + } + } +} diff --git a/src/net/sf/briar/plugins/modem/SenderOutputStream.java b/src/net/sf/briar/plugins/modem/SenderOutputStream.java new file mode 100644 index 0000000000..ca3a5f7b23 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/SenderOutputStream.java @@ -0,0 +1,78 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; +import java.io.OutputStream; + +class SenderOutputStream extends OutputStream { + + private final Sender sender; + + private byte[] buf = null; + private int offset = 0; + private long sequenceNumber = 1L; + + SenderOutputStream(Sender sender) { + this.sender = sender; + } + + @Override + public void close() throws IOException { + if(buf == null) assignBuffer(); + send(true); + } + + @Override + public void flush() throws IOException { + if(buf != null) send(false); + } + + @Override + public void write(int b) throws IOException { + if(buf == null) assignBuffer(); + buf[offset] = (byte) b; + offset++; + if(offset == Data.HEADER_LENGTH + Data.MAX_PAYLOAD_LENGTH) send(false); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if(buf == null) assignBuffer(); + int available = Data.MAX_LENGTH - offset - Data.FOOTER_LENGTH; + while(available <= len) { + System.arraycopy(b, off, buf, offset, available); + offset += available; + send(false); + assignBuffer(); + off += available; + len -= available; + available = Data.MAX_LENGTH - offset - Data.FOOTER_LENGTH; + } + System.arraycopy(b, off, buf, offset, len); + offset += len; + } + + private void assignBuffer() { + buf = new byte[Data.MAX_LENGTH]; + offset = Data.HEADER_LENGTH; + } + + private void send(boolean lastFrame) throws IOException { + Data d = new Data(buf, offset + Data.FOOTER_LENGTH); + d.setLastFrame(lastFrame); + d.setSequenceNumber(sequenceNumber++); + d.setChecksum(d.calculateChecksum()); + try { + sender.write(d); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while writing"); + } + buf = null; + offset = 0; + } +} diff --git a/src/net/sf/briar/plugins/modem/SlipDecoder.java b/src/net/sf/briar/plugins/modem/SlipDecoder.java new file mode 100644 index 0000000000..883010ed65 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/SlipDecoder.java @@ -0,0 +1,81 @@ +package net.sf.briar.plugins.modem; + +import static java.util.logging.Level.FINE; + +import java.io.IOException; +import java.util.logging.Logger; + +class SlipDecoder implements ReadHandler { + + private static final Logger LOG = + Logger.getLogger(SlipDecoder.class.getName()); + + // https://tools.ietf.org/html/rfc1055 + private static final byte END = (byte) 192, ESC = (byte) 219; + private static final byte TEND = (byte) 220, TESC = (byte) 221; + + private final ReadHandler readHandler; + + private byte[] buf = new byte[Data.MAX_LENGTH]; + private int decodedLength = 0; + private boolean escape = false; + + SlipDecoder(ReadHandler readHandler) { + this.readHandler = readHandler; + } + + public void handleRead(byte[] b, int length) throws IOException { + for(int i = 0; i < length; i++) { + switch(b[i]) { + case END: + if(escape) { + reset(true); + } else { + if(decodedLength > 0) { + readHandler.handleRead(buf, decodedLength); + buf = new byte[Data.MAX_LENGTH]; + } + reset(false); + } + break; + case ESC: + if(escape) reset(true); + else escape = true; + break; + case TEND: + if(escape) { + escape = false; + if(decodedLength == buf.length) reset(true); + else buf[decodedLength++] = END; + } else { + if(decodedLength == buf.length) reset(true); + else buf[decodedLength++] = TEND; + } + break; + case TESC: + if(escape) { + escape = false; + if(decodedLength == buf.length) reset(true); + else buf[decodedLength++] = ESC; + } else { + if(decodedLength == buf.length) reset(true); + else buf[decodedLength++] = TESC; + } + break; + default: + if(escape || decodedLength == buf.length) reset(true); + else buf[decodedLength++] = b[i]; + break; + } + } + } + + private void reset(boolean error) { + if(error) { + if(LOG.isLoggable(FINE)) + LOG.fine("Decoding error after " + decodedLength + " bytes"); + } + escape = false; + decodedLength = 0; + } +} diff --git a/src/net/sf/briar/plugins/modem/SlipEncoder.java b/src/net/sf/briar/plugins/modem/SlipEncoder.java new file mode 100644 index 0000000000..577ce7dd21 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/SlipEncoder.java @@ -0,0 +1,39 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; + +class SlipEncoder implements WriteHandler { + + // https://tools.ietf.org/html/rfc1055 + private static final byte END = (byte) 192, ESC = (byte) 219; + private static final byte TEND = (byte) 220, TESC = (byte) 221; + + private final WriteHandler writeHandler; + + SlipEncoder(WriteHandler writeHandler) { + this.writeHandler = writeHandler; + } + + public void handleWrite(byte[] b, int length) throws IOException { + if(length > Data.MAX_LENGTH) throw new IllegalArgumentException(); + int encodedLength = length + 2; + for(int i = 0; i < length; i++) { + if(b[i] == END || b[i] == ESC) encodedLength++; + } + byte[] buf = new byte[encodedLength]; + buf[0] = END; + for(int i = 0, j = 1; i < length; i++) { + if(b[i] == END) { + buf[j++] = ESC; + buf[j++] = TEND; + } else if(b[i] == ESC) { + buf[j++] = ESC; + buf[j++] = TESC; + } else { + buf[j++] = b[i]; + } + } + buf[encodedLength - 1] = END; + writeHandler.handleWrite(buf, encodedLength); + } +} diff --git a/src/net/sf/briar/plugins/modem/WriteHandler.java b/src/net/sf/briar/plugins/modem/WriteHandler.java new file mode 100644 index 0000000000..a0637da755 --- /dev/null +++ b/src/net/sf/briar/plugins/modem/WriteHandler.java @@ -0,0 +1,8 @@ +package net.sf.briar.plugins.modem; + +import java.io.IOException; + +interface WriteHandler { + + void handleWrite(byte[] b, int length) throws IOException; +} diff --git a/src/net/sf/briar/util/ByteUtils.java b/src/net/sf/briar/util/ByteUtils.java index d858b8f9b0..ec062ec817 100644 --- a/src/net/sf/briar/util/ByteUtils.java +++ b/src/net/sf/briar/util/ByteUtils.java @@ -7,6 +7,11 @@ public class ByteUtils { */ public static final int MAX_16_BIT_UNSIGNED = 65535; // 2^16 - 1 + /** + * The maximum value that can be represented as an unsigned 24-bit integer. + */ + public static final int MAX_24_BIT_UNSIGNED = 16777215; // 2^24 - 1 + /** * The maximum value that can be represented as an unsigned 32-bit integer. */ @@ -27,6 +32,15 @@ public class ByteUtils { b[offset + 1] = (byte) (i & 0xFF); } + public static void writeUint24(long i, byte[] b, int offset) { + if(i < 0L) throw new IllegalArgumentException(); + if(i > MAX_24_BIT_UNSIGNED) throw new IllegalArgumentException(); + if(b.length < offset + 3) throw new IllegalArgumentException(); + b[offset] = (byte) (i >> 16); + b[offset + 1] = (byte) (i >> 8 & 0xFF); + b[offset + 2] = (byte) (i & 0xFF); + } + public static void writeUint32(long i, byte[] b, int offset) { if(i < 0L) throw new IllegalArgumentException(); if(i > MAX_32_BIT_UNSIGNED) throw new IllegalArgumentException(); @@ -42,6 +56,12 @@ public class ByteUtils { return ((b[offset] & 0xFF) << 8) | (b[offset + 1] & 0xFF); } + public static int readUint24(byte[] b, int offset) { + if(b.length < offset + 3) throw new IllegalArgumentException(); + return ((b[offset] & 0xFF) << 16) | ((b[offset + 1] & 0xFF) << 8) + | (b[offset + 2] & 0xFF); + } + public static long readUint32(byte[] b, int offset) { if(b.length < offset + 4) throw new IllegalArgumentException(); return ((b[offset] & 0xFFL) << 24) | ((b[offset + 1] & 0xFFL) << 16) -- GitLab