diff --git a/briar-core/src/org/briarproject/plugins/PluginsModule.java b/briar-core/src/org/briarproject/plugins/PluginsModule.java index 2610b3a932ffa2ee5bd879c7342e84e200ee234c..fc58bc40457002f6d1d9b21fa36d002d360cc96a 100644 --- a/briar-core/src/org/briarproject/plugins/PluginsModule.java +++ b/briar-core/src/org/briarproject/plugins/PluginsModule.java @@ -7,6 +7,7 @@ import org.briarproject.api.plugins.BackoffFactory; import org.briarproject.api.plugins.ConnectionManager; import org.briarproject.api.plugins.ConnectionRegistry; import org.briarproject.api.plugins.PluginManager; +import org.briarproject.api.system.Clock; import java.security.SecureRandom; import java.util.concurrent.Executor; @@ -39,9 +40,9 @@ public class PluginsModule { ScheduledExecutorService scheduler, ConnectionManager connectionManager, ConnectionRegistry connectionRegistry, PluginManager pluginManager, - SecureRandom random, EventBus eventBus) { + SecureRandom random, Clock clock, EventBus eventBus) { Poller poller = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, random); + connectionRegistry, pluginManager, random, clock); eventBus.addListener(poller); return poller; } diff --git a/briar-core/src/org/briarproject/plugins/Poller.java b/briar-core/src/org/briarproject/plugins/Poller.java index 557e7ad09702ee70fdbc3351a85d85dd0ac55e78..57f19a3544c8132408baf33896bca31e7670bbc7 100644 --- a/briar-core/src/org/briarproject/plugins/Poller.java +++ b/briar-core/src/org/briarproject/plugins/Poller.java @@ -3,6 +3,7 @@ package org.briarproject.plugins; import org.briarproject.api.TransportId; import org.briarproject.api.contact.ContactId; import org.briarproject.api.event.ConnectionClosedEvent; +import org.briarproject.api.event.ConnectionOpenedEvent; import org.briarproject.api.event.ContactStatusChangedEvent; import org.briarproject.api.event.Event; import org.briarproject.api.event.EventListener; @@ -16,12 +17,15 @@ import org.briarproject.api.plugins.TransportConnectionWriter; import org.briarproject.api.plugins.duplex.DuplexPlugin; import org.briarproject.api.plugins.duplex.DuplexTransportConnection; import org.briarproject.api.plugins.simplex.SimplexPlugin; +import org.briarproject.api.system.Clock; import java.security.SecureRandom; +import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; import javax.inject.Inject; @@ -39,20 +43,24 @@ class Poller implements EventListener { private final ConnectionRegistry connectionRegistry; private final PluginManager pluginManager; private final SecureRandom random; - private final Map<TransportId, PollTask> tasks; + private final Clock clock; + private final Lock lock; + private final Map<TransportId, PollTask> tasks; // Locking: lock @Inject Poller(@IoExecutor Executor ioExecutor, ScheduledExecutorService scheduler, ConnectionManager connectionManager, ConnectionRegistry connectionRegistry, PluginManager pluginManager, - SecureRandom random) { + SecureRandom random, Clock clock) { this.ioExecutor = ioExecutor; this.connectionManager = connectionManager; this.connectionRegistry = connectionRegistry; this.pluginManager = pluginManager; this.random = random; this.scheduler = scheduler; - tasks = new ConcurrentHashMap<TransportId, PollTask>(); + this.clock = clock; + lock = new ReentrantLock(); + tasks = new HashMap<TransportId, PollTask>(); } @Override @@ -65,12 +73,19 @@ class Poller implements EventListener { } } else if (e instanceof ConnectionClosedEvent) { ConnectionClosedEvent c = (ConnectionClosedEvent) e; + // Reschedule polling, the polling interval may have decreased + reschedule(c.getTransportId()); if (!c.isIncoming()) { // Connect to the disconnected contact connectToContact(c.getContactId(), c.getTransportId()); } + } else if (e instanceof ConnectionOpenedEvent) { + ConnectionOpenedEvent c = (ConnectionOpenedEvent) e; + // Reschedule polling, the polling interval may have decreased + reschedule(c.getTransportId()); } else if (e instanceof TransportEnabledEvent) { TransportEnabledEvent t = (TransportEnabledEvent) e; + // Poll the newly enabled transport pollNow(t.getTransportId()); } } @@ -118,18 +133,32 @@ class Poller implements EventListener { }); } + private void reschedule(TransportId t) { + Plugin p = pluginManager.getPlugin(t); + if (p.shouldPoll()) schedule(p, p.getPollingInterval(), false); + } + private void pollNow(TransportId t) { Plugin p = pluginManager.getPlugin(t); // Randomise next polling interval if (p.shouldPoll()) schedule(p, 0, true); } - private void schedule(Plugin p, int interval, boolean randomiseNext) { - // Replace any previously scheduled task for this plugin - PollTask task = new PollTask(p, randomiseNext); - PollTask replaced = tasks.put(p.getId(), task); - if (replaced != null) replaced.cancel(); - scheduler.schedule(task, interval, MILLISECONDS); + private void schedule(Plugin p, int delay, boolean randomiseNext) { + // Replace any later scheduled task for this plugin + long due = clock.currentTimeMillis() + delay; + lock.lock(); + try { + TransportId t = p.getId(); + PollTask scheduled = tasks.get(t); + if (scheduled == null || due < scheduled.due) { + PollTask task = new PollTask(p, due, randomiseNext); + tasks.put(t, task); + scheduler.schedule(task, delay, MILLISECONDS); + } + } finally { + lock.unlock(); + } } private void poll(final Plugin p) { @@ -146,27 +175,28 @@ class Poller implements EventListener { private class PollTask implements Runnable { private final Plugin plugin; + private final long due; private final boolean randomiseNext; - private volatile boolean cancelled = false; - - private PollTask(Plugin plugin, boolean randomiseNext) { + private PollTask(Plugin plugin, long due, boolean randomiseNext) { this.plugin = plugin; + this.due = due; this.randomiseNext = randomiseNext; } - private void cancel() { - cancelled = true; - } - @Override public void run() { - if (cancelled) return; - tasks.remove(plugin.getId()); - int interval = plugin.getPollingInterval(); - if (randomiseNext) - interval = (int) (interval * random.nextDouble()); - schedule(plugin, interval, false); + lock.lock(); + try { + TransportId t = plugin.getId(); + if (tasks.get(t) != this) return; // Replaced by another task + tasks.remove(t); + } finally { + lock.unlock(); + } + int delay = plugin.getPollingInterval(); + if (randomiseNext) delay = (int) (delay * random.nextDouble()); + schedule(plugin, delay, false); poll(plugin); } } diff --git a/briar-tests/src/org/briarproject/plugins/PollerTest.java b/briar-tests/src/org/briarproject/plugins/PollerTest.java index 69af841d5048a1448e43ddda61c1ca64bd06af62..5531177d1b95c7e0f5ef7db678b3c0d295e611b9 100644 --- a/briar-tests/src/org/briarproject/plugins/PollerTest.java +++ b/briar-tests/src/org/briarproject/plugins/PollerTest.java @@ -6,6 +6,7 @@ import org.briarproject.RunAction; import org.briarproject.api.TransportId; import org.briarproject.api.contact.ContactId; import org.briarproject.api.event.ConnectionClosedEvent; +import org.briarproject.api.event.ConnectionOpenedEvent; import org.briarproject.api.event.ContactStatusChangedEvent; import org.briarproject.api.event.TransportEnabledEvent; import org.briarproject.api.plugins.ConnectionManager; @@ -16,6 +17,7 @@ import org.briarproject.api.plugins.TransportConnectionWriter; import org.briarproject.api.plugins.duplex.DuplexPlugin; import org.briarproject.api.plugins.duplex.DuplexTransportConnection; import org.briarproject.api.plugins.simplex.SimplexPlugin; +import org.briarproject.api.system.Clock; import org.jmock.Expectations; import org.jmock.Mockery; import org.jmock.lib.legacy.ClassImposteriser; @@ -33,9 +35,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; public class PollerTest extends BriarTestCase { private final ContactId contactId = new ContactId(234); + private final int pollingInterval = 60 * 1000; + private final long now = System.currentTimeMillis(); @Test - public void testConnectToNewContact() throws Exception { + public void testConnectOnContactStatusChanged() throws Exception { Mockery context = new Mockery(); context.setImposteriser(ClassImposteriser.INSTANCE); final Executor ioExecutor = new ImmediateExecutor(); @@ -47,6 +51,7 @@ public class PollerTest extends BriarTestCase { context.mock(ConnectionRegistry.class); final PluginManager pluginManager = context.mock(PluginManager.class); final SecureRandom random = context.mock(SecureRandom.class); + final Clock clock = context.mock(Clock.class); // Two simplex plugins: one supports polling, the other doesn't final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class); @@ -112,7 +117,7 @@ public class PollerTest extends BriarTestCase { }}); Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, random); + connectionRegistry, pluginManager, random, clock); p.eventOccurred(new ContactStatusChangedEvent(contactId, true)); @@ -120,7 +125,8 @@ public class PollerTest extends BriarTestCase { } @Test - public void testReconnectToDisconnectedContact() throws Exception { + public void testRescheduleAndReconnectOnConnectionClosed() + throws Exception { Mockery context = new Mockery(); context.setImposteriser(ClassImposteriser.INSTANCE); final Executor ioExecutor = new ImmediateExecutor(); @@ -132,6 +138,7 @@ public class PollerTest extends BriarTestCase { context.mock(ConnectionRegistry.class); final PluginManager pluginManager = context.mock(PluginManager.class); final SecureRandom random = context.mock(SecureRandom.class); + final Clock clock = context.mock(Clock.class); final DuplexPlugin plugin = context.mock(DuplexPlugin.class); final TransportId transportId = new TransportId("id"); @@ -139,15 +146,30 @@ public class PollerTest extends BriarTestCase { context.mock(DuplexTransportConnection.class); context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + // reschedule() + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); // Get the plugin oneOf(pluginManager).getPlugin(transportId); will(returnValue(plugin)); // The plugin supports polling oneOf(plugin).shouldPoll(); will(returnValue(true)); + // Schedule the next poll + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval), with(MILLISECONDS)); + // connectToContact() // Check whether the contact is already connected - oneOf(plugin).getId(); - will(returnValue(transportId)); oneOf(connectionRegistry).isConnected(contactId, transportId); will(returnValue(false)); // Connect to the contact @@ -159,7 +181,7 @@ public class PollerTest extends BriarTestCase { }}); Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, random); + connectionRegistry, pluginManager, random, clock); p.eventOccurred(new ConnectionClosedEvent(contactId, transportId, false)); @@ -167,8 +189,114 @@ public class PollerTest extends BriarTestCase { context.assertIsSatisfied(); } + + @Test + public void testRescheduleOnConnectionOpened() throws Exception { + Mockery context = new Mockery(); + context.setImposteriser(ClassImposteriser.INSTANCE); + final Executor ioExecutor = new ImmediateExecutor(); + final ScheduledExecutorService scheduler = + context.mock(ScheduledExecutorService.class); + final ConnectionManager connectionManager = + context.mock(ConnectionManager.class); + final ConnectionRegistry connectionRegistry = + context.mock(ConnectionRegistry.class); + final PluginManager pluginManager = context.mock(PluginManager.class); + final SecureRandom random = context.mock(SecureRandom.class); + final Clock clock = context.mock(Clock.class); + + final DuplexPlugin plugin = context.mock(DuplexPlugin.class); + final TransportId transportId = new TransportId("id"); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Schedule the next poll + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval), with(MILLISECONDS)); + }}); + + Poller p = new Poller(ioExecutor, scheduler, connectionManager, + connectionRegistry, pluginManager, random, clock); + + p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + false)); + + context.assertIsSatisfied(); + } + + @Test + public void testRescheduleDoesNotReplaceEarlierTask() throws Exception { + Mockery context = new Mockery(); + context.setImposteriser(ClassImposteriser.INSTANCE); + final Executor ioExecutor = new ImmediateExecutor(); + final ScheduledExecutorService scheduler = + context.mock(ScheduledExecutorService.class); + final ConnectionManager connectionManager = + context.mock(ConnectionManager.class); + final ConnectionRegistry connectionRegistry = + context.mock(ConnectionRegistry.class); + final PluginManager pluginManager = context.mock(PluginManager.class); + final SecureRandom random = context.mock(SecureRandom.class); + final Clock clock = context.mock(Clock.class); + + final DuplexPlugin plugin = context.mock(DuplexPlugin.class); + final TransportId transportId = new TransportId("id"); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + // First event + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Schedule the next poll + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval), with(MILLISECONDS)); + // Second event + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Don't replace the previously scheduled task, due earlier + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now + 1)); + }}); + + Poller p = new Poller(ioExecutor, scheduler, connectionManager, + connectionRegistry, pluginManager, random, clock); + + p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + false)); + p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + false)); + + context.assertIsSatisfied(); + } + @Test - public void testPollWhenTransportIsEnabled() throws Exception { + public void testPollOnTransportEnabled() throws Exception { Mockery context = new Mockery(); context.setImposteriser(ClassImposteriser.INSTANCE); final Executor ioExecutor = new ImmediateExecutor(); @@ -180,10 +308,10 @@ public class PollerTest extends BriarTestCase { context.mock(ConnectionRegistry.class); final PluginManager pluginManager = context.mock(PluginManager.class); final SecureRandom random = context.mock(SecureRandom.class); + final Clock clock = context.mock(Clock.class); final Plugin plugin = context.mock(Plugin.class); final TransportId transportId = new TransportId("id"); - final int pollingInterval = 60 * 1000; final List<ContactId> connected = Collections.singletonList(contactId); context.checking(new Expectations() {{ @@ -196,14 +324,18 @@ public class PollerTest extends BriarTestCase { oneOf(plugin).shouldPoll(); will(returnValue(true)); // Schedule a polling task immediately + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L), with(MILLISECONDS)); will(new RunAction()); - // Run the polling task + // Running the polling task schedules the next polling task oneOf(plugin).getPollingInterval(); will(returnValue(pollingInterval)); oneOf(random).nextDouble(); will(returnValue(0.5)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with((long) (pollingInterval * 0.5)), with(MILLISECONDS)); // Poll the plugin @@ -213,7 +345,7 @@ public class PollerTest extends BriarTestCase { }}); Poller p = new Poller(ioExecutor, scheduler, connectionManager, - connectionRegistry, pluginManager, random); + connectionRegistry, pluginManager, random, clock); p.eventOccurred(new TransportEnabledEvent(transportId));