Skip to content
Snippets Groups Projects
Unverified Commit e3bf20ae authored by akwizgran's avatar akwizgran
Browse files

Reschedule polling when connections are opened or closed.

parent 1bdd7ca7
No related branches found
No related tags found
No related merge requests found
......@@ -7,6 +7,7 @@ import org.briarproject.api.plugins.BackoffFactory;
import org.briarproject.api.plugins.ConnectionManager;
import org.briarproject.api.plugins.ConnectionRegistry;
import org.briarproject.api.plugins.PluginManager;
import org.briarproject.api.system.Clock;
import java.security.SecureRandom;
import java.util.concurrent.Executor;
......@@ -39,9 +40,9 @@ public class PluginsModule {
ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
SecureRandom random, EventBus eventBus) {
SecureRandom random, Clock clock, EventBus eventBus) {
Poller poller = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
connectionRegistry, pluginManager, random, clock);
eventBus.addListener(poller);
return poller;
}
......
......@@ -3,6 +3,7 @@ package org.briarproject.plugins;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ConnectionOpenedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.Event;
import org.briarproject.api.event.EventListener;
......@@ -16,12 +17,15 @@ import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.system.Clock;
import java.security.SecureRandom;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.inject.Inject;
......@@ -39,20 +43,24 @@ class Poller implements EventListener {
private final ConnectionRegistry connectionRegistry;
private final PluginManager pluginManager;
private final SecureRandom random;
private final Map<TransportId, PollTask> tasks;
private final Clock clock;
private final Lock lock;
private final Map<TransportId, PollTask> tasks; // Locking: lock
@Inject
Poller(@IoExecutor Executor ioExecutor, ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
SecureRandom random) {
SecureRandom random, Clock clock) {
this.ioExecutor = ioExecutor;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.pluginManager = pluginManager;
this.random = random;
this.scheduler = scheduler;
tasks = new ConcurrentHashMap<TransportId, PollTask>();
this.clock = clock;
lock = new ReentrantLock();
tasks = new HashMap<TransportId, PollTask>();
}
@Override
......@@ -65,12 +73,19 @@ class Poller implements EventListener {
}
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} else if (e instanceof ConnectionOpenedEvent) {
ConnectionOpenedEvent c = (ConnectionOpenedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
// Poll the newly enabled transport
pollNow(t.getTransportId());
}
}
......@@ -118,18 +133,32 @@ class Poller implements EventListener {
});
}
private void reschedule(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p.shouldPoll()) schedule(p, p.getPollingInterval(), false);
}
private void pollNow(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
// Randomise next polling interval
if (p.shouldPoll()) schedule(p, 0, true);
}
private void schedule(Plugin p, int interval, boolean randomiseNext) {
// Replace any previously scheduled task for this plugin
PollTask task = new PollTask(p, randomiseNext);
PollTask replaced = tasks.put(p.getId(), task);
if (replaced != null) replaced.cancel();
scheduler.schedule(task, interval, MILLISECONDS);
private void schedule(Plugin p, int delay, boolean randomiseNext) {
// Replace any later scheduled task for this plugin
long due = clock.currentTimeMillis() + delay;
lock.lock();
try {
TransportId t = p.getId();
PollTask scheduled = tasks.get(t);
if (scheduled == null || due < scheduled.due) {
PollTask task = new PollTask(p, due, randomiseNext);
tasks.put(t, task);
scheduler.schedule(task, delay, MILLISECONDS);
}
} finally {
lock.unlock();
}
}
private void poll(final Plugin p) {
......@@ -146,27 +175,28 @@ class Poller implements EventListener {
private class PollTask implements Runnable {
private final Plugin plugin;
private final long due;
private final boolean randomiseNext;
private volatile boolean cancelled = false;
private PollTask(Plugin plugin, boolean randomiseNext) {
private PollTask(Plugin plugin, long due, boolean randomiseNext) {
this.plugin = plugin;
this.due = due;
this.randomiseNext = randomiseNext;
}
private void cancel() {
cancelled = true;
}
@Override
public void run() {
if (cancelled) return;
tasks.remove(plugin.getId());
int interval = plugin.getPollingInterval();
if (randomiseNext)
interval = (int) (interval * random.nextDouble());
schedule(plugin, interval, false);
lock.lock();
try {
TransportId t = plugin.getId();
if (tasks.get(t) != this) return; // Replaced by another task
tasks.remove(t);
} finally {
lock.unlock();
}
int delay = plugin.getPollingInterval();
if (randomiseNext) delay = (int) (delay * random.nextDouble());
schedule(plugin, delay, false);
poll(plugin);
}
}
......
......@@ -6,6 +6,7 @@ import org.briarproject.RunAction;
import org.briarproject.api.TransportId;
import org.briarproject.api.contact.ContactId;
import org.briarproject.api.event.ConnectionClosedEvent;
import org.briarproject.api.event.ConnectionOpenedEvent;
import org.briarproject.api.event.ContactStatusChangedEvent;
import org.briarproject.api.event.TransportEnabledEvent;
import org.briarproject.api.plugins.ConnectionManager;
......@@ -16,6 +17,7 @@ import org.briarproject.api.plugins.TransportConnectionWriter;
import org.briarproject.api.plugins.duplex.DuplexPlugin;
import org.briarproject.api.plugins.duplex.DuplexTransportConnection;
import org.briarproject.api.plugins.simplex.SimplexPlugin;
import org.briarproject.api.system.Clock;
import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.lib.legacy.ClassImposteriser;
......@@ -33,9 +35,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class PollerTest extends BriarTestCase {
private final ContactId contactId = new ContactId(234);
private final int pollingInterval = 60 * 1000;
private final long now = System.currentTimeMillis();
@Test
public void testConnectToNewContact() throws Exception {
public void testConnectOnContactStatusChanged() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
......@@ -47,6 +51,7 @@ public class PollerTest extends BriarTestCase {
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Clock clock = context.mock(Clock.class);
// Two simplex plugins: one supports polling, the other doesn't
final SimplexPlugin simplexPlugin = context.mock(SimplexPlugin.class);
......@@ -112,7 +117,7 @@ public class PollerTest extends BriarTestCase {
}});
Poller p = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
connectionRegistry, pluginManager, random, clock);
p.eventOccurred(new ContactStatusChangedEvent(contactId, true));
......@@ -120,7 +125,8 @@ public class PollerTest extends BriarTestCase {
}
@Test
public void testReconnectToDisconnectedContact() throws Exception {
public void testRescheduleAndReconnectOnConnectionClosed()
throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
......@@ -132,6 +138,7 @@ public class PollerTest extends BriarTestCase {
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Clock clock = context.mock(Clock.class);
final DuplexPlugin plugin = context.mock(DuplexPlugin.class);
final TransportId transportId = new TransportId("id");
......@@ -139,15 +146,30 @@ public class PollerTest extends BriarTestCase {
context.mock(DuplexTransportConnection.class);
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
// reschedule()
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule the next poll
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
// connectToContact()
// Check whether the contact is already connected
oneOf(plugin).getId();
will(returnValue(transportId));
oneOf(connectionRegistry).isConnected(contactId, transportId);
will(returnValue(false));
// Connect to the contact
......@@ -159,7 +181,7 @@ public class PollerTest extends BriarTestCase {
}});
Poller p = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
connectionRegistry, pluginManager, random, clock);
p.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
false));
......@@ -167,8 +189,114 @@ public class PollerTest extends BriarTestCase {
context.assertIsSatisfied();
}
@Test
public void testRescheduleOnConnectionOpened() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Clock clock = context.mock(Clock.class);
final DuplexPlugin plugin = context.mock(DuplexPlugin.class);
final TransportId transportId = new TransportId("id");
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule the next poll
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
}});
Poller p = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random, clock);
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false));
context.assertIsSatisfied();
}
@Test
public void testRescheduleDoesNotReplaceEarlierTask() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
final ConnectionManager connectionManager =
context.mock(ConnectionManager.class);
final ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Clock clock = context.mock(Clock.class);
final DuplexPlugin plugin = context.mock(DuplexPlugin.class);
final TransportId transportId = new TransportId("id");
context.checking(new Expectations() {{
allowing(plugin).getId();
will(returnValue(transportId));
// First event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule the next poll
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) pollingInterval), with(MILLISECONDS));
// Second event
// Get the plugin
oneOf(pluginManager).getPlugin(transportId);
will(returnValue(plugin));
// The plugin supports polling
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Don't replace the previously scheduled task, due earlier
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(clock).currentTimeMillis();
will(returnValue(now + 1));
}});
Poller p = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random, clock);
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false));
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false));
context.assertIsSatisfied();
}
@Test
public void testPollWhenTransportIsEnabled() throws Exception {
public void testPollOnTransportEnabled() throws Exception {
Mockery context = new Mockery();
context.setImposteriser(ClassImposteriser.INSTANCE);
final Executor ioExecutor = new ImmediateExecutor();
......@@ -180,10 +308,10 @@ public class PollerTest extends BriarTestCase {
context.mock(ConnectionRegistry.class);
final PluginManager pluginManager = context.mock(PluginManager.class);
final SecureRandom random = context.mock(SecureRandom.class);
final Clock clock = context.mock(Clock.class);
final Plugin plugin = context.mock(Plugin.class);
final TransportId transportId = new TransportId("id");
final int pollingInterval = 60 * 1000;
final List<ContactId> connected = Collections.singletonList(contactId);
context.checking(new Expectations() {{
......@@ -196,14 +324,18 @@ public class PollerTest extends BriarTestCase {
oneOf(plugin).shouldPoll();
will(returnValue(true));
// Schedule a polling task immediately
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)), with(0L),
with(MILLISECONDS));
will(new RunAction());
// Run the polling task
// Running the polling task schedules the next polling task
oneOf(plugin).getPollingInterval();
will(returnValue(pollingInterval));
oneOf(random).nextDouble();
will(returnValue(0.5));
oneOf(clock).currentTimeMillis();
will(returnValue(now));
oneOf(scheduler).schedule(with(any(Runnable.class)),
with((long) (pollingInterval * 0.5)), with(MILLISECONDS));
// Poll the plugin
......@@ -213,7 +345,7 @@ public class PollerTest extends BriarTestCase {
}});
Poller p = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, random);
connectionRegistry, pluginManager, random, clock);
p.eventOccurred(new TransportEnabledEvent(transportId));
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment