Commit c615dd1e authored by akwizgran's avatar akwizgran
Browse files

Merge branch 'bluetooth-race' into 'master'

Fix race condition when closing redundant Bluetooth sockets

The Bluetooth invitation code has a race condition: if Alice and Bob connect to each other at roughly the same time, they each consider their outgoing socket to be redundant and close it, resulting in both sockets being closed. This can be triggered pretty reliably by using two phones of the same model and pressing 'Continue' at the same time on both phones.

When more than one invitation socket is opened, Alice should pick which one to use and Bob should use whichever one Alice picks, which Bob can detect by trying to read from both sockets.

Hopefully the Bluetooth invitation code will be retired when #117 is merged, but I'm putting this up for review in case we need to keep Bluetooth as a fallback method.

See merge request !120
parents c8fed348 fe9c3adc
......@@ -19,11 +19,10 @@ import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.properties.TransportProperties;
import org.briarproject.api.system.Clock;
import org.briarproject.util.LatchedReference;
import org.briarproject.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
......@@ -33,9 +32,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import static android.bluetooth.BluetoothAdapter.ACTION_SCAN_MODE_CHANGED;
......@@ -69,7 +71,6 @@ class DroidtoothPlugin implements DuplexPlugin {
private final AndroidExecutor androidExecutor;
private final Context appContext;
private final SecureRandom secureRandom;
private final Clock clock;
private final Backoff backoff;
private final DuplexPluginCallback callback;
private final int maxLatency;
......@@ -83,13 +84,12 @@ class DroidtoothPlugin implements DuplexPlugin {
private volatile BluetoothAdapter adapter = null;
DroidtoothPlugin(Executor ioExecutor, AndroidExecutor androidExecutor,
Context appContext, SecureRandom secureRandom, Clock clock,
Backoff backoff, DuplexPluginCallback callback, int maxLatency) {
Context appContext, SecureRandom secureRandom, Backoff backoff,
DuplexPluginCallback callback, int maxLatency) {
this.ioExecutor = ioExecutor;
this.androidExecutor = androidExecutor;
this.appContext = appContext;
this.secureRandom = secureRandom;
this.clock = clock;
this.backoff = backoff;
this.callback = callback;
this.maxLatency = maxLatency;
......@@ -339,7 +339,7 @@ class DroidtoothPlugin implements DuplexPlugin {
}
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout) {
long timeout, boolean alice) {
if (!isRunning()) return null;
// Use the invitation codes to generate the UUID
byte[] b = r.nextBytes(UUID_BYTES);
......@@ -353,23 +353,68 @@ class DroidtoothPlugin implements DuplexPlugin {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
}
// Start the background threads
LatchedReference<BluetoothSocket> socketLatch =
new LatchedReference<BluetoothSocket>();
new DiscoveryThread(socketLatch, uuid.toString(), timeout).start();
new BluetoothListenerThread(socketLatch, ss).start();
// Wait for an incoming or outgoing connection
// Create the background tasks
CompletionService<BluetoothSocket> complete =
new ExecutorCompletionService<BluetoothSocket>(ioExecutor);
List<Future<BluetoothSocket>> futures =
new ArrayList<Future<BluetoothSocket>>();
if (alice) {
// Return the first connected socket
futures.add(complete.submit(new ListeningTask(ss)));
futures.add(complete.submit(new DiscoveryTask(uuid.toString())));
} else {
// Return the first socket with readable data
futures.add(complete.submit(new ReadableTask(
new ListeningTask(ss))));
futures.add(complete.submit(new ReadableTask(
new DiscoveryTask(uuid.toString()))));
}
BluetoothSocket chosen = null;
try {
BluetoothSocket s = socketLatch.waitForReference(timeout);
if (s != null) return new DroidtoothTransportConnection(this, s);
Future<BluetoothSocket> f = complete.poll(timeout, MILLISECONDS);
if (f == null) return null; // No task completed within the timeout
chosen = f.get();
return new DroidtoothTransportConnection(this, chosen);
} catch (InterruptedException e) {
LOG.warning("Interrupted while exchanging invitations");
LOG.info("Interrupted while exchanging invitations");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
} finally {
// Closing the socket will terminate the listener thread
// Closing the socket will terminate the listener task
tryToClose(ss);
closeSockets(futures, chosen);
}
return null;
}
private void closeSockets(final List<Future<BluetoothSocket>> futures,
final BluetoothSocket chosen) {
ioExecutor.execute(new Runnable() {
public void run() {
for (Future<BluetoothSocket> f : futures) {
try {
if (f.cancel(true)) {
LOG.info("Cancelled task");
} else {
BluetoothSocket s = f.get();
if (s != null && s != chosen) {
LOG.info("Closing unwanted socket");
s.close();
}
}
} catch (InterruptedException e) {
LOG.info("Interrupted while closing sockets");
return;
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}
});
}
private class BluetoothStateReceiver extends BroadcastReceiver {
......@@ -395,61 +440,37 @@ class DroidtoothPlugin implements DuplexPlugin {
}
}
private class DiscoveryThread extends Thread {
private class DiscoveryTask implements Callable<BluetoothSocket> {
private final LatchedReference<BluetoothSocket> socketLatch;
private final String uuid;
private final long timeout;
private DiscoveryThread(LatchedReference<BluetoothSocket> socketLatch,
String uuid, long timeout) {
this.socketLatch = socketLatch;
private DiscoveryTask(String uuid) {
this.uuid = uuid;
this.timeout = timeout;
}
@Override
public void run() {
long end = clock.currentTimeMillis() + timeout;
while (!finished(end)) {
public BluetoothSocket call() throws Exception {
// Repeat discovery until we connect or get interrupted
while (true) {
// Discover nearby devices
LOG.info("Discovering nearby devices");
List<String> addresses;
try {
long now = clock.currentTimeMillis();
addresses = discoverDevices(end - now);
} catch (InterruptedException e) {
LOG.warning("Interrupted while discovering devices");
Thread.currentThread().interrupt();
return;
}
List<String> addresses = discoverDevices();
if (addresses.isEmpty()) {
LOG.info("No devices discovered");
continue;
}
// Connect to any device with the right UUID
for (String address : addresses) {
if (finished(end)) return;
BluetoothSocket s = connect(address, uuid);
if (s != null) {
LOG.info("Outgoing connection");
if (!socketLatch.set(s)) {
LOG.info("Closing redundant connection");
tryToClose(s);
}
return;
return s;
}
}
}
}
private boolean finished(long end) {
long now = clock.currentTimeMillis();
return now >= end || !isRunning() || socketLatch.isSet();
}
private List<String> discoverDevices(long timeout)
throws InterruptedException {
private List<String> discoverDevices() throws InterruptedException {
IntentFilter filter = new IntentFilter();
filter.addAction(FOUND);
filter.addAction(DISCOVERY_FINISHED);
......@@ -457,7 +478,7 @@ class DroidtoothPlugin implements DuplexPlugin {
appContext.registerReceiver(disco, filter);
LOG.info("Starting discovery");
adapter.startDiscovery();
return disco.waitForAddresses(timeout);
return disco.waitForAddresses();
}
}
......@@ -481,38 +502,47 @@ class DroidtoothPlugin implements DuplexPlugin {
}
}
private List<String> waitForAddresses(long timeout)
throws InterruptedException {
finished.await(timeout, MILLISECONDS);
private List<String> waitForAddresses() throws InterruptedException {
finished.await();
Collections.shuffle(addresses);
return Collections.unmodifiableList(addresses);
}
}
private static class BluetoothListenerThread extends Thread {
private static class ListeningTask implements Callable<BluetoothSocket> {
private final LatchedReference<BluetoothSocket> socketLatch;
private final BluetoothServerSocket serverSocket;
private BluetoothListenerThread(
LatchedReference<BluetoothSocket> socketLatch,
BluetoothServerSocket serverSocket) {
this.socketLatch = socketLatch;
private ListeningTask(BluetoothServerSocket serverSocket) {
this.serverSocket = serverSocket;
}
@Override
public void run() {
try {
BluetoothSocket s = serverSocket.accept();
LOG.info("Incoming connection");
if (!socketLatch.set(s)) {
LOG.info("Closing redundant connection");
s.close();
}
} catch (IOException e) {
// This is expected when the socket is closed
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
public BluetoothSocket call() throws IOException {
BluetoothSocket s = serverSocket.accept();
LOG.info("Incoming connection");
return s;
}
}
private static class ReadableTask implements Callable<BluetoothSocket> {
private final Callable<BluetoothSocket> connectionTask;
private ReadableTask(Callable<BluetoothSocket> connectionTask) {
this.connectionTask = connectionTask;
}
@Override
public BluetoothSocket call() throws Exception {
BluetoothSocket s = connectionTask.call();
InputStream in = s.getInputStream();
while (in.available() == 0) {
LOG.info("Waiting for data");
Thread.sleep(1000);
}
LOG.info("Data available");
return s;
}
}
}
......@@ -9,8 +9,6 @@ import org.briarproject.api.plugins.BackoffFactory;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexPluginFactory;
import org.briarproject.api.system.Clock;
import org.briarproject.system.SystemClock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
......@@ -27,7 +25,6 @@ public class DroidtoothPluginFactory implements DuplexPluginFactory {
private final Context appContext;
private final SecureRandom secureRandom;
private final BackoffFactory backoffFactory;
private final Clock clock;
public DroidtoothPluginFactory(Executor ioExecutor,
AndroidExecutor androidExecutor, Context appContext,
......@@ -37,7 +34,6 @@ public class DroidtoothPluginFactory implements DuplexPluginFactory {
this.appContext = appContext;
this.secureRandom = secureRandom;
this.backoffFactory = backoffFactory;
clock = new SystemClock();
}
public TransportId getId() {
......@@ -48,6 +44,6 @@ public class DroidtoothPluginFactory implements DuplexPluginFactory {
Backoff backoff = backoffFactory.createBackoff(MIN_POLLING_INTERVAL,
MAX_POLLING_INTERVAL, BACKOFF_BASE);
return new DroidtoothPlugin(ioExecutor, androidExecutor, appContext,
secureRandom, clock, backoff, callback, MAX_LATENCY);
secureRandom, backoff, callback, MAX_LATENCY);
}
}
......@@ -565,7 +565,7 @@ class TorPlugin implements DuplexPlugin, EventHandler,
}
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout) {
long timeout, boolean alice) {
throw new UnsupportedOperationException();
}
......
......@@ -23,5 +23,5 @@ public interface DuplexPlugin extends Plugin {
* time.
*/
DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout);
long timeout, boolean alice);
}
......@@ -51,7 +51,7 @@ class AliceConnector extends Connector {
@Override
public void run() {
// Create an incoming or outgoing connection
DuplexTransportConnection conn = createInvitationConnection();
DuplexTransportConnection conn = createInvitationConnection(true);
if (conn == null) return;
if (LOG.isLoggable(INFO)) LOG.info(pluginName + " connected");
// Don't proceed with more than one connection
......
......@@ -51,7 +51,7 @@ class BobConnector extends Connector {
@Override
public void run() {
// Create an incoming or outgoing connection
DuplexTransportConnection conn = createInvitationConnection();
DuplexTransportConnection conn = createInvitationConnection(false);
if (conn == null) return;
if (LOG.isLoggable(INFO)) LOG.info(pluginName + " connected");
// Carry out the key agreement protocol
......
......@@ -93,10 +93,12 @@ abstract class Connector extends Thread {
messageDigest = crypto.getMessageDigest();
}
protected DuplexTransportConnection createInvitationConnection() {
protected DuplexTransportConnection createInvitationConnection(
boolean alice) {
if (LOG.isLoggable(INFO))
LOG.info(pluginName + " creating invitation connection");
return plugin.createInvitationConnection(random, CONNECTION_TIMEOUT);
return plugin.createInvitationConnection(random, CONNECTION_TIMEOUT,
alice);
}
protected void sendPublicKeyHash(BdfWriter w) throws IOException {
......
......@@ -246,7 +246,7 @@ abstract class TcpPlugin implements DuplexPlugin {
}
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout) {
long timeout, boolean alice) {
throw new UnsupportedOperationException();
}
......
package org.briarproject.util;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public class LatchedReference<T> {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<T> reference = new AtomicReference<T>();
public boolean isSet() {
return reference.get() != null;
}
public boolean set(T t) {
if (t == null) throw new IllegalArgumentException();
if (reference.compareAndSet(null, t)) {
latch.countDown();
return true;
}
return false;
}
public T waitForReference(long timeout) throws InterruptedException {
latch.await(timeout, MILLISECONDS);
return reference.get();
}
}
......@@ -8,18 +8,24 @@ import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexPluginCallback;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.properties.TransportProperties;
import org.briarproject.api.system.Clock;
import org.briarproject.util.LatchedReference;
import org.briarproject.util.OsUtils;
import org.briarproject.util.StringUtils;
import java.io.IOException;
import java.io.InputStream;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;
......@@ -30,6 +36,7 @@ import javax.microedition.io.Connector;
import javax.microedition.io.StreamConnection;
import javax.microedition.io.StreamConnectionNotifier;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static javax.bluetooth.DiscoveryAgent.GIAC;
......@@ -45,7 +52,6 @@ class BluetoothPlugin implements DuplexPlugin {
private final Executor ioExecutor;
private final SecureRandom secureRandom;
private final Clock clock;
private final Backoff backoff;
private final DuplexPluginCallback callback;
private final int maxLatency;
......@@ -55,11 +61,10 @@ class BluetoothPlugin implements DuplexPlugin {
private volatile StreamConnectionNotifier socket = null;
private volatile LocalDevice localDevice = null;
BluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom, Clock clock,
BluetoothPlugin(Executor ioExecutor, SecureRandom secureRandom,
Backoff backoff, DuplexPluginCallback callback, int maxLatency) {
this.ioExecutor = ioExecutor;
this.secureRandom = secureRandom;
this.clock = clock;
this.backoff = backoff;
this.callback = callback;
this.maxLatency = maxLatency;
......@@ -246,7 +251,7 @@ class BluetoothPlugin implements DuplexPlugin {
}
public DuplexTransportConnection createInvitationConnection(PseudoRandom r,
long timeout) {
long timeout, boolean alice) {
if (!running) return null;
// Use the invitation codes to generate the UUID
byte[] b = r.nextBytes(UUID_BYTES);
......@@ -266,23 +271,68 @@ class BluetoothPlugin implements DuplexPlugin {
tryToClose(ss);
return null;
}
// Start the background threads
LatchedReference<StreamConnection> socketLatch =
new LatchedReference<StreamConnection>();
new DiscoveryThread(socketLatch, uuid, timeout).start();
new BluetoothListenerThread(socketLatch, ss).start();
// Wait for an incoming or outgoing connection
// Create the background tasks
CompletionService<StreamConnection> complete =
new ExecutorCompletionService<StreamConnection>(ioExecutor);
List<Future<StreamConnection>> futures =
new ArrayList<Future<StreamConnection>>();
if (alice) {
// Return the first connected socket
futures.add(complete.submit(new ListeningTask(ss)));
futures.add(complete.submit(new DiscoveryTask(uuid)));
} else {
// Return the first socket with readable data
futures.add(complete.submit(new ReadableTask(
new ListeningTask(ss))));
futures.add(complete.submit(new ReadableTask(
new DiscoveryTask(uuid))));
}
StreamConnection chosen = null;
try {
StreamConnection s = socketLatch.waitForReference(timeout);
if (s != null) return new BluetoothTransportConnection(this, s);
Future<StreamConnection> f = complete.poll(timeout, MILLISECONDS);
if (f == null) return null; // No task completed within the timeout
chosen = f.get();
return new BluetoothTransportConnection(this, chosen);
} catch (InterruptedException e) {
LOG.warning("Interrupted while exchanging invitations");
LOG.info("Interrupted while exchanging invitations");
Thread.currentThread().interrupt();
return null;
} catch (ExecutionException e) {
if (LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e);
return null;
} finally {
// Closing the socket will terminate the listener thread
// Closing the socket will terminate the listener task
tryToClose(ss);
closeSockets(futures, chosen);
}
return null;
}
private void closeSockets(final List<Future<StreamConnection>> futures,
final StreamConnection chosen) {
ioExecutor.execute(new Runnable() {
public void run() {
for (Future<StreamConnection> f : futures) {
try {
if (f.cancel(true)) {
LOG.info("Cancelled task");
} else {
StreamConnection s = f.get();
if (s != null && s != chosen) {
LOG.info("Closing unwanted socket");
s.close();
}
}
} catch (InterruptedException e) {
LOG.info("Interrupted while closing sockets");
return;
} catch (ExecutionException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
} catch (IOException e) {
if (LOG.isLoggable(INFO)) LOG.info(e.toString());
}
}
}
});
}
private void makeDeviceDiscoverable() {
......@@ -294,93 +344,74 @@ class BluetoothPlugin implements DuplexPlugin {
}
}
private class DiscoveryThread extends Thread {
private class DiscoveryTask implements Callable<StreamConnection> {
private final LatchedReference<StreamConnection> socketLatch;
private final String uuid;
private final long timeout;
private DiscoveryThread(LatchedReference<StreamConnection> socketLatch,
String uuid, long timeout) {
this.socketLatch = socketLatch;
private DiscoveryTask(String uuid) {
this.uuid = uuid;
this.timeout = timeout;
}
@Override
public void run() {
public StreamConnection call() throws Exception {
// Repeat discovery until we connect or get interrupted