diff --git a/src/net/sf/briar/plugins/modem/ReliabilityLayer.java b/src/net/sf/briar/plugins/modem/ReliabilityLayer.java index 3265d197c188d5d429eeac0cc2738f309461656e..ccab001be118875ad8d90d82b42bcb1e80b1c84e 100644 --- a/src/net/sf/briar/plugins/modem/ReliabilityLayer.java +++ b/src/net/sf/briar/plugins/modem/ReliabilityLayer.java @@ -1,16 +1,25 @@ package net.sf.briar.plugins.modem; +import static java.util.logging.Level.WARNING; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; class ReliabilityLayer implements ReadHandler, WriteHandler { + private static final Logger LOG = + Logger.getLogger(ReliabilityLayer.class.getName()); + private final WriteHandler writeHandler; private final Receiver receiver; private final SlipDecoder decoder; private final ReceiverInputStream inputStream; private final SenderOutputStream outputStream; + private final BlockingQueue<byte[]> writes; private volatile boolean valid = true; @@ -22,6 +31,31 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { decoder = new SlipDecoder(receiver); inputStream = new ReceiverInputStream(receiver); outputStream = new SenderOutputStream(sender); + writes = new LinkedBlockingQueue<byte[]>(); + } + + void init() { + new Thread("ReliabilityLayer") { + @Override + public void run() { + try { + while(valid) { + byte[] b = writes.take(); + if(b.length == 0) return; // Poison pill + writeHandler.handleWrite(b, b.length); + } + } catch(InterruptedException e) { + if(LOG.isLoggable(WARNING)) + LOG.warning("Interrupted while writing"); + valid = false; + Thread.currentThread().interrupt(); + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) + LOG.warning("Interrupted while writing"); + valid = false; + } + } + }.start(); } InputStream getInputStream() { @@ -35,6 +69,7 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { void invalidate() { valid = false; receiver.invalidate(); + writes.add(new byte[0]); // Poison pill } // The modem calls this method to pass data up to the SLIP decoder @@ -46,6 +81,12 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { // The SLIP encoder calls this method to pass data down to the modem public void handleWrite(byte[] b, int length) throws IOException { if(!valid) throw new IOException("Connection closed"); - writeHandler.handleWrite(b, length); + if(length == 0) return; + if(length < b.length) { + byte[] copy = new byte[length]; + System.arraycopy(b, 0, copy, 0, length); + b = copy; + } + writes.add(b); } }