diff --git a/briar-core/src/net/sf/briar/plugins/modem/ModemFactoryImpl.java b/briar-core/src/net/sf/briar/plugins/modem/ModemFactoryImpl.java index 1abf52b11c8d04e52cf9810e2c4d8c67bf683a16..e9be7af29e0ad70768a9788f9a4a4eb6f76df6a1 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ModemFactoryImpl.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ModemFactoryImpl.java @@ -5,12 +5,15 @@ import java.util.concurrent.Executor; class ModemFactoryImpl implements ModemFactory { private final Executor executor; + private final ReliabilityLayerFactory reliabilityFactory; - ModemFactoryImpl(Executor executor) { + ModemFactoryImpl(Executor executor, + ReliabilityLayerFactory reliabilityFactory) { this.executor = executor; + this.reliabilityFactory = reliabilityFactory; } public Modem createModem(Modem.Callback callback, String portName) { - return new ModemImpl(executor, callback, portName); + return new ModemImpl(executor, reliabilityFactory, callback, portName); } } diff --git a/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java b/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java index c74dccac815faed84ebddfdc85a7fe7074303c9c..2d31dc9ca2b8b37f6a403c091dbb2cad7ea4da56 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java @@ -29,6 +29,7 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { private static final int CONNECT_TIMEOUT = 2 * 60 * 1000; // Milliseconds private final Executor executor; + private final ReliabilityLayerFactory reliabilityFactory; private final Callback callback; private final SerialPort port; private final Semaphore stateChange; @@ -36,11 +37,13 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { private int lineLen = 0; - private ReliabilityLayer reliabilityLayer = null; // Locking: this + private ReliabilityLayer reliability = null; // Locking: this private boolean initialised = false, connected = false; // Locking: this - ModemImpl(Executor executor, Callback callback, String portName) { + ModemImpl(Executor executor, ReliabilityLayerFactory reliabilityFactory, + Callback callback, String portName) { this.executor = executor; + this.reliabilityFactory = reliabilityFactory; this.callback = callback; port = new SerialPort(portName); stateChange = new Semaphore(1); @@ -142,18 +145,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { // Locking: stateChange private void hangUpInner() throws IOException { - ReliabilityLayer reliabilityLayer; + ReliabilityLayer reliability; synchronized(this) { - if(this.reliabilityLayer == null) { + if(this.reliability == null) { if(LOG.isLoggable(INFO)) LOG.info("Not hanging up - already on the hook"); return; } - reliabilityLayer = this.reliabilityLayer; - this.reliabilityLayer = null; + reliability = this.reliability; + this.reliability = null; connected = false; } - reliabilityLayer.stop(); + reliability.stop(); if(LOG.isLoggable(INFO)) LOG.info("Hanging up"); try { port.setDTR(false); @@ -170,22 +173,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { return false; } try { - ReliabilityLayer reliabilityLayer = - new ReliabilityLayer(executor, this); + ReliabilityLayer reliability = + reliabilityFactory.createReliabilityLayer(this); synchronized(this) { if(!initialised) { if(LOG.isLoggable(INFO)) LOG.info("Not dialling - modem not initialised"); return false; } - if(this.reliabilityLayer != null) { + if(this.reliability != null) { if(LOG.isLoggable(INFO)) LOG.info("Not dialling - call in progress"); return false; } - this.reliabilityLayer = reliabilityLayer; + this.reliability = reliability; } - reliabilityLayer.start(); + reliability.start(); if(LOG.isLoggable(INFO)) LOG.info("Dialling"); try { String dial = "ATDT" + number + "\r\n"; @@ -218,21 +221,21 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { } public InputStream getInputStream() throws IOException { - ReliabilityLayer reliabilityLayer; + ReliabilityLayer reliability; synchronized(this) { - reliabilityLayer = this.reliabilityLayer; + reliability = this.reliability; } - if(reliabilityLayer == null) throw new IOException("Not connected"); - return reliabilityLayer.getInputStream(); + if(reliability == null) throw new IOException("Not connected"); + return reliability.getInputStream(); } public OutputStream getOutputStream() throws IOException { - ReliabilityLayer reliabilityLayer; + ReliabilityLayer reliability; synchronized(this) { - reliabilityLayer = this.reliabilityLayer; + reliability = this.reliability; } - if(reliabilityLayer == null) throw new IOException("Not connected"); - return reliabilityLayer.getOutputStream(); + if(reliability == null) throw new IOException("Not connected"); + return reliability.getOutputStream(); } public void hangUp() throws IOException { @@ -280,12 +283,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { } private boolean handleData(byte[] b) throws IOException { - ReliabilityLayer reliabilityLayer; + ReliabilityLayer reliability; synchronized(this) { - reliabilityLayer = this.reliabilityLayer; + reliability = this.reliability; } - if(reliabilityLayer == null) return false; - reliabilityLayer.handleRead(b); + if(reliability == null) return false; + reliability.handleRead(b); return true; } @@ -349,22 +352,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { return; } try { - ReliabilityLayer reliabilityLayer = - new ReliabilityLayer(executor, this); + ReliabilityLayer reliability = + reliabilityFactory.createReliabilityLayer(this); synchronized(this) { if(!initialised) { if(LOG.isLoggable(INFO)) LOG.info("Not answering - modem not initialised"); return; } - if(this.reliabilityLayer != null) { + if(this.reliability != null) { if(LOG.isLoggable(INFO)) LOG.info("Not answering - call in progress"); return; } - this.reliabilityLayer = reliabilityLayer; + this.reliability = reliability; } - reliabilityLayer.start(); + reliability.start(); if(LOG.isLoggable(INFO)) LOG.info("Answering"); try { port.writeBytes("ATA\r\n".getBytes("US-ASCII")); diff --git a/briar-core/src/net/sf/briar/plugins/modem/ModemPluginFactory.java b/briar-core/src/net/sf/briar/plugins/modem/ModemPluginFactory.java index 558edcc393b20bfd26cb19f4b8cf58032cf1c212..2b17f1d631d3c1a9502ceb3e33308e6b221d5c67 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ModemPluginFactory.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ModemPluginFactory.java @@ -28,7 +28,10 @@ public class ModemPluginFactory implements DuplexPluginFactory { // This plugin is not enabled by default String enabled = callback.getConfig().get("enabled"); if(StringUtils.isNullOrEmpty(enabled)) return null; - ModemFactory modemFactory = new ModemFactoryImpl(pluginExecutor); + ReliabilityLayerFactory reliabilityFactory = + new ReliabilityLayerFactoryImpl(pluginExecutor); + ModemFactory modemFactory = new ModemFactoryImpl(pluginExecutor, + reliabilityFactory); return new ModemPlugin(pluginExecutor, modemFactory, callback, POLLING_INTERVAL); } diff --git a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java index b86b1b123680337eb7630a689c8697e3473c7ec6..6b9160eef7263b33df49a61568c70e453351ea89 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java @@ -1,103 +1,15 @@ package net.sf.briar.plugins.modem; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.logging.Level.WARNING; - -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.logging.Logger; - -class ReliabilityLayer implements ReadHandler, WriteHandler { - - private static final int TICK_INTERVAL = 500; // Milliseconds - - private static final Logger LOG = - Logger.getLogger(ReliabilityLayer.class.getName()); - - private final Executor executor; - private final WriteHandler writeHandler; - private final BlockingQueue<byte[]> writes; - - private volatile Receiver receiver = null; - private volatile SlipDecoder decoder = null; - private volatile ReceiverInputStream inputStream = null; - private volatile SenderOutputStream outputStream = null; - private volatile boolean running = false; - - ReliabilityLayer(Executor executor, WriteHandler writeHandler) { - this.executor = executor; - this.writeHandler = writeHandler; - writes = new LinkedBlockingQueue<byte[]>(); - } - - void start() { - SlipEncoder encoder = new SlipEncoder(this); - final Sender sender = new Sender(encoder); - receiver = new Receiver(sender); - decoder = new SlipDecoder(receiver); - inputStream = new ReceiverInputStream(receiver); - outputStream = new SenderOutputStream(sender); - running = true; - executor.execute(new Runnable() { - public void run() { - long now = System.currentTimeMillis(); - long next = now + TICK_INTERVAL; - try { - while(running) { - byte[] b = null; - while(now < next && b == null) { - b = writes.poll(next - now, MILLISECONDS); - now = System.currentTimeMillis(); - } - if(b == null) { - sender.tick(); - while(next <= now) next += TICK_INTERVAL; - } else { - if(b.length == 0) return; // Poison pill - writeHandler.handleWrite(b); - } - } - } catch(InterruptedException e) { - if(LOG.isLoggable(WARNING)) - LOG.warning("Interrupted while waiting to write"); - Thread.currentThread().interrupt(); - running = false; - } catch(IOException e) { - if(LOG.isLoggable(WARNING)) - LOG.log(WARNING, e.toString(), e); - running = false; - } - } - }); - } - InputStream getInputStream() { - return inputStream; - } +interface ReliabilityLayer extends ReadHandler, WriteHandler { - OutputStream getOutputStream() { - return outputStream; - } + void start(); - void stop() { - running = false; - receiver.invalidate(); - writes.add(new byte[0]); // Poison pill - } + void stop(); - // The modem calls this method to pass data up to the SLIP decoder - public void handleRead(byte[] b) throws IOException { - if(!running) throw new IOException("Connection closed"); - decoder.handleRead(b); - } + InputStream getInputStream(); - // The SLIP encoder calls this method to pass data down to the modem - public void handleWrite(byte[] b) throws IOException { - if(!running) throw new IOException("Connection closed"); - if(b.length > 0) writes.add(b); - } + OutputStream getOutputStream(); } diff --git a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactory.java b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..0c5a46e3aefa8b040a468b10f0df5799eb98397d --- /dev/null +++ b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactory.java @@ -0,0 +1,6 @@ +package net.sf.briar.plugins.modem; + +interface ReliabilityLayerFactory { + + ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler); +} diff --git a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactoryImpl.java b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactoryImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..2b72b72ffb9193dbabe45e8c43089bd184a77158 --- /dev/null +++ b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerFactoryImpl.java @@ -0,0 +1,16 @@ +package net.sf.briar.plugins.modem; + +import java.util.concurrent.Executor; + +class ReliabilityLayerFactoryImpl implements ReliabilityLayerFactory { + + private final Executor executor; + + ReliabilityLayerFactoryImpl(Executor executor) { + this.executor = executor; + } + + public ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler) { + return new ReliabilityLayerImpl(executor, writeHandler); + } +} diff --git a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerImpl.java b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..2dc2f801d56171f22f55902b886c7f68ce83b369 --- /dev/null +++ b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayerImpl.java @@ -0,0 +1,103 @@ +package net.sf.briar.plugins.modem; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.logging.Level.WARNING; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.logging.Logger; + +class ReliabilityLayerImpl implements ReliabilityLayer { + + private static final int TICK_INTERVAL = 500; // Milliseconds + + private static final Logger LOG = + Logger.getLogger(ReliabilityLayerImpl.class.getName()); + + private final Executor executor; + private final WriteHandler writeHandler; + private final BlockingQueue<byte[]> writes; + + private volatile Receiver receiver = null; + private volatile SlipDecoder decoder = null; + private volatile ReceiverInputStream inputStream = null; + private volatile SenderOutputStream outputStream = null; + private volatile boolean running = false; + + ReliabilityLayerImpl(Executor executor, WriteHandler writeHandler) { + this.executor = executor; + this.writeHandler = writeHandler; + writes = new LinkedBlockingQueue<byte[]>(); + } + + public void start() { + SlipEncoder encoder = new SlipEncoder(this); + final Sender sender = new Sender(encoder); + receiver = new Receiver(sender); + decoder = new SlipDecoder(receiver); + inputStream = new ReceiverInputStream(receiver); + outputStream = new SenderOutputStream(sender); + running = true; + executor.execute(new Runnable() { + public void run() { + long now = System.currentTimeMillis(); + long next = now + TICK_INTERVAL; + try { + while(running) { + byte[] b = null; + while(now < next && b == null) { + b = writes.poll(next - now, MILLISECONDS); + now = System.currentTimeMillis(); + } + if(b == null) { + sender.tick(); + while(next <= now) next += TICK_INTERVAL; + } else { + if(b.length == 0) return; // Poison pill + writeHandler.handleWrite(b); + } + } + } catch(InterruptedException e) { + if(LOG.isLoggable(WARNING)) + LOG.warning("Interrupted while waiting to write"); + Thread.currentThread().interrupt(); + running = false; + } catch(IOException e) { + if(LOG.isLoggable(WARNING)) + LOG.log(WARNING, e.toString(), e); + running = false; + } + } + }); + } + + public void stop() { + running = false; + receiver.invalidate(); + writes.add(new byte[0]); // Poison pill + } + + public InputStream getInputStream() { + return inputStream; + } + + public OutputStream getOutputStream() { + return outputStream; + } + + // The transport calls this method to pass data up to the SLIP decoder + public void handleRead(byte[] b) throws IOException { + if(!running) throw new IOException("Connection closed"); + decoder.handleRead(b); + } + + // The SLIP encoder calls this method to pass data down to the transport + public void handleWrite(byte[] b) throws IOException { + if(!running) throw new IOException("Connection closed"); + if(b.length > 0) writes.add(b); + } +} diff --git a/briar-tests/src/net/sf/briar/plugins/modem/HangupClientTest.java b/briar-tests/src/net/sf/briar/plugins/modem/HangupClientTest.java deleted file mode 100644 index a591f08490e0d0d2c5fbc033b4dcbcd90b345641..0000000000000000000000000000000000000000 --- a/briar-tests/src/net/sf/briar/plugins/modem/HangupClientTest.java +++ /dev/null @@ -1,42 +0,0 @@ -package net.sf.briar.plugins.modem; - -import static java.util.logging.Level.INFO; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class HangupClientTest { - - public static void main(String[] args) throws Exception { - if(args.length != 2) { - System.err.println("Please specify the server's phone number " - + " and the serial port"); - System.exit(1); - } - String number = args[0]; - String portName = args[1]; - Logger.getLogger("net.sf.briar").setLevel(INFO); - ExecutorService executor = Executors.newCachedThreadPool(); - Modem.Callback callback = new Modem.Callback() { - public void incomingCallConnected() { - System.err.println("Unexpected incoming call"); - System.exit(1); - } - }; - try { - Modem modem = new ModemImpl(executor, callback, portName); - modem.start(); - System.out.println("Dialling"); - if(modem.dial(number)) { - System.out.println("Connected, waiting for server to hang up"); - Thread.sleep(60 * 1000); - } else { - System.out.println("Did not connect"); - } - modem.stop(); - } finally { - executor.shutdown(); - } - } -} diff --git a/briar-tests/src/net/sf/briar/plugins/modem/HangupServerTest.java b/briar-tests/src/net/sf/briar/plugins/modem/HangupServerTest.java deleted file mode 100644 index c1ed015ccc8ef93d2210a1cad2d5421a9c0b0a90..0000000000000000000000000000000000000000 --- a/briar-tests/src/net/sf/briar/plugins/modem/HangupServerTest.java +++ /dev/null @@ -1,43 +0,0 @@ -package net.sf.briar.plugins.modem; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.logging.Level.INFO; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Logger; - -public class HangupServerTest { - - public static void main(String[] args) throws Exception { - if(args.length != 1) { - System.err.println("Please specify the serial port"); - System.exit(1); - } - String portName = args[0]; - Logger.getLogger("net.sf.briar").setLevel(INFO); - ExecutorService executor = Executors.newCachedThreadPool(); - final CountDownLatch latch = new CountDownLatch(1); - Modem.Callback callback = new Modem.Callback() { - public void incomingCallConnected() { - System.out.println("Connected"); - latch.countDown(); - } - }; - try { - final Modem modem = new ModemImpl(executor, callback, portName); - modem.start(); - System.out.println("Waiting for incoming call"); - if(latch.await(60, SECONDS)) { - System.out.println("Hanging up"); - modem.hangUp(); - } else { - System.out.println("Did not connect"); - } - modem.stop(); - } finally { - executor.shutdown(); - } - } -} diff --git a/briar-tests/src/net/sf/briar/plugins/modem/ModemClientTest.java b/briar-tests/src/net/sf/briar/plugins/modem/ModemClientTest.java index f50407936f0fad9f0bb01bec94afd69a42dbd7c1..1a6deac1bb4f7aab579821177575a855a0a9c0a0 100644 --- a/briar-tests/src/net/sf/briar/plugins/modem/ModemClientTest.java +++ b/briar-tests/src/net/sf/briar/plugins/modem/ModemClientTest.java @@ -27,8 +27,8 @@ public class ModemClientTest extends DuplexClientTest { // Create the plugin callback = new ClientCallback(new TransportConfig(), new TransportProperties(), remote); - plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor), - callback, 0L); + plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor, + new ReliabilityLayerFactoryImpl(executor)), callback, 0L); } public static void main(String[] args) throws Exception { diff --git a/briar-tests/src/net/sf/briar/plugins/modem/ModemServerTest.java b/briar-tests/src/net/sf/briar/plugins/modem/ModemServerTest.java index 437e9deea5caac36085e459c2d0355c1ec7e40c3..3b2850d707738750515a5e3cd9d0fb01af191d8f 100644 --- a/briar-tests/src/net/sf/briar/plugins/modem/ModemServerTest.java +++ b/briar-tests/src/net/sf/briar/plugins/modem/ModemServerTest.java @@ -21,8 +21,8 @@ public class ModemServerTest extends DuplexServerTest { callback = new ServerCallback(new TransportConfig(), new TransportProperties(), Collections.singletonMap(contactId, new TransportProperties())); - plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor), - callback, 0L); + plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor, + new ReliabilityLayerFactoryImpl(executor)), callback, 0L); } public static void main(String[] args) throws Exception {