...
 
Commits (1)
...@@ -8,7 +8,6 @@ import org.briarproject.bramble.api.lifecycle.Service; ...@@ -8,7 +8,6 @@ import org.briarproject.bramble.api.lifecycle.Service;
import org.briarproject.bramble.api.lifecycle.ServiceException; import org.briarproject.bramble.api.lifecycle.ServiceException;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.Plugin; import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginCallback; import org.briarproject.bramble.api.plugin.PluginCallback;
import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.PluginConfig;
...@@ -30,10 +29,7 @@ import org.briarproject.bramble.api.properties.TransportProperties; ...@@ -30,10 +29,7 @@ import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.settings.Settings; import org.briarproject.bramble.api.settings.Settings;
import org.briarproject.bramble.api.settings.SettingsManager; import org.briarproject.bramble.api.settings.SettingsManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import java.security.SecureRandom;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
...@@ -42,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap; ...@@ -42,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger; import java.util.logging.Logger;
...@@ -64,15 +59,11 @@ class PluginManagerImpl implements PluginManager, Service { ...@@ -64,15 +59,11 @@ class PluginManagerImpl implements PluginManager, Service {
Logger.getLogger(PluginManagerImpl.class.getName()); Logger.getLogger(PluginManagerImpl.class.getName());
private final Executor ioExecutor; private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final EventBus eventBus; private final EventBus eventBus;
private final PluginConfig pluginConfig; private final PluginConfig pluginConfig;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final SettingsManager settingsManager; private final SettingsManager settingsManager;
private final TransportPropertyManager transportPropertyManager; private final TransportPropertyManager transportPropertyManager;
private final SecureRandom random;
private final Clock clock;
private final Map<TransportId, Plugin> plugins; private final Map<TransportId, Plugin> plugins;
private final List<SimplexPlugin> simplexPlugins; private final List<SimplexPlugin> simplexPlugins;
private final List<DuplexPlugin> duplexPlugins; private final List<DuplexPlugin> duplexPlugins;
...@@ -80,41 +71,25 @@ class PluginManagerImpl implements PluginManager, Service { ...@@ -80,41 +71,25 @@ class PluginManagerImpl implements PluginManager, Service {
private final AtomicBoolean used = new AtomicBoolean(false); private final AtomicBoolean used = new AtomicBoolean(false);
@Inject @Inject
PluginManagerImpl(@IoExecutor Executor ioExecutor, PluginManagerImpl(@IoExecutor Executor ioExecutor, EventBus eventBus,
@Scheduler ScheduledExecutorService scheduler, EventBus eventBus,
PluginConfig pluginConfig, ConnectionManager connectionManager, PluginConfig pluginConfig, ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry,
SettingsManager settingsManager, SettingsManager settingsManager,
TransportPropertyManager transportPropertyManager, TransportPropertyManager transportPropertyManager) {
SecureRandom random, Clock clock) {
this.ioExecutor = ioExecutor; this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.eventBus = eventBus; this.eventBus = eventBus;
this.pluginConfig = pluginConfig; this.pluginConfig = pluginConfig;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.settingsManager = settingsManager; this.settingsManager = settingsManager;
this.transportPropertyManager = transportPropertyManager; this.transportPropertyManager = transportPropertyManager;
this.random = random;
this.clock = clock;
plugins = new ConcurrentHashMap<>(); plugins = new ConcurrentHashMap<>();
simplexPlugins = new CopyOnWriteArrayList<>(); simplexPlugins = new CopyOnWriteArrayList<>();
duplexPlugins = new CopyOnWriteArrayList<>(); duplexPlugins = new CopyOnWriteArrayList<>();
startLatches = new ConcurrentHashMap<>(); startLatches = new ConcurrentHashMap<>();
} }
@Override @Override
public void startService() { public void startService() {
if (used.getAndSet(true)) throw new IllegalStateException(); if (used.getAndSet(true)) throw new IllegalStateException();
// Instantiate the poller
if (pluginConfig.shouldPoll()) {
LOG.info("Starting poller");
Poller poller = new Poller(ioExecutor, scheduler, connectionManager,
connectionRegistry, this, transportPropertyManager, random,
clock);
eventBus.addListener(poller);
}
// Instantiate the simplex plugins and start them asynchronously // Instantiate the simplex plugins and start them asynchronously
LOG.info("Starting simplex plugins"); LOG.info("Starting simplex plugins");
for (SimplexPluginFactory f : pluginConfig.getSimplexFactories()) { for (SimplexPluginFactory f : pluginConfig.getSimplexFactories()) {
......
package org.briarproject.bramble.plugin; package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.lifecycle.LifecycleManager; import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.plugin.BackoffFactory; import org.briarproject.bramble.api.plugin.BackoffFactory;
import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry; import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.PluginConfig;
import org.briarproject.bramble.api.plugin.PluginManager; import org.briarproject.bramble.api.plugin.PluginManager;
import javax.inject.Inject; import javax.inject.Inject;
...@@ -18,6 +20,8 @@ public class PluginModule { ...@@ -18,6 +20,8 @@ public class PluginModule {
public static class EagerSingletons { public static class EagerSingletons {
@Inject @Inject
PluginManager pluginManager; PluginManager pluginManager;
@Inject
Poller poller;
} }
@Provides @Provides
...@@ -46,4 +50,12 @@ public class PluginModule { ...@@ -46,4 +50,12 @@ public class PluginModule {
lifecycleManager.registerService(pluginManager); lifecycleManager.registerService(pluginManager);
return pluginManager; return pluginManager;
} }
@Provides
@Singleton
Poller providePoller(PluginConfig config, EventBus eventBus,
PollerImpl poller) {
if (config.shouldPoll()) eventBus.addListener(poller);
return poller;
}
} }
package org.briarproject.bramble.plugin; package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.contact.ContactId; /**
import org.briarproject.bramble.api.contact.event.ContactAddedEvent; * Empty interface used for injecting the poller.
import org.briarproject.bramble.api.db.DbException; */
import org.briarproject.bramble.api.event.Event; interface Poller {
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent;
import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent;
import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.concurrent.ThreadSafe;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@NotNullByDefault
class Poller implements EventListener {
private static final Logger LOG = Logger.getLogger(Poller.class.getName());
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final PluginManager pluginManager;
private final TransportPropertyManager transportPropertyManager;
private final SecureRandom random;
private final Clock clock;
private final Lock lock;
private final Map<TransportId, ScheduledPollTask> tasks; // Locking: lock
Poller(@IoExecutor Executor ioExecutor,
@Scheduler ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
TransportPropertyManager transportPropertyManager,
SecureRandom random, Clock clock) {
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.pluginManager = pluginManager;
this.transportPropertyManager = transportPropertyManager;
this.random = random;
this.clock = clock;
lock = new ReentrantLock();
tasks = new HashMap<>();
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactAddedEvent) {
ContactAddedEvent c = (ContactAddedEvent) e;
// Connect to the newly activated contact
connectToContact(c.getContactId());
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} else if (e instanceof ConnectionOpenedEvent) {
ConnectionOpenedEvent c = (ConnectionOpenedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
// Poll the newly enabled transport
pollNow(t.getTransportId());
} else if (e instanceof TransportDisabledEvent) {
TransportDisabledEvent t = (TransportDisabledEvent) e;
// Cancel polling for the disabled transport
cancel(t.getTransportId());
}
}
private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : pluginManager.getDuplexPlugins())
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(ContactId c, SimplexPlugin p) {
ioExecutor.execute(() -> {
TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return;
try {
TransportProperties props =
transportPropertyManager.getRemoteProperties(c, t);
TransportConnectionWriter w = p.createWriter(props);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
}
private void connectToContact(ContactId c, DuplexPlugin p) {
ioExecutor.execute(() -> {
TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return;
try {
TransportProperties props =
transportPropertyManager.getRemoteProperties(c, t);
DuplexTransportConnection d = p.createConnection(props);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
}
private void reschedule(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p != null && p.shouldPoll())
schedule(p, p.getPollingInterval(), false);
}
private void pollNow(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
// Randomise next polling interval
if (p != null && p.shouldPoll()) schedule(p, 0, true);
}
private void schedule(Plugin p, int delay, boolean randomiseNext) {
// Replace any later scheduled task for this plugin
long due = clock.currentTimeMillis() + delay;
TransportId t = p.getId();
lock.lock();
try {
ScheduledPollTask scheduled = tasks.get(t);
if (scheduled == null || due < scheduled.task.due) {
// If a later task exists, cancel it. If it's already started
// it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false);
PollTask task = new PollTask(p, due, randomiseNext);
Future future = scheduler.schedule(
() -> ioExecutor.execute(task), delay, MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future));
}
} finally {
lock.unlock();
}
}
private void cancel(TransportId t) {
lock.lock();
try {
ScheduledPollTask scheduled = tasks.remove(t);
if (scheduled != null) scheduled.future.cancel(false);
} finally {
lock.unlock();
}
}
@IoExecutor
private void poll(Plugin p) {
TransportId t = p.getId();
if (LOG.isLoggable(INFO)) LOG.info("Polling plugin " + t);
try {
Map<ContactId, TransportProperties> remote =
transportPropertyManager.getRemoteProperties(t);
Collection<ContactId> connected =
connectionRegistry.getConnectedContacts(t);
remote = new HashMap<>(remote);
remote.keySet().removeAll(connected);
if (!remote.isEmpty()) p.poll(remote);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
}
private class ScheduledPollTask {
private final PollTask task;
private final Future future;
private ScheduledPollTask(PollTask task, Future future) {
this.task = task;
this.future = future;
}
}
private class PollTask implements Runnable {
private final Plugin plugin;
private final long due;
private final boolean randomiseNext;
private PollTask(Plugin plugin, long due, boolean randomiseNext) {
this.plugin = plugin;
this.due = due;
this.randomiseNext = randomiseNext;
}
@Override
@IoExecutor
public void run() {
lock.lock();
try {
TransportId t = plugin.getId();
ScheduledPollTask scheduled = tasks.get(t);
if (scheduled != null && scheduled.task != this)
return; // Replaced by another task
tasks.remove(t);
} finally {
lock.unlock();
}
int delay = plugin.getPollingInterval();
if (randomiseNext) delay = (int) (delay * random.nextDouble());
schedule(plugin, delay, false);
poll(plugin);
}
}
} }
package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.event.ContactAddedEvent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.plugin.event.ConnectionClosedEvent;
import org.briarproject.bramble.api.plugin.event.ConnectionOpenedEvent;
import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent;
import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent;
import org.briarproject.bramble.api.plugin.simplex.SimplexPlugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import java.security.SecureRandom;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.util.LogUtils.logException;
@ThreadSafe
@NotNullByDefault
class PollerImpl implements Poller, EventListener {
private static final Logger LOG = getLogger(PollerImpl.class.getName());
private final Executor ioExecutor;
private final ScheduledExecutorService scheduler;
private final ConnectionManager connectionManager;
private final ConnectionRegistry connectionRegistry;
private final PluginManager pluginManager;
private final TransportPropertyManager transportPropertyManager;
private final SecureRandom random;
private final Clock clock;
private final Lock lock;
@GuardedBy("lock")
private final Map<TransportId, ScheduledPollTask> tasks;
@Inject
PollerImpl(@IoExecutor Executor ioExecutor,
@Scheduler ScheduledExecutorService scheduler,
ConnectionManager connectionManager,
ConnectionRegistry connectionRegistry, PluginManager pluginManager,
TransportPropertyManager transportPropertyManager,
SecureRandom random, Clock clock) {
this.ioExecutor = ioExecutor;
this.scheduler = scheduler;
this.connectionManager = connectionManager;
this.connectionRegistry = connectionRegistry;
this.pluginManager = pluginManager;
this.transportPropertyManager = transportPropertyManager;
this.random = random;
this.clock = clock;
lock = new ReentrantLock();
tasks = new HashMap<>();
}
@Override
public void eventOccurred(Event e) {
if (e instanceof ContactAddedEvent) {
ContactAddedEvent c = (ContactAddedEvent) e;
// Connect to the newly added contact
connectToContact(c.getContactId());
} else if (e instanceof ConnectionClosedEvent) {
ConnectionClosedEvent c = (ConnectionClosedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
if (!c.isIncoming()) {
// Connect to the disconnected contact
connectToContact(c.getContactId(), c.getTransportId());
}
} else if (e instanceof ConnectionOpenedEvent) {
ConnectionOpenedEvent c = (ConnectionOpenedEvent) e;
// Reschedule polling, the polling interval may have decreased
reschedule(c.getTransportId());
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
// Poll the newly enabled transport
pollNow(t.getTransportId());
} else if (e instanceof TransportDisabledEvent) {
TransportDisabledEvent t = (TransportDisabledEvent) e;
// Cancel polling for the disabled transport
cancel(t.getTransportId());
}
}
private void connectToContact(ContactId c) {
for (SimplexPlugin s : pluginManager.getSimplexPlugins())
if (s.shouldPoll()) connectToContact(c, s);
for (DuplexPlugin d : pluginManager.getDuplexPlugins())
if (d.shouldPoll()) connectToContact(c, d);
}
private void connectToContact(ContactId c, TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p instanceof SimplexPlugin && p.shouldPoll())
connectToContact(c, (SimplexPlugin) p);
else if (p instanceof DuplexPlugin && p.shouldPoll())
connectToContact(c, (DuplexPlugin) p);
}
private void connectToContact(ContactId c, SimplexPlugin p) {
ioExecutor.execute(() -> {
TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return;
try {
TransportProperties props =
transportPropertyManager.getRemoteProperties(c, t);
TransportConnectionWriter w = p.createWriter(props);
if (w != null)
connectionManager.manageOutgoingConnection(c, t, w);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
}
private void connectToContact(ContactId c, DuplexPlugin p) {
ioExecutor.execute(() -> {
TransportId t = p.getId();
if (connectionRegistry.isConnected(c, t)) return;
try {
TransportProperties props =
transportPropertyManager.getRemoteProperties(c, t);
DuplexTransportConnection d = p.createConnection(props);
if (d != null)
connectionManager.manageOutgoingConnection(c, t, d);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
});
}
private void reschedule(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p != null && p.shouldPoll())
schedule(p, p.getPollingInterval(), false);
}
private void pollNow(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
// Randomise next polling interval
if (p != null && p.shouldPoll()) schedule(p, 0, true);
}
private void schedule(Plugin p, int delay, boolean randomiseNext) {
// Replace any later scheduled task for this plugin
long due = clock.currentTimeMillis() + delay;
TransportId t = p.getId();
lock.lock();
try {
ScheduledPollTask scheduled = tasks.get(t);
if (scheduled == null || due < scheduled.task.due) {
// If a later task exists, cancel it. If it's already started
// it will abort safely when it finds it's been replaced
if (scheduled != null) scheduled.future.cancel(false);
PollTask task = new PollTask(p, due, randomiseNext);
Future future = scheduler.schedule(() ->
ioExecutor.execute(task), delay, MILLISECONDS);
tasks.put(t, new ScheduledPollTask(task, future));
}
} finally {
lock.unlock();
}
}
private void cancel(TransportId t) {
lock.lock();
try {
ScheduledPollTask scheduled = tasks.remove(t);
if (scheduled != null) scheduled.future.cancel(false);
} finally {
lock.unlock();
}
}
@IoExecutor
private void poll(Plugin p) {
TransportId t = p.getId();
if (LOG.isLoggable(INFO)) LOG.info("Polling plugin " + t);
try {
Map<ContactId, TransportProperties> remote =
transportPropertyManager.getRemoteProperties(t);
Collection<ContactId> connected =
connectionRegistry.getConnectedContacts(t);
remote = new HashMap<>(remote);
remote.keySet().removeAll(connected);
if (!remote.isEmpty()) p.poll(remote);
} catch (DbException e) {
logException(LOG, WARNING, e);
}
}
private class ScheduledPollTask {
private final PollTask task;
private final Future future;
private ScheduledPollTask(PollTask task, Future future) {
this.task = task;
this.future = future;
}
}
private class PollTask implements Runnable {
private final Plugin plugin;
private final long due;
private final boolean randomiseNext;
private PollTask(Plugin plugin, long due, boolean randomiseNext) {
this.plugin = plugin;
this.due = due;
this.randomiseNext = randomiseNext;
}
@Override
@IoExecutor
public void run() {
lock.lock();
try {
TransportId t = plugin.getId();
ScheduledPollTask scheduled = tasks.get(t);
if (scheduled != null && scheduled.task != this)
return; // Replaced by another task
tasks.remove(t);
} finally {
lock.unlock();
}
int delay = plugin.getPollingInterval();
if (randomiseNext) delay = (int) (delay * random.nextDouble());
schedule(plugin, delay, false);
poll(plugin);
}
}
}
...@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin; ...@@ -2,7 +2,6 @@ package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.event.EventBus; import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.plugin.ConnectionManager; import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.ConnectionRegistry;
import org.briarproject.bramble.api.plugin.PluginConfig; import org.briarproject.bramble.api.plugin.PluginConfig;
import org.briarproject.bramble.api.plugin.PluginException; import org.briarproject.bramble.api.plugin.PluginException;
import org.briarproject.bramble.api.plugin.TransportId; import org.briarproject.bramble.api.plugin.TransportId;
...@@ -14,18 +13,15 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPluginCallback; ...@@ -14,18 +13,15 @@ import org.briarproject.bramble.api.plugin.simplex.SimplexPluginCallback;
import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory; import org.briarproject.bramble.api.plugin.simplex.SimplexPluginFactory;
import org.briarproject.bramble.api.properties.TransportPropertyManager; import org.briarproject.bramble.api.properties.TransportPropertyManager;
import org.briarproject.bramble.api.settings.SettingsManager; import org.briarproject.bramble.api.settings.SettingsManager;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.test.BrambleTestCase; import org.briarproject.bramble.test.BrambleTestCase;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.jmock.Mockery; import org.jmock.Mockery;
import org.jmock.lib.concurrent.Synchroniser; import org.jmock.lib.concurrent.Synchroniser;
import org.junit.Test; import org.junit.Test;
import java.security.SecureRandom;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import static org.briarproject.bramble.test.TestUtils.getTransportId; import static org.briarproject.bramble.test.TestUtils.getTransportId;
...@@ -37,16 +33,10 @@ public class PluginManagerImplTest extends BrambleTestCase { ...@@ -37,16 +33,10 @@ public class PluginManagerImplTest extends BrambleTestCase {
setThreadingPolicy(new Synchroniser()); setThreadingPolicy(new Synchroniser());
}}; }};
Executor ioExecutor = Executors.newSingleThreadExecutor(); Executor ioExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class);
SecureRandom random = new SecureRandom();
Clock clock = context.mock(Clock.class);
EventBus eventBus = context.mock(EventBus.class); EventBus eventBus = context.mock(EventBus.class);
PluginConfig pluginConfig = context.mock(PluginConfig.class); PluginConfig pluginConfig = context.mock(PluginConfig.class);
ConnectionManager connectionManager = ConnectionManager connectionManager =
context.mock(ConnectionManager.class); context.mock(ConnectionManager.class);
ConnectionRegistry connectionRegistry =
context.mock(ConnectionRegistry.class);
SettingsManager settingsManager = SettingsManager settingsManager =
context.mock(SettingsManager.class); context.mock(SettingsManager.class);
TransportPropertyManager transportPropertyManager = TransportPropertyManager transportPropertyManager =
...@@ -122,9 +112,9 @@ public class PluginManagerImplTest extends BrambleTestCase { ...@@ -122,9 +112,9 @@ public class PluginManagerImplTest extends BrambleTestCase {
oneOf(duplexPlugin).stop(); oneOf(duplexPlugin).stop();
}}); }});
PluginManagerImpl p = new PluginManagerImpl(ioExecutor, scheduler, PluginManagerImpl p = new PluginManagerImpl(ioExecutor, eventBus,
eventBus, pluginConfig, connectionManager, connectionRegistry, pluginConfig, connectionManager, settingsManager,
settingsManager, transportPropertyManager, random, clock); transportPropertyManager);
// Two plugins should be started and stopped // Two plugins should be started and stopped
p.startService(); p.startService();
......
...@@ -23,6 +23,7 @@ import org.briarproject.bramble.test.ImmediateExecutor; ...@@ -23,6 +23,7 @@ import org.briarproject.bramble.test.ImmediateExecutor;
import org.briarproject.bramble.test.RunAction; import org.briarproject.bramble.test.RunAction;
import org.jmock.Expectations; import org.jmock.Expectations;
import org.jmock.lib.legacy.ClassImposteriser; import org.jmock.lib.legacy.ClassImposteriser;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.security.SecureRandom; import java.security.SecureRandom;
...@@ -39,7 +40,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; ...@@ -39,7 +40,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.briarproject.bramble.test.TestUtils.getContactId; import static org.briarproject.bramble.test.TestUtils.getContactId;
import static org.briarproject.bramble.test.TestUtils.getTransportId; import static org.briarproject.bramble.test.TestUtils.getTransportId;
public class PollerTest extends BrambleMockTestCase { public class PollerImplTest extends BrambleMockTestCase {
private final ScheduledExecutorService scheduler = private final ScheduledExecutorService scheduler =
context.mock(ScheduledExecutorService.class); context.mock(ScheduledExecutorService.class);
...@@ -62,11 +63,20 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -62,11 +63,20 @@ public class PollerTest extends BrambleMockTestCase {
private final int pollingInterval = 60 * 1000; private final int pollingInterval = 60 * 1000;
private final long now = System.currentTimeMillis(); private final long now = System.currentTimeMillis();
public PollerTest() { private PollerImpl poller;
public PollerImplTest() {
context.setImposteriser(ClassImposteriser.INSTANCE); context.setImposteriser(ClassImposteriser.INSTANCE);
random = context.mock(SecureRandom.class); random = context.mock(SecureRandom.class);
} }
@Before
public void setUp() {
poller = new PollerImpl(ioExecutor, scheduler, connectionManager,
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
}
@Test @Test
public void testConnectOnContactAdded() throws Exception { public void testConnectOnContactAdded() throws Exception {
// Two simplex plugins: one supports polling, the other doesn't // Two simplex plugins: one supports polling, the other doesn't
...@@ -140,11 +150,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -140,11 +150,7 @@ public class PollerTest extends BrambleMockTestCase {
will(returnValue(false)); will(returnValue(false));
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new ContactAddedEvent(contactId));
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new ContactAddedEvent(contactId));
} }
@Test @Test
...@@ -194,11 +200,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -194,11 +200,7 @@ public class PollerTest extends BrambleMockTestCase {
transportId, duplexConnection); transportId, duplexConnection);
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new ConnectionClosedEvent(contactId, transportId,
false)); false));
} }
...@@ -225,11 +227,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -225,11 +227,7 @@ public class PollerTest extends BrambleMockTestCase {
will(returnValue(future)); will(returnValue(future));
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false)); false));
} }
...@@ -269,13 +267,9 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -269,13 +267,9 @@ public class PollerTest extends BrambleMockTestCase {
will(returnValue(now + 1)); will(returnValue(now + 1));
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false)); false));
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false)); false));
} }
...@@ -318,13 +312,9 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -318,13 +312,9 @@ public class PollerTest extends BrambleMockTestCase {
with((long) pollingInterval - 2), with(MILLISECONDS)); with((long) pollingInterval - 2), with(MILLISECONDS));
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false)); false));
p.eventOccurred(new ConnectionOpenedEvent(contactId, transportId, poller.eventOccurred(new ConnectionOpenedEvent(contactId, transportId,
false)); false));
} }
...@@ -367,11 +357,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -367,11 +357,7 @@ public class PollerTest extends BrambleMockTestCase {
oneOf(plugin).poll(singletonMap(contactId, properties)); oneOf(plugin).poll(singletonMap(contactId, properties));
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new TransportEnabledEvent(transportId));
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new TransportEnabledEvent(transportId));
} }
@Test @Test
...@@ -412,11 +398,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -412,11 +398,7 @@ public class PollerTest extends BrambleMockTestCase {
// All contacts are connected, so don't poll the plugin // All contacts are connected, so don't poll the plugin
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new TransportEnabledEvent(transportId));
connectionRegistry, pluginManager, transportPropertyManager,
random, clock);
p.eventOccurred(new TransportEnabledEvent(transportId));
} }
@Test @Test
...@@ -442,11 +424,7 @@ public class PollerTest extends BrambleMockTestCase { ...@@ -442,11 +424,7 @@ public class PollerTest extends BrambleMockTestCase {
oneOf(future).cancel(false); oneOf(future).cancel(false);
}}); }});
Poller p = new Poller(ioExecutor, scheduler, connectionManager, poller.eventOccurred(new TransportEnabledEvent(transportId));
connectionRegistry, pluginManager, transportPropertyManager, poller.eventOccurred(new TransportDisabledEvent(transportId));
random, clock);
p.eventOccurred(new TransportEnabledEvent(transportId));
p.eventOccurred(new TransportDisabledEvent(transportId));
} }
} }