Skip to content
Snippets Groups Projects
Commit 47749c3c authored by akwizgran's avatar akwizgran
Browse files

Converted ReliabilityLayer into an interface for better testability.

parent e5d15d42
No related branches found
No related tags found
No related merge requests found
Showing
with 175 additions and 214 deletions
...@@ -5,12 +5,15 @@ import java.util.concurrent.Executor; ...@@ -5,12 +5,15 @@ import java.util.concurrent.Executor;
class ModemFactoryImpl implements ModemFactory { class ModemFactoryImpl implements ModemFactory {
private final Executor executor; private final Executor executor;
private final ReliabilityLayerFactory reliabilityFactory;
ModemFactoryImpl(Executor executor) { ModemFactoryImpl(Executor executor,
ReliabilityLayerFactory reliabilityFactory) {
this.executor = executor; this.executor = executor;
this.reliabilityFactory = reliabilityFactory;
} }
public Modem createModem(Modem.Callback callback, String portName) { public Modem createModem(Modem.Callback callback, String portName) {
return new ModemImpl(executor, callback, portName); return new ModemImpl(executor, reliabilityFactory, callback, portName);
} }
} }
...@@ -29,6 +29,7 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -29,6 +29,7 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
private static final int CONNECT_TIMEOUT = 2 * 60 * 1000; // Milliseconds private static final int CONNECT_TIMEOUT = 2 * 60 * 1000; // Milliseconds
private final Executor executor; private final Executor executor;
private final ReliabilityLayerFactory reliabilityFactory;
private final Callback callback; private final Callback callback;
private final SerialPort port; private final SerialPort port;
private final Semaphore stateChange; private final Semaphore stateChange;
...@@ -36,11 +37,13 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -36,11 +37,13 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
private int lineLen = 0; private int lineLen = 0;
private ReliabilityLayer reliabilityLayer = null; // Locking: this private ReliabilityLayer reliability = null; // Locking: this
private boolean initialised = false, connected = false; // Locking: this private boolean initialised = false, connected = false; // Locking: this
ModemImpl(Executor executor, Callback callback, String portName) { ModemImpl(Executor executor, ReliabilityLayerFactory reliabilityFactory,
Callback callback, String portName) {
this.executor = executor; this.executor = executor;
this.reliabilityFactory = reliabilityFactory;
this.callback = callback; this.callback = callback;
port = new SerialPort(portName); port = new SerialPort(portName);
stateChange = new Semaphore(1); stateChange = new Semaphore(1);
...@@ -142,18 +145,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -142,18 +145,18 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
// Locking: stateChange // Locking: stateChange
private void hangUpInner() throws IOException { private void hangUpInner() throws IOException {
ReliabilityLayer reliabilityLayer; ReliabilityLayer reliability;
synchronized(this) { synchronized(this) {
if(this.reliabilityLayer == null) { if(this.reliability == null) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))
LOG.info("Not hanging up - already on the hook"); LOG.info("Not hanging up - already on the hook");
return; return;
} }
reliabilityLayer = this.reliabilityLayer; reliability = this.reliability;
this.reliabilityLayer = null; this.reliability = null;
connected = false; connected = false;
} }
reliabilityLayer.stop(); reliability.stop();
if(LOG.isLoggable(INFO)) LOG.info("Hanging up"); if(LOG.isLoggable(INFO)) LOG.info("Hanging up");
try { try {
port.setDTR(false); port.setDTR(false);
...@@ -170,22 +173,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -170,22 +173,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
return false; return false;
} }
try { try {
ReliabilityLayer reliabilityLayer = ReliabilityLayer reliability =
new ReliabilityLayer(executor, this); reliabilityFactory.createReliabilityLayer(this);
synchronized(this) { synchronized(this) {
if(!initialised) { if(!initialised) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))
LOG.info("Not dialling - modem not initialised"); LOG.info("Not dialling - modem not initialised");
return false; return false;
} }
if(this.reliabilityLayer != null) { if(this.reliability != null) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))
LOG.info("Not dialling - call in progress"); LOG.info("Not dialling - call in progress");
return false; return false;
} }
this.reliabilityLayer = reliabilityLayer; this.reliability = reliability;
} }
reliabilityLayer.start(); reliability.start();
if(LOG.isLoggable(INFO)) LOG.info("Dialling"); if(LOG.isLoggable(INFO)) LOG.info("Dialling");
try { try {
String dial = "ATDT" + number + "\r\n"; String dial = "ATDT" + number + "\r\n";
...@@ -218,21 +221,21 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -218,21 +221,21 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
} }
public InputStream getInputStream() throws IOException { public InputStream getInputStream() throws IOException {
ReliabilityLayer reliabilityLayer; ReliabilityLayer reliability;
synchronized(this) { synchronized(this) {
reliabilityLayer = this.reliabilityLayer; reliability = this.reliability;
} }
if(reliabilityLayer == null) throw new IOException("Not connected"); if(reliability == null) throw new IOException("Not connected");
return reliabilityLayer.getInputStream(); return reliability.getInputStream();
} }
public OutputStream getOutputStream() throws IOException { public OutputStream getOutputStream() throws IOException {
ReliabilityLayer reliabilityLayer; ReliabilityLayer reliability;
synchronized(this) { synchronized(this) {
reliabilityLayer = this.reliabilityLayer; reliability = this.reliability;
} }
if(reliabilityLayer == null) throw new IOException("Not connected"); if(reliability == null) throw new IOException("Not connected");
return reliabilityLayer.getOutputStream(); return reliability.getOutputStream();
} }
public void hangUp() throws IOException { public void hangUp() throws IOException {
...@@ -280,12 +283,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -280,12 +283,12 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
} }
private boolean handleData(byte[] b) throws IOException { private boolean handleData(byte[] b) throws IOException {
ReliabilityLayer reliabilityLayer; ReliabilityLayer reliability;
synchronized(this) { synchronized(this) {
reliabilityLayer = this.reliabilityLayer; reliability = this.reliability;
} }
if(reliabilityLayer == null) return false; if(reliability == null) return false;
reliabilityLayer.handleRead(b); reliability.handleRead(b);
return true; return true;
} }
...@@ -349,22 +352,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { ...@@ -349,22 +352,22 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener {
return; return;
} }
try { try {
ReliabilityLayer reliabilityLayer = ReliabilityLayer reliability =
new ReliabilityLayer(executor, this); reliabilityFactory.createReliabilityLayer(this);
synchronized(this) { synchronized(this) {
if(!initialised) { if(!initialised) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))
LOG.info("Not answering - modem not initialised"); LOG.info("Not answering - modem not initialised");
return; return;
} }
if(this.reliabilityLayer != null) { if(this.reliability != null) {
if(LOG.isLoggable(INFO)) if(LOG.isLoggable(INFO))
LOG.info("Not answering - call in progress"); LOG.info("Not answering - call in progress");
return; return;
} }
this.reliabilityLayer = reliabilityLayer; this.reliability = reliability;
} }
reliabilityLayer.start(); reliability.start();
if(LOG.isLoggable(INFO)) LOG.info("Answering"); if(LOG.isLoggable(INFO)) LOG.info("Answering");
try { try {
port.writeBytes("ATA\r\n".getBytes("US-ASCII")); port.writeBytes("ATA\r\n".getBytes("US-ASCII"));
......
...@@ -28,7 +28,10 @@ public class ModemPluginFactory implements DuplexPluginFactory { ...@@ -28,7 +28,10 @@ public class ModemPluginFactory implements DuplexPluginFactory {
// This plugin is not enabled by default // This plugin is not enabled by default
String enabled = callback.getConfig().get("enabled"); String enabled = callback.getConfig().get("enabled");
if(StringUtils.isNullOrEmpty(enabled)) return null; if(StringUtils.isNullOrEmpty(enabled)) return null;
ModemFactory modemFactory = new ModemFactoryImpl(pluginExecutor); ReliabilityLayerFactory reliabilityFactory =
new ReliabilityLayerFactoryImpl(pluginExecutor);
ModemFactory modemFactory = new ModemFactoryImpl(pluginExecutor,
reliabilityFactory);
return new ModemPlugin(pluginExecutor, modemFactory, callback, return new ModemPlugin(pluginExecutor, modemFactory, callback,
POLLING_INTERVAL); POLLING_INTERVAL);
} }
......
package net.sf.briar.plugins.modem; package net.sf.briar.plugins.modem;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
class ReliabilityLayer implements ReadHandler, WriteHandler {
private static final int TICK_INTERVAL = 500; // Milliseconds
private static final Logger LOG =
Logger.getLogger(ReliabilityLayer.class.getName());
private final Executor executor;
private final WriteHandler writeHandler;
private final BlockingQueue<byte[]> writes;
private volatile Receiver receiver = null;
private volatile SlipDecoder decoder = null;
private volatile ReceiverInputStream inputStream = null;
private volatile SenderOutputStream outputStream = null;
private volatile boolean running = false;
ReliabilityLayer(Executor executor, WriteHandler writeHandler) {
this.executor = executor;
this.writeHandler = writeHandler;
writes = new LinkedBlockingQueue<byte[]>();
}
void start() {
SlipEncoder encoder = new SlipEncoder(this);
final Sender sender = new Sender(encoder);
receiver = new Receiver(sender);
decoder = new SlipDecoder(receiver);
inputStream = new ReceiverInputStream(receiver);
outputStream = new SenderOutputStream(sender);
running = true;
executor.execute(new Runnable() {
public void run() {
long now = System.currentTimeMillis();
long next = now + TICK_INTERVAL;
try {
while(running) {
byte[] b = null;
while(now < next && b == null) {
b = writes.poll(next - now, MILLISECONDS);
now = System.currentTimeMillis();
}
if(b == null) {
sender.tick();
while(next <= now) next += TICK_INTERVAL;
} else {
if(b.length == 0) return; // Poison pill
writeHandler.handleWrite(b);
}
}
} catch(InterruptedException e) {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while waiting to write");
Thread.currentThread().interrupt();
running = false;
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
running = false;
}
}
});
}
InputStream getInputStream() { interface ReliabilityLayer extends ReadHandler, WriteHandler {
return inputStream;
}
OutputStream getOutputStream() { void start();
return outputStream;
}
void stop() { void stop();
running = false;
receiver.invalidate();
writes.add(new byte[0]); // Poison pill
}
// The modem calls this method to pass data up to the SLIP decoder InputStream getInputStream();
public void handleRead(byte[] b) throws IOException {
if(!running) throw new IOException("Connection closed");
decoder.handleRead(b);
}
// The SLIP encoder calls this method to pass data down to the modem OutputStream getOutputStream();
public void handleWrite(byte[] b) throws IOException {
if(!running) throw new IOException("Connection closed");
if(b.length > 0) writes.add(b);
}
} }
package net.sf.briar.plugins.modem;
interface ReliabilityLayerFactory {
ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler);
}
package net.sf.briar.plugins.modem;
import java.util.concurrent.Executor;
class ReliabilityLayerFactoryImpl implements ReliabilityLayerFactory {
private final Executor executor;
ReliabilityLayerFactoryImpl(Executor executor) {
this.executor = executor;
}
public ReliabilityLayer createReliabilityLayer(WriteHandler writeHandler) {
return new ReliabilityLayerImpl(executor, writeHandler);
}
}
package net.sf.briar.plugins.modem;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
class ReliabilityLayerImpl implements ReliabilityLayer {
private static final int TICK_INTERVAL = 500; // Milliseconds
private static final Logger LOG =
Logger.getLogger(ReliabilityLayerImpl.class.getName());
private final Executor executor;
private final WriteHandler writeHandler;
private final BlockingQueue<byte[]> writes;
private volatile Receiver receiver = null;
private volatile SlipDecoder decoder = null;
private volatile ReceiverInputStream inputStream = null;
private volatile SenderOutputStream outputStream = null;
private volatile boolean running = false;
ReliabilityLayerImpl(Executor executor, WriteHandler writeHandler) {
this.executor = executor;
this.writeHandler = writeHandler;
writes = new LinkedBlockingQueue<byte[]>();
}
public void start() {
SlipEncoder encoder = new SlipEncoder(this);
final Sender sender = new Sender(encoder);
receiver = new Receiver(sender);
decoder = new SlipDecoder(receiver);
inputStream = new ReceiverInputStream(receiver);
outputStream = new SenderOutputStream(sender);
running = true;
executor.execute(new Runnable() {
public void run() {
long now = System.currentTimeMillis();
long next = now + TICK_INTERVAL;
try {
while(running) {
byte[] b = null;
while(now < next && b == null) {
b = writes.poll(next - now, MILLISECONDS);
now = System.currentTimeMillis();
}
if(b == null) {
sender.tick();
while(next <= now) next += TICK_INTERVAL;
} else {
if(b.length == 0) return; // Poison pill
writeHandler.handleWrite(b);
}
}
} catch(InterruptedException e) {
if(LOG.isLoggable(WARNING))
LOG.warning("Interrupted while waiting to write");
Thread.currentThread().interrupt();
running = false;
} catch(IOException e) {
if(LOG.isLoggable(WARNING))
LOG.log(WARNING, e.toString(), e);
running = false;
}
}
});
}
public void stop() {
running = false;
receiver.invalidate();
writes.add(new byte[0]); // Poison pill
}
public InputStream getInputStream() {
return inputStream;
}
public OutputStream getOutputStream() {
return outputStream;
}
// The transport calls this method to pass data up to the SLIP decoder
public void handleRead(byte[] b) throws IOException {
if(!running) throw new IOException("Connection closed");
decoder.handleRead(b);
}
// The SLIP encoder calls this method to pass data down to the transport
public void handleWrite(byte[] b) throws IOException {
if(!running) throw new IOException("Connection closed");
if(b.length > 0) writes.add(b);
}
}
package net.sf.briar.plugins.modem;
import static java.util.logging.Level.INFO;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
public class HangupClientTest {
public static void main(String[] args) throws Exception {
if(args.length != 2) {
System.err.println("Please specify the server's phone number "
+ " and the serial port");
System.exit(1);
}
String number = args[0];
String portName = args[1];
Logger.getLogger("net.sf.briar").setLevel(INFO);
ExecutorService executor = Executors.newCachedThreadPool();
Modem.Callback callback = new Modem.Callback() {
public void incomingCallConnected() {
System.err.println("Unexpected incoming call");
System.exit(1);
}
};
try {
Modem modem = new ModemImpl(executor, callback, portName);
modem.start();
System.out.println("Dialling");
if(modem.dial(number)) {
System.out.println("Connected, waiting for server to hang up");
Thread.sleep(60 * 1000);
} else {
System.out.println("Did not connect");
}
modem.stop();
} finally {
executor.shutdown();
}
}
}
package net.sf.briar.plugins.modem;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.logging.Level.INFO;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
public class HangupServerTest {
public static void main(String[] args) throws Exception {
if(args.length != 1) {
System.err.println("Please specify the serial port");
System.exit(1);
}
String portName = args[0];
Logger.getLogger("net.sf.briar").setLevel(INFO);
ExecutorService executor = Executors.newCachedThreadPool();
final CountDownLatch latch = new CountDownLatch(1);
Modem.Callback callback = new Modem.Callback() {
public void incomingCallConnected() {
System.out.println("Connected");
latch.countDown();
}
};
try {
final Modem modem = new ModemImpl(executor, callback, portName);
modem.start();
System.out.println("Waiting for incoming call");
if(latch.await(60, SECONDS)) {
System.out.println("Hanging up");
modem.hangUp();
} else {
System.out.println("Did not connect");
}
modem.stop();
} finally {
executor.shutdown();
}
}
}
...@@ -27,8 +27,8 @@ public class ModemClientTest extends DuplexClientTest { ...@@ -27,8 +27,8 @@ public class ModemClientTest extends DuplexClientTest {
// Create the plugin // Create the plugin
callback = new ClientCallback(new TransportConfig(), callback = new ClientCallback(new TransportConfig(),
new TransportProperties(), remote); new TransportProperties(), remote);
plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor), plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor,
callback, 0L); new ReliabilityLayerFactoryImpl(executor)), callback, 0L);
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
......
...@@ -21,8 +21,8 @@ public class ModemServerTest extends DuplexServerTest { ...@@ -21,8 +21,8 @@ public class ModemServerTest extends DuplexServerTest {
callback = new ServerCallback(new TransportConfig(), callback = new ServerCallback(new TransportConfig(),
new TransportProperties(), Collections.singletonMap(contactId, new TransportProperties(), Collections.singletonMap(contactId,
new TransportProperties())); new TransportProperties()));
plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor), plugin = new ModemPlugin(executor, new ModemFactoryImpl(executor,
callback, 0L); new ReliabilityLayerFactoryImpl(executor)), callback, 0L);
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment