diff --git a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java index 4b2d7d80e7cdc6f7e95a77be4f3320177ab7468d..4614b6b54820a8fb4619f7e4d259faaa1a84a890 100644 --- a/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java +++ b/components/net/sf/briar/plugins/bluetooth/BluetoothPlugin.java @@ -126,7 +126,11 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } socket = scn; } - startContactAccepterThread(); + pluginExecutor.execute(new Runnable() { + public void run() { + acceptContactConnections(); + } + }); } private synchronized String getUuid() { @@ -160,15 +164,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } - private void startContactAccepterThread() { - new Thread() { - @Override - public void run() { - acceptContactConnections(); - } - }.start(); - } - private void acceptContactConnections() { while(true) { StreamConnectionNotifier scn; @@ -299,9 +294,17 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { // The invitee's device may not be discoverable, so both parties must // try to initiate connections String uuid = convertInvitationCodeToUuid(code); - ConnectionCallback c = new ConnectionCallback(uuid, timeout); - startOutgoingInvitationThread(c); - startIncomingInvitationThread(c); + final ConnectionCallback c = new ConnectionCallback(uuid, timeout); + pluginExecutor.execute(new Runnable() { + public void run() { + createInvitationConnection(c); + } + }); + pluginExecutor.execute(new Runnable() { + public void run() { + bindInvitationSocket(c); + } + }); try { StreamConnection s = c.waitForConnection(); return s == null ? null : new BluetoothTransportConnection(s); @@ -319,15 +322,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { return StringUtils.toHexString(b); } - private void startOutgoingInvitationThread(final ConnectionCallback c) { - new Thread() { - @Override - public void run() { - createInvitationConnection(c); - } - }.start(); - } - private void createInvitationConnection(ConnectionCallback c) { DiscoveryAgent discoveryAgent; synchronized(this) { @@ -369,30 +363,25 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } - private void startIncomingInvitationThread(final ConnectionCallback c) { - new Thread() { - @Override - public void run() { - bindInvitationSocket(c); - } - }.start(); - } - - private void bindInvitationSocket(ConnectionCallback c) { + private void bindInvitationSocket(final ConnectionCallback c) { synchronized(this) { if(!started) return; makeDeviceDiscoverable(); } // Bind the socket String url = "btspp://localhost:" + c.getUuid() + ";name=RFCOMM"; - StreamConnectionNotifier scn; + final StreamConnectionNotifier scn; try { scn = (StreamConnectionNotifier) Connector.open(url); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); return; } - startInvitationAccepterThread(c, scn); + pluginExecutor.execute(new Runnable() { + public void run() { + acceptInvitationConnection(c, scn); + } + }); // Close the socket when the invitation times out try { Thread.sleep(c.getTimeout()); @@ -408,16 +397,6 @@ class BluetoothPlugin extends AbstractPlugin implements StreamPlugin { } } - private void startInvitationAccepterThread(final ConnectionCallback c, - final StreamConnectionNotifier scn) { - new Thread() { - @Override - public void run() { - acceptInvitationConnection(c, scn); - } - }.start(); - } - private void acceptInvitationConnection(ConnectionCallback c, StreamConnectionNotifier scn) { synchronized(this) { diff --git a/components/net/sf/briar/plugins/file/PollingRemovableDriveMonitor.java b/components/net/sf/briar/plugins/file/PollingRemovableDriveMonitor.java index 05fd5a057f49b9a9e1b8b174ef8b537cd3bbb763..581010e88e5efddcc36b761296e9398dfc205e8e 100644 --- a/components/net/sf/briar/plugins/file/PollingRemovableDriveMonitor.java +++ b/components/net/sf/briar/plugins/file/PollingRemovableDriveMonitor.java @@ -3,14 +3,18 @@ package net.sf.briar.plugins.file; import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; +import net.sf.briar.api.plugins.PluginExecutor; + class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { private static final Logger LOG = Logger.getLogger(PollingRemovableDriveMonitor.class.getName()); + private final Executor pluginExecutor; private final RemovableDriveFinder finder; private final long pollingInterval; private final Object pollingLock = new Object(); @@ -19,8 +23,9 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { private volatile Callback callback = null; private volatile IOException exception = null; - public PollingRemovableDriveMonitor(RemovableDriveFinder finder, - long pollingInterval) { + public PollingRemovableDriveMonitor(@PluginExecutor Executor pluginExecutor, + RemovableDriveFinder finder, long pollingInterval) { + this.pluginExecutor = pluginExecutor; this.finder = finder; this.pollingInterval = pollingInterval; } @@ -29,7 +34,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { if(running) throw new IllegalStateException(); running = true; this.callback = callback; - new Thread(this).start(); + pluginExecutor.execute(this); } public synchronized void stop() throws IOException { @@ -50,14 +55,7 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { Collection<File> drives = finder.findRemovableDrives(); while(running) { synchronized(pollingLock) { - try { - pollingLock.wait(pollingInterval); - } catch(InterruptedException e) { - if(LOG.isLoggable(Level.INFO)) - LOG.info("Interrupted while waiting to poll"); - Thread.currentThread().interrupt(); - return; - } + pollingLock.wait(pollingInterval); } if(!running) return; Collection<File> newDrives = finder.findRemovableDrives(); @@ -66,6 +64,10 @@ class PollingRemovableDriveMonitor implements RemovableDriveMonitor, Runnable { } drives = newDrives; } + } catch(InterruptedException e) { + if(LOG.isLoggable(Level.INFO)) + LOG.info("Interrupted while waiting to poll"); + Thread.currentThread().interrupt(); } catch(IOException e) { exception = e; } diff --git a/components/net/sf/briar/plugins/file/RemovableDrivePluginFactory.java b/components/net/sf/briar/plugins/file/RemovableDrivePluginFactory.java index c8d906d689b433ca471bb6614dc16104bd7fec0c..81f719c8dcf5454d4aaf0e73b772b9ae2233ba84 100644 --- a/components/net/sf/briar/plugins/file/RemovableDrivePluginFactory.java +++ b/components/net/sf/briar/plugins/file/RemovableDrivePluginFactory.java @@ -25,11 +25,11 @@ public class RemovableDrivePluginFactory implements BatchPluginFactory { } else if(OsUtils.isMac()) { // JNotify requires OS X 10.5 or newer, so we have to poll finder = new MacRemovableDriveFinder(); - monitor = new PollingRemovableDriveMonitor(finder, + monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder, POLLING_INTERVAL); } else if(OsUtils.isWindows()) { finder = new WindowsRemovableDriveFinder(); - monitor = new PollingRemovableDriveMonitor(finder, + monitor = new PollingRemovableDriveMonitor(pluginExecutor, finder, POLLING_INTERVAL); } else { return null; diff --git a/components/net/sf/briar/plugins/socket/SocketPlugin.java b/components/net/sf/briar/plugins/socket/SocketPlugin.java index c5863ebfa2eea42f9591c93841c5db72b42420f9..e01ef0cf82e77855cb6521914bfeb36034ced3f2 100644 --- a/components/net/sf/briar/plugins/socket/SocketPlugin.java +++ b/components/net/sf/briar/plugins/socket/SocketPlugin.java @@ -42,15 +42,11 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { @Override public synchronized void start() throws IOException { super.start(); - pluginExecutor.execute(createBinder()); - } - - private Runnable createBinder() { - return new Runnable() { + pluginExecutor.execute(new Runnable() { public void run() { bind(); } - }; + }); } private void bind() { @@ -93,25 +89,11 @@ abstract class SocketPlugin extends AbstractPlugin implements StreamPlugin { socket = ss; setLocalSocketAddress(ss.getLocalSocketAddress()); } - startListenerThread(); - } - - private void startListenerThread() { - new Thread() { - @Override - public void run() { - listen(); - } - }.start(); - } - - private void listen() { + // Accept connections until the socket is closed while(true) { - ServerSocket ss; Socket s; synchronized(this) { if(!started) return; - ss = socket; } try { s = ss.accept(); diff --git a/test/net/sf/briar/plugins/PluginManagerImplTest.java b/test/net/sf/briar/plugins/PluginManagerImplTest.java index 276f16236a9019253c39538350a8d0dcf64c3822..47902f614e908bed30ea5086b511a9d184df34d9 100644 --- a/test/net/sf/briar/plugins/PluginManagerImplTest.java +++ b/test/net/sf/briar/plugins/PluginManagerImplTest.java @@ -1,6 +1,7 @@ package net.sf.briar.plugins; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.TestCase; @@ -37,7 +38,7 @@ public class PluginManagerImplTest extends TestCase { allowing(db).setLocalProperties(with(any(TransportId.class)), with(any(TransportProperties.class))); }}); - Executor executor = new ImmediateExecutor(); + Executor executor = Executors.newCachedThreadPool(); Poller poller = new PollerImpl(); PluginManagerImpl p = new PluginManagerImpl(db, executor, poller, dispatcher, uiCallback); diff --git a/test/net/sf/briar/plugins/file/PollingRemovableDriveMonitorTest.java b/test/net/sf/briar/plugins/file/PollingRemovableDriveMonitorTest.java index 1d64cbc5c9eb2c0a1bf348122ccc8f6522c91ec0..2d77dd4cc91f3739fd9df68a812bc303dde6b1e5 100644 --- a/test/net/sf/briar/plugins/file/PollingRemovableDriveMonitorTest.java +++ b/test/net/sf/briar/plugins/file/PollingRemovableDriveMonitorTest.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; @@ -46,8 +47,8 @@ public class PollingRemovableDriveMonitorTest extends TestCase { } }; // Create the monitor and start it - final RemovableDriveMonitor monitor = - new PollingRemovableDriveMonitor(finder, 10); + final RemovableDriveMonitor monitor = new PollingRemovableDriveMonitor( + Executors.newCachedThreadPool(), finder, 10); monitor.start(callback); // Wait for the monitor to detect the files assertTrue(latch.await(1, TimeUnit.SECONDS)); @@ -74,8 +75,8 @@ public class PollingRemovableDriveMonitorTest extends TestCase { will(throwException(new IOException())); }}); // Create the monitor, start it, and give it some time to run - final RemovableDriveMonitor monitor = - new PollingRemovableDriveMonitor(finder, 10); + final RemovableDriveMonitor monitor = new PollingRemovableDriveMonitor( + Executors.newCachedThreadPool(), finder, 10); monitor.start(null); Thread.sleep(50); // The monitor should rethrow the exception when it stops diff --git a/test/net/sf/briar/plugins/socket/SimpleSocketPluginTest.java b/test/net/sf/briar/plugins/socket/SimpleSocketPluginTest.java index 4bc4a0bdd124eb2ba04ec72d7e5e98e07f7677b4..6cad2cdf6d19963273d3b4b074050d0089cc92ef 100644 --- a/test/net/sf/briar/plugins/socket/SimpleSocketPluginTest.java +++ b/test/net/sf/briar/plugins/socket/SimpleSocketPluginTest.java @@ -7,6 +7,7 @@ import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -16,7 +17,6 @@ import net.sf.briar.api.TransportConfig; import net.sf.briar.api.TransportProperties; import net.sf.briar.api.plugins.StreamPluginCallback; import net.sf.briar.api.transport.StreamTransportConnection; -import net.sf.briar.plugins.ImmediateExecutor; import org.junit.Test; @@ -29,10 +29,11 @@ public class SimpleSocketPluginTest extends TestCase { StreamCallback callback = new StreamCallback(); callback.local.put("internal", "127.0.0.1"); callback.local.put("port", "0"); - SimpleSocketPlugin plugin = - new SimpleSocketPlugin(new ImmediateExecutor(), callback, 0L); + SimpleSocketPlugin plugin = new SimpleSocketPlugin( + Executors.newCachedThreadPool(), callback, 0L); plugin.start(); // The plugin should have bound a socket and stored the port number + callback.latch.await(1, TimeUnit.SECONDS); String host = callback.local.get("internal"); assertNotNull(host); assertEquals("127.0.0.1", host); @@ -62,8 +63,8 @@ public class SimpleSocketPluginTest extends TestCase { @Test public void testOutgoingConnection() throws Exception { StreamCallback callback = new StreamCallback(); - SimpleSocketPlugin plugin = - new SimpleSocketPlugin(new ImmediateExecutor(), callback, 0L); + SimpleSocketPlugin plugin = new SimpleSocketPlugin( + Executors.newCachedThreadPool(), callback, 0L); plugin.start(); // Listen on a local port final ServerSocket ss = new ServerSocket(); @@ -101,10 +102,12 @@ public class SimpleSocketPluginTest extends TestCase { private static class StreamCallback implements StreamPluginCallback { - private TransportConfig config = new TransportConfig(); - private TransportProperties local = new TransportProperties(); private final Map<ContactId, TransportProperties> remote = new HashMap<ContactId, TransportProperties>(); + private final CountDownLatch latch = new CountDownLatch(1); + + private TransportConfig config = new TransportConfig(); + private TransportProperties local = new TransportProperties(); private int incomingConnections = 0; @@ -125,6 +128,7 @@ public class SimpleSocketPluginTest extends TestCase { } public void setLocalProperties(TransportProperties p) { + latch.countDown(); local = p; }