diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java index 2405c0be83375ca5e815c853893033f08e4e4ca0..61a5c6137aa492c292b6c41d3256f1ce91ffbfcf 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/Poller.java @@ -16,6 +16,7 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; +import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.system.Clock; @@ -25,6 +26,7 @@ import java.security.SecureRandom; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -50,7 +52,7 @@ class Poller implements EventListener { private final SecureRandom random; private final Clock clock; private final Lock lock; - private final Map<TransportId, PollTask> tasks; // Locking: lock + private final Map<TransportId, ScheduledPollTask> tasks; // Locking: lock @Inject Poller(@IoExecutor Executor ioExecutor, @@ -93,6 +95,10 @@ class Poller implements EventListener { TransportEnabledEvent t = (TransportEnabledEvent) e; // Poll the newly enabled transport pollNow(t.getTransportId()); + } else if (e instanceof TransportDisabledEvent) { + TransportDisabledEvent t = (TransportDisabledEvent) e; + // Cancel polling for the disabled transport + cancel(t.getTransportId()); } } @@ -151,18 +157,31 @@ class Poller implements EventListener { TransportId t = p.getId(); lock.lock(); try { - PollTask scheduled = tasks.get(t); - if (scheduled == null || due < scheduled.due) { + ScheduledPollTask scheduled = tasks.get(t); + if (scheduled == null || due < scheduled.task.due) { + // If a later task exists, cancel it. If it's already started + // it will abort safely when it finds it's been replaced + if (scheduled != null) scheduled.future.cancel(false); PollTask task = new PollTask(p, due, randomiseNext); - tasks.put(t, task); - scheduler.schedule( + Future future = scheduler.schedule( () -> ioExecutor.execute(task), delay, MILLISECONDS); + tasks.put(t, new ScheduledPollTask(task, future)); } } finally { lock.unlock(); } } + private void cancel(TransportId t) { + lock.lock(); + try { + ScheduledPollTask scheduled = tasks.remove(t); + if (scheduled != null) scheduled.future.cancel(false); + } finally { + lock.unlock(); + } + } + @IoExecutor private void poll(Plugin p) { TransportId t = p.getId(); @@ -170,6 +189,17 @@ class Poller implements EventListener { p.poll(connectionRegistry.getConnectedContacts(t)); } + private class ScheduledPollTask { + + private final PollTask task; + private final Future future; + + private ScheduledPollTask(PollTask task, Future future) { + this.task = task; + this.future = future; + } + } + private class PollTask implements Runnable { private final Plugin plugin; @@ -188,7 +218,9 @@ class Poller implements EventListener { lock.lock(); try { TransportId t = plugin.getId(); - if (tasks.get(t) != this) return; // Replaced by another task + ScheduledPollTask scheduled = tasks.get(t); + if (scheduled != null && scheduled.task != this) + return; // Replaced by another task tasks.remove(t); } finally { lock.unlock(); diff --git a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java index 8d9a975a766f3e858f877cfc768c10a1615d95b1..c27c82a4ffef392a1d81b1bcd955a6431539b16e 100644 --- a/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java +++ b/bramble-core/src/test/java/org/briarproject/bramble/plugin/PollerTest.java @@ -12,14 +12,14 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin; import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection; import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent; import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent; +import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent; import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent; import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin; import org.briarproject.bramble.api.system.Clock; -import org.briarproject.bramble.test.BrambleTestCase; +import org.briarproject.bramble.test.BrambleMockTestCase; import org.briarproject.bramble.test.ImmediateExecutor; import org.briarproject.bramble.test.RunAction; import org.jmock.Expectations; -import org.jmock.Mockery; import org.jmock.lib.legacy.ClassImposteriser; import org.junit.Test; @@ -29,30 +29,37 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static java.util.concurrent.TimeUnit.MILLISECONDS; -public class PollerTest extends BrambleTestCase { - +public class PollerTest extends BrambleMockTestCase { + + private final ScheduledExecutorService scheduler = + context.mock(ScheduledExecutorService.class); + private final ConnectionManager connectionManager = + context.mock(ConnectionManager.class); + private final ConnectionRegistry connectionRegistry = + context.mock(ConnectionRegistry.class); + private final PluginManager pluginManager = + context.mock(PluginManager.class); + private final Clock clock = context.mock(Clock.class); + private final ScheduledFuture future = context.mock(ScheduledFuture.class); + private final SecureRandom random; + + private final Executor ioExecutor = new ImmediateExecutor(); + private final TransportId transportId = new TransportId("id"); private final ContactId contactId = new ContactId(234); private final int pollingInterval = 60 * 1000; private final long now = System.currentTimeMillis(); - @Test - public void testConnectOnContactStatusChanged() throws Exception { - Mockery context = new Mockery(); + public PollerTest() { context.setImposteriser(ClassImposteriser.INSTANCE); - Executor ioExecutor = new ImmediateExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - ConnectionManager connectionManager = - context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); - PluginManager pluginManager = context.mock(PluginManager.class); - SecureRandom random = context.mock(SecureRandom.class); - Clock clock = context.mock(Clock.class); + random = context.mock(SecureRandom.class); + } + @Test + public void testConnectOnContactStatusChanged() throws Exception { // Two simplex plugins: one supports polling, the other doesn't SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class); SimplexPlugin simplexPlugin1 = @@ -120,28 +127,12 @@ public class PollerTest extends BrambleTestCase { connectionRegistry, pluginManager, random, clock); p.eventOccurred(new ContactStatusChangedEvent(contactId, true)); - - context.assertIsSatisfied(); } @Test public void testRescheduleAndReconnectOnConnectionClosed() throws Exception { - Mockery context = new Mockery(); - context.setImposteriser(ClassImposteriser.INSTANCE); - Executor ioExecutor = new ImmediateExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - ConnectionManager connectionManager = - context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); - PluginManager pluginManager = context.mock(PluginManager.class); - SecureRandom random = context.mock(SecureRandom.class); - Clock clock = context.mock(Clock.class); - DuplexPlugin plugin = context.mock(DuplexPlugin.class); - TransportId transportId = new TransportId("id"); DuplexTransportConnection duplexConnection = context.mock(DuplexTransportConnection.class); @@ -168,6 +159,7 @@ public class PollerTest extends BrambleTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with((long) pollingInterval), with(MILLISECONDS)); + will(returnValue(future)); // connectToContact() // Check whether the contact is already connected oneOf(connectionRegistry).isConnected(contactId, transportId); @@ -185,28 +177,12 @@ public class PollerTest extends BrambleTestCase { p.eventOccurred(new ConnectionClosedEvent(contactId, transportId, false)); - - context.assertIsSatisfied(); } @Test public void testRescheduleOnConnectionOpened() throws Exception { - Mockery context = new Mockery(); - context.setImposteriser(ClassImposteriser.INSTANCE); - Executor ioExecutor = new ImmediateExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - ConnectionManager connectionManager = - context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); - PluginManager pluginManager = context.mock(PluginManager.class); - SecureRandom random = context.mock(SecureRandom.class); - Clock clock = context.mock(Clock.class); - - DuplexPlugin plugin = context.mock(DuplexPlugin.class); - TransportId transportId = new TransportId("id"); + Plugin plugin = context.mock(Plugin.class); context.checking(new Expectations() {{ allowing(plugin).getId(); @@ -224,6 +200,7 @@ public class PollerTest extends BrambleTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with((long) pollingInterval), with(MILLISECONDS)); + will(returnValue(future)); }}); Poller p = new Poller(ioExecutor, scheduler, connectionManager, @@ -231,27 +208,11 @@ public class PollerTest extends BrambleTestCase { p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); - - context.assertIsSatisfied(); } @Test public void testRescheduleDoesNotReplaceEarlierTask() throws Exception { - Mockery context = new Mockery(); - context.setImposteriser(ClassImposteriser.INSTANCE); - Executor ioExecutor = new ImmediateExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - ConnectionManager connectionManager = - context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); - PluginManager pluginManager = context.mock(PluginManager.class); - SecureRandom random = context.mock(SecureRandom.class); - Clock clock = context.mock(Clock.class); - - DuplexPlugin plugin = context.mock(DuplexPlugin.class); - TransportId transportId = new TransportId("id"); + Plugin plugin = context.mock(Plugin.class); context.checking(new Expectations() {{ allowing(plugin).getId(); @@ -270,6 +231,7 @@ public class PollerTest extends BrambleTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with((long) pollingInterval), with(MILLISECONDS)); + will(returnValue(future)); // Second event // Get the plugin oneOf(pluginManager).getPlugin(transportId); @@ -291,27 +253,59 @@ public class PollerTest extends BrambleTestCase { false)); p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, false)); - - context.assertIsSatisfied(); } @Test - public void testPollOnTransportEnabled() throws Exception { - Mockery context = new Mockery(); - context.setImposteriser(ClassImposteriser.INSTANCE); - Executor ioExecutor = new ImmediateExecutor(); - ScheduledExecutorService scheduler = - context.mock(ScheduledExecutorService.class); - ConnectionManager connectionManager = - context.mock(ConnectionManager.class); - ConnectionRegistry connectionRegistry = - context.mock(ConnectionRegistry.class); - PluginManager pluginManager = context.mock(PluginManager.class); - SecureRandom random = context.mock(SecureRandom.class); - Clock clock = context.mock(Clock.class); + public void testRescheduleReplacesLaterTask() throws Exception { + Plugin plugin = context.mock(Plugin.class); + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + // First event + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Schedule the next poll + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval), with(MILLISECONDS)); + will(returnValue(future)); + // Second event + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Replace the previously scheduled task, due later + oneOf(plugin).getPollingInterval(); + will(returnValue(pollingInterval - 2)); + oneOf(clock).currentTimeMillis(); + will(returnValue(now + 1)); + oneOf(future).cancel(false); + oneOf(scheduler).schedule(with(any(Runnable.class)), + with((long) pollingInterval - 2), with(MILLISECONDS)); + }}); + + Poller p = new Poller(ioExecutor, scheduler, connectionManager, + connectionRegistry, pluginManager, random, clock); + + p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + false)); + p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, + false)); + } + + @Test + public void testPollsOnTransportEnabled() throws Exception { Plugin plugin = context.mock(Plugin.class); - TransportId transportId = new TransportId("id"); List<ContactId> connected = Collections.singletonList(contactId); context.checking(new Expectations() {{ @@ -328,6 +322,7 @@ public class PollerTest extends BrambleTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L), with(MILLISECONDS)); + will(returnValue(future)); will(new RunAction()); // Running the polling task schedules the next polling task oneOf(plugin).getPollingInterval(); @@ -338,6 +333,7 @@ public class PollerTest extends BrambleTestCase { will(returnValue(now)); oneOf(scheduler).schedule(with(any(Runnable.class)), with((long) (pollingInterval * 0.5)), with(MILLISECONDS)); + will(returnValue(future)); // Poll the plugin oneOf(connectionRegistry).getConnectedContacts(transportId); will(returnValue(connected)); @@ -348,7 +344,36 @@ public class PollerTest extends BrambleTestCase { connectionRegistry, pluginManager, random, clock); p.eventOccurred(new TransportEnabledEvent(transportId)); + } - context.assertIsSatisfied(); + @Test + public void testCancelsPollingOnTransportDisabled() throws Exception { + Plugin plugin = context.mock(Plugin.class); + List<ContactId> connected = Collections.singletonList(contactId); + + context.checking(new Expectations() {{ + allowing(plugin).getId(); + will(returnValue(transportId)); + // Get the plugin + oneOf(pluginManager).getPlugin(transportId); + will(returnValue(plugin)); + // The plugin supports polling + oneOf(plugin).shouldPoll(); + will(returnValue(true)); + // Schedule a polling task immediately + oneOf(clock).currentTimeMillis(); + will(returnValue(now)); + oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L), + with(MILLISECONDS)); + will(returnValue(future)); + // The plugin is disabled before the task runs - cancel the task + oneOf(future).cancel(false); + }}); + + Poller p = new Poller(ioExecutor, scheduler, connectionManager, + connectionRegistry, pluginManager, random, clock); + + p.eventOccurred(new TransportEnabledEvent(transportId)); + p.eventOccurred(new TransportDisabledEvent(transportId)); } }