From 0b7ecde4c8c0dfa958ab702ba4d20efc8d70bf9f Mon Sep 17 00:00:00 2001 From: akwizgran <michael@briarproject.org> Date: Thu, 6 Dec 2012 16:36:51 +0000 Subject: [PATCH] Wait for writes to complete before closing the serial port. --- .../net/sf/briar/plugins/modem/ModemImpl.java | 17 ++++++++++++++++- .../briar/plugins/modem/ReliabilityLayer.java | 11 +++++++++-- .../src/net/sf/briar/plugins/modem/Sender.java | 4 ++++ .../briar/plugins/modem/SenderOutputStream.java | 6 ++++++ .../net/sf/briar/plugins/modem/SlipEncoder.java | 4 ++++ .../sf/briar/plugins/modem/WriteHandler.java | 2 ++ 6 files changed, 41 insertions(+), 3 deletions(-) diff --git a/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java b/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java index f389b7dfa2..bde47d9eaf 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ModemImpl.java @@ -33,7 +33,7 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { private final Callback callback; private final SerialPort port; private final AtomicBoolean initialised, connected; - private final Semaphore offHook; + private final Semaphore offHook, writing; private final byte[] line; private int lineLen = 0; @@ -46,6 +46,7 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { port = new SerialPort(portName); initialised = new AtomicBoolean(false); offHook = new Semaphore(1); + writing = new Semaphore(1); connected = new AtomicBoolean(false); line = new byte[MAX_LINE_LENGTH]; reliabilityLayer = new ReliabilityLayer(this); @@ -151,14 +152,28 @@ class ModemImpl implements Modem, WriteHandler, SerialPortEventListener { } public void handleWrite(byte[] b) throws IOException { + try { + writing.acquire(); + } catch(InterruptedException e) { + tryToClose(port); + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while waiting to write"); + } try { port.writeBytes(b); } catch(SerialPortException e) { tryToClose(port); throw new IOException(e.toString()); + } finally { + writing.release(); } } + public void waitForWritesToComplete() throws InterruptedException { + writing.acquire(); + writing.release(); + } + public void serialEvent(SerialPortEvent ev) { try { if(ev.isRXCHAR()) { diff --git a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java index a53bdb89bb..197709ec9d 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java +++ b/briar-core/src/net/sf/briar/plugins/modem/ReliabilityLayer.java @@ -23,6 +23,7 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { private final BlockingQueue<byte[]> writes; private volatile boolean valid = true; + private volatile Thread writer = null; ReliabilityLayer(WriteHandler writeHandler) { this.writeHandler = writeHandler; @@ -36,7 +37,7 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { } void init() { - new Thread("ReliabilityLayer") { + writer = new Thread("ReliabilityLayer") { @Override public void run() { try { @@ -58,7 +59,8 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { valid = false; } } - }.start(); + }; + writer.start(); } InputStream getInputStream() { @@ -88,4 +90,9 @@ class ReliabilityLayer implements ReadHandler, WriteHandler { if(LOG.isLoggable(INFO)) LOG.info("Queueing " + b.length + " bytes"); if(b.length > 0) writes.add(b); } + + public void waitForWritesToComplete() throws InterruptedException { + if(writer != null) writer.join(); + writeHandler.waitForWritesToComplete(); + } } diff --git a/briar-core/src/net/sf/briar/plugins/modem/Sender.java b/briar-core/src/net/sf/briar/plugins/modem/Sender.java index 9c8ef82af4..3217482ad6 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/Sender.java +++ b/briar-core/src/net/sf/briar/plugins/modem/Sender.java @@ -204,6 +204,10 @@ class Sender { writeHandler.handleWrite(d.getBuffer()); } + void waitForWritesToComplete() throws InterruptedException { + writeHandler.waitForWritesToComplete(); + } + private static class Outstanding { private final Data data; diff --git a/briar-core/src/net/sf/briar/plugins/modem/SenderOutputStream.java b/briar-core/src/net/sf/briar/plugins/modem/SenderOutputStream.java index 41bade80fe..531fa0417c 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/SenderOutputStream.java +++ b/briar-core/src/net/sf/briar/plugins/modem/SenderOutputStream.java @@ -23,6 +23,12 @@ class SenderOutputStream extends OutputStream { @Override public void flush() throws IOException { if(offset > Data.HEADER_LENGTH) send(false); + try { + sender.waitForWritesToComplete(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while flushing output stream"); + } } @Override diff --git a/briar-core/src/net/sf/briar/plugins/modem/SlipEncoder.java b/briar-core/src/net/sf/briar/plugins/modem/SlipEncoder.java index 0fc327a43f..e106d66dec 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/SlipEncoder.java +++ b/briar-core/src/net/sf/briar/plugins/modem/SlipEncoder.java @@ -35,4 +35,8 @@ class SlipEncoder implements WriteHandler { encoded[encodedLength - 1] = END; writeHandler.handleWrite(encoded); } + + public void waitForWritesToComplete() throws InterruptedException { + writeHandler.waitForWritesToComplete(); + } } diff --git a/briar-core/src/net/sf/briar/plugins/modem/WriteHandler.java b/briar-core/src/net/sf/briar/plugins/modem/WriteHandler.java index fdf4d2c153..f4780ffac1 100644 --- a/briar-core/src/net/sf/briar/plugins/modem/WriteHandler.java +++ b/briar-core/src/net/sf/briar/plugins/modem/WriteHandler.java @@ -5,4 +5,6 @@ import java.io.IOException; interface WriteHandler { void handleWrite(byte[] b) throws IOException; + + void waitForWritesToComplete() throws InterruptedException; } -- GitLab