Commit 1f921753 authored by Torsten Grote's avatar Torsten Grote

Merge branch '1567-rendezvous-poller' into 'master'

Create poller for rendezvous connections

Closes #1567

See merge request !1121
parents 157b64e6 7439e557
Pipeline #3459 canceled with stage
......@@ -22,4 +22,11 @@ public class NullSafety {
@Nullable Object b) {
if ((a == null) == (b == null)) throw new AssertionError();
}
/**
* Checks that the argument is null.
*/
public static void requireNull(@Nullable Object o) {
if (o != null) throw new AssertionError();
}
}
......@@ -3,10 +3,11 @@ package org.briarproject.bramble.api.plugin.duplex;
import org.briarproject.bramble.api.data.BdfList;
import org.briarproject.bramble.api.keyagreement.KeyAgreementListener;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionHandler;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousHandler;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import javax.annotation.Nullable;
......@@ -49,8 +50,12 @@ public interface DuplexPlugin extends Plugin {
boolean supportsRendezvous();
/**
* Creates and returns a handler that uses the given key material to
* rendezvous with a pending contact.
* Creates and returns an endpoint that uses the given key material to
* rendezvous with a pending contact, and the given connection handler to
* handle incoming connections. Returns null if an endpoint cannot be
* created.
*/
RendezvousHandler createRendezvousHandler(KeyMaterialSource k);
@Nullable
RendezvousEndpoint createRendezvousEndpoint(KeyMaterialSource k,
boolean alice, ConnectionHandler incoming);
}
package org.briarproject.bramble.api.rendezvous;
public interface RendezvousConstants {
/**
* Label for deriving key material from the master key.
*/
String KEY_MATERIAL_LABEL =
"org.briarproject.bramble.rendezvous/KEY_MATERIAL";
}
......@@ -2,13 +2,14 @@ package org.briarproject.bramble.api.rendezvous;
import org.briarproject.bramble.api.properties.TransportProperties;
import java.io.Closeable;
import java.io.IOException;
/**
* An interface for making and accepting rendezvous connections with a pending
* contact over a given transport.
*/
public interface RendezvousHandler {
public interface RendezvousEndpoint extends Closeable {
/**
* Returns a set of transport properties for connecting to the pending
......@@ -20,5 +21,6 @@ public interface RendezvousHandler {
* Closes the handler and releases any resources held by it, such as
* network sockets.
*/
@Override
void close() throws IOException;
}
package org.briarproject.bramble.api.rendezvous.event;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import javax.annotation.concurrent.Immutable;
/**
* An event that is broadcast when a rendezvous with a pending contact fails.
*/
@Immutable
@NotNullByDefault
public class RendezvousFailedEvent extends Event {
private final PendingContactId pendingContactId;
public RendezvousFailedEvent(PendingContactId pendingContactId) {
this.pendingContactId = pendingContactId;
}
public PendingContactId getPendingContactId() {
return pendingContactId;
}
}
......@@ -7,6 +7,7 @@ import org.briarproject.bramble.identity.IdentityModule;
import org.briarproject.bramble.lifecycle.LifecycleModule;
import org.briarproject.bramble.plugin.PluginModule;
import org.briarproject.bramble.properties.PropertiesModule;
import org.briarproject.bramble.rendezvous.RendezvousModule;
import org.briarproject.bramble.sync.validation.ValidationModule;
import org.briarproject.bramble.system.SystemModule;
import org.briarproject.bramble.transport.TransportModule;
......@@ -28,6 +29,8 @@ public interface BrambleCoreEagerSingletons {
void inject(PropertiesModule.EagerSingletons init);
void inject(RendezvousModule.EagerSingletons init);
void inject(SystemModule.EagerSingletons init);
void inject(TransportModule.EagerSingletons init);
......@@ -42,6 +45,7 @@ public interface BrambleCoreEagerSingletons {
inject(new DatabaseExecutorModule.EagerSingletons());
inject(new IdentityModule.EagerSingletons());
inject(new LifecycleModule.EagerSingletons());
inject(new RendezvousModule.EagerSingletons());
inject(new PluginModule.EagerSingletons());
inject(new PropertiesModule.EagerSingletons());
inject(new SystemModule.EagerSingletons());
......
......@@ -23,7 +23,7 @@ import org.briarproject.bramble.api.plugin.event.DisableBluetoothEvent;
import org.briarproject.bramble.api.plugin.event.EnableBluetoothEvent;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousHandler;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import org.briarproject.bramble.api.settings.Settings;
import org.briarproject.bramble.api.settings.event.SettingsUpdatedEvent;
......@@ -398,7 +398,8 @@ abstract class BluetoothPlugin<SS> implements DuplexPlugin, EventListener {
}
@Override
public RendezvousHandler createRendezvousHandler(KeyMaterialSource k) {
public RendezvousEndpoint createRendezvousEndpoint(KeyMaterialSource k,
boolean alice, ConnectionHandler incoming) {
throw new UnsupportedOperationException();
}
......
......@@ -13,7 +13,7 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousHandler;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import org.briarproject.bramble.util.IoUtils;
import java.io.IOException;
......@@ -309,7 +309,8 @@ abstract class TcpPlugin implements DuplexPlugin {
}
@Override
public RendezvousHandler createRendezvousHandler(KeyMaterialSource k) {
public RendezvousEndpoint createRendezvousEndpoint(KeyMaterialSource k,
boolean alice, ConnectionHandler incoming) {
throw new UnsupportedOperationException();
}
......
......@@ -26,7 +26,7 @@ import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousHandler;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import org.briarproject.bramble.api.settings.Settings;
import org.briarproject.bramble.api.settings.event.SettingsUpdatedEvent;
import org.briarproject.bramble.api.system.Clock;
......@@ -613,7 +613,8 @@ abstract class TorPlugin implements DuplexPlugin, EventHandler, EventListener {
}
@Override
public RendezvousHandler createRendezvousHandler(KeyMaterialSource k) {
public RendezvousEndpoint createRendezvousEndpoint(KeyMaterialSource k,
boolean alice, ConnectionHandler incoming) {
throw new UnsupportedOperationException();
}
......
package org.briarproject.bramble.rendezvous;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.MINUTES;
interface RendezvousConstants {
/**
* The current version of the rendezvous protocol.
*/
byte PROTOCOL_VERSION = 0;
/**
* How long to try to rendezvous with a pending contact before giving up.
*/
long RENDEZVOUS_TIMEOUT_MS = DAYS.toMillis(2);
/**
* How often to try to rendezvous with pending contacts.
*/
long POLLING_INTERVAL_MS = MINUTES.toMillis(1);
/**
* Label for deriving the rendezvous key from the static master key.
*/
String RENDEZVOUS_KEY_LABEL =
"org.briarproject.bramble.rendezvous/RENDEZVOUS_KEY";
/**
* Label for deriving key material from the rendezvous key.
*/
String KEY_MATERIAL_LABEL =
"org.briarproject.bramble.rendezvous/KEY_MATERIAL";
}
package org.briarproject.bramble.api.rendezvous;
package org.briarproject.bramble.rendezvous;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
@NotNullByDefault
public interface RendezvousCrypto {
interface RendezvousCrypto {
KeyMaterialSource createKeyMaterialSource(SecretKey masterKey,
SecretKey deriveRendezvousKey(SecretKey staticMasterKey);
KeyMaterialSource createKeyMaterialSource(SecretKey rendezvousKey,
TransportId t);
}
......@@ -5,12 +5,13 @@ import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousCrypto;
import javax.annotation.concurrent.Immutable;
import javax.inject.Inject;
import static org.briarproject.bramble.api.rendezvous.RendezvousConstants.KEY_MATERIAL_LABEL;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.KEY_MATERIAL_LABEL;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.PROTOCOL_VERSION;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.RENDEZVOUS_KEY_LABEL;
import static org.briarproject.bramble.util.StringUtils.toUtf8;
@Immutable
......@@ -25,10 +26,16 @@ class RendezvousCryptoImpl implements RendezvousCrypto {
}
@Override
public KeyMaterialSource createKeyMaterialSource(SecretKey masterKey,
public SecretKey deriveRendezvousKey(SecretKey staticMasterKey) {
return crypto.deriveKey(RENDEZVOUS_KEY_LABEL, staticMasterKey,
new byte[] {PROTOCOL_VERSION});
}
@Override
public KeyMaterialSource createKeyMaterialSource(SecretKey rendezvousKey,
TransportId t) {
SecretKey sourceKey = crypto.deriveKey(KEY_MATERIAL_LABEL, masterKey,
toUtf8(t.getString()));
SecretKey sourceKey = crypto.deriveKey(KEY_MATERIAL_LABEL,
rendezvousKey, toUtf8(t.getString()));
return new KeyMaterialSourceImpl(sourceKey);
}
}
package org.briarproject.bramble.rendezvous;
import org.briarproject.bramble.api.rendezvous.RendezvousCrypto;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import javax.inject.Inject;
import javax.inject.Singleton;
import dagger.Module;
import dagger.Provides;
......@@ -8,9 +12,23 @@ import dagger.Provides;
@Module
public class RendezvousModule {
public static class EagerSingletons {
@Inject
RendezvousPoller rendezvousPoller;
}
@Provides
RendezvousCrypto provideRendezvousCrypto(
RendezvousCryptoImpl rendezvousCrypto) {
return rendezvousCrypto;
}
@Provides
@Singleton
RendezvousPoller provideRendezvousPoller(LifecycleManager lifecycleManager,
EventBus eventBus, RendezvousPollerImpl rendezvousPoller) {
lifecycleManager.registerService(rendezvousPoller);
eventBus.addListener(rendezvousPoller);
return rendezvousPoller;
}
}
package org.briarproject.bramble.rendezvous;
/**
* Empty interface for injecting the rendezvous poller.
*/
interface RendezvousPoller {
}
package org.briarproject.bramble.rendezvous;
import org.briarproject.bramble.PoliteExecutor;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.contact.PendingContact;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.contact.event.PendingContactAddedEvent;
import org.briarproject.bramble.api.contact.event.PendingContactRemovedEvent;
import org.briarproject.bramble.api.crypto.KeyPair;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.crypto.TransportCrypto;
import org.briarproject.bramble.api.db.DatabaseComponent;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.event.Event;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.event.EventExecutor;
import org.briarproject.bramble.api.event.EventListener;
import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.Service;
import org.briarproject.bramble.api.lifecycle.ServiceException;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.ConnectionHandler;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.api.plugin.Plugin;
import org.briarproject.bramble.api.plugin.PluginManager;
import org.briarproject.bramble.api.plugin.TransportConnectionReader;
import org.briarproject.bramble.api.plugin.TransportConnectionWriter;
import org.briarproject.bramble.api.plugin.TransportId;
import org.briarproject.bramble.api.plugin.duplex.DuplexPlugin;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
import org.briarproject.bramble.api.plugin.event.TransportDisabledEvent;
import org.briarproject.bramble.api.plugin.event.TransportEnabledEvent;
import org.briarproject.bramble.api.properties.TransportProperties;
import org.briarproject.bramble.api.rendezvous.KeyMaterialSource;
import org.briarproject.bramble.api.rendezvous.RendezvousEndpoint;
import org.briarproject.bramble.api.rendezvous.event.RendezvousFailedEvent;
import org.briarproject.bramble.api.system.Clock;
import org.briarproject.bramble.api.system.Scheduler;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.logging.Level.INFO;
import static java.util.logging.Level.WARNING;
import static java.util.logging.Logger.getLogger;
import static org.briarproject.bramble.api.nullsafety.NullSafety.requireNull;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.POLLING_INTERVAL_MS;
import static org.briarproject.bramble.rendezvous.RendezvousConstants.RENDEZVOUS_TIMEOUT_MS;
import static org.briarproject.bramble.util.IoUtils.tryToClose;
import static org.briarproject.bramble.util.LogUtils.logException;
@NotNullByDefault
class RendezvousPollerImpl implements RendezvousPoller, Service, EventListener {
private static final Logger LOG =
getLogger(RendezvousPollerImpl.class.getName());
private final ScheduledExecutorService scheduler;
private final DatabaseComponent db;
private final IdentityManager identityManager;
private final TransportCrypto transportCrypto;
private final RendezvousCrypto rendezvousCrypto;
private final PluginManager pluginManager;
private final ConnectionManager connectionManager;
private final EventBus eventBus;
private final Clock clock;
// Executor that runs one task at a time
private final Executor worker;
// The following fields are only accessed on the worker
private final Map<TransportId, PluginState> pluginStates = new HashMap<>();
private final Map<PendingContactId, CryptoState> cryptoStates =
new HashMap<>();
@Nullable
private KeyPair handshakeKeyPair = null;
@Inject
RendezvousPollerImpl(@IoExecutor Executor ioExecutor,
@Scheduler ScheduledExecutorService scheduler,
DatabaseComponent db,
IdentityManager identityManager,
TransportCrypto transportCrypto,
RendezvousCrypto rendezvousCrypto,
PluginManager pluginManager,
ConnectionManager connectionManager,
EventBus eventBus,
Clock clock) {
this.scheduler = scheduler;
this.db = db;
this.identityManager = identityManager;
this.transportCrypto = transportCrypto;
this.rendezvousCrypto = rendezvousCrypto;
this.pluginManager = pluginManager;
this.connectionManager = connectionManager;
this.eventBus = eventBus;
this.clock = clock;
worker = new PoliteExecutor("RendezvousPoller", ioExecutor, 1);
}
@Override
public void startService() throws ServiceException {
try {
db.transaction(true, txn -> {
Collection<PendingContact> pending = db.getPendingContacts(txn);
// Use a commit action to prevent races with add/remove events
txn.attach(() -> addPendingContactsAsync(pending));
});
} catch (DbException e) {
throw new ServiceException(e);
}
scheduler.scheduleAtFixedRate(this::poll, POLLING_INTERVAL_MS,
POLLING_INTERVAL_MS, MILLISECONDS);
}
@EventExecutor
private void addPendingContactsAsync(Collection<PendingContact> pending) {
worker.execute(() -> {
for (PendingContact p : pending) addPendingContact(p);
});
}
// Worker
private void addPendingContact(PendingContact p) {
long now = clock.currentTimeMillis();
long expiry = p.getTimestamp() + RENDEZVOUS_TIMEOUT_MS;
if (expiry > now) {
scheduler.schedule(() -> expirePendingContactAsync(p.getId()),
expiry - now, MILLISECONDS);
} else {
eventBus.broadcast(new RendezvousFailedEvent(p.getId()));
return;
}
try {
if (handshakeKeyPair == null) {
handshakeKeyPair = db.transactionWithResult(true,
identityManager::getHandshakeKeys);
}
SecretKey staticMasterKey = transportCrypto
.deriveStaticMasterKey(p.getPublicKey(), handshakeKeyPair);
SecretKey rendezvousKey = rendezvousCrypto
.deriveRendezvousKey(staticMasterKey);
boolean alice = transportCrypto
.isAlice(p.getPublicKey(), handshakeKeyPair);
CryptoState cs = new CryptoState(rendezvousKey, alice);
requireNull(cryptoStates.put(p.getId(), cs));
for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint =
createEndpoint(ps.plugin, p.getId(), cs);
if (endpoint != null)
requireNull(ps.endpoints.put(p.getId(), endpoint));
}
} catch (DbException | GeneralSecurityException e) {
logException(LOG, WARNING, e);
}
}
@Scheduler
private void expirePendingContactAsync(PendingContactId p) {
worker.execute(() -> expirePendingContact(p));
}
// Worker
private void expirePendingContact(PendingContactId p) {
if (removePendingContact(p))
eventBus.broadcast(new RendezvousFailedEvent(p));
}
// Worker
private boolean removePendingContact(PendingContactId p) {
// We can come here twice if a pending contact fails and is then removed
if (cryptoStates.remove(p) == null) return false;
for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint = ps.endpoints.remove(p);
if (endpoint != null) tryToClose(endpoint, LOG, INFO);
}
return true;
}
@Nullable
private RendezvousEndpoint createEndpoint(DuplexPlugin plugin,
PendingContactId p, CryptoState cs) {
TransportId t = plugin.getId();
KeyMaterialSource k =
rendezvousCrypto.createKeyMaterialSource(cs.rendezvousKey, t);
Handler h = new Handler(p, t, true);
return plugin.createRendezvousEndpoint(k, cs.alice, h);
}
@Scheduler
private void poll() {
worker.execute(() -> {
for (PluginState ps : pluginStates.values()) poll(ps);
});
}
// Worker
private void poll(PluginState ps) {
List<Pair<TransportProperties, ConnectionHandler>> properties =
new ArrayList<>();
for (Entry<PendingContactId, RendezvousEndpoint> e :
ps.endpoints.entrySet()) {
TransportProperties props =
e.getValue().getRemoteTransportProperties();
Handler h = new Handler(e.getKey(), ps.plugin.getId(), false);
properties.add(new Pair<>(props, h));
}
ps.plugin.poll(properties);
}
@Override
public void stopService() {
}
@Override
public void eventOccurred(Event e) {
if (e instanceof PendingContactAddedEvent) {
PendingContactAddedEvent p = (PendingContactAddedEvent) e;
addPendingContactAsync(p.getPendingContact());
} else if (e instanceof PendingContactRemovedEvent) {
PendingContactRemovedEvent p = (PendingContactRemovedEvent) e;
removePendingContactAsync(p.getId());
} else if (e instanceof TransportEnabledEvent) {
TransportEnabledEvent t = (TransportEnabledEvent) e;
addTransportAsync(t.getTransportId());
} else if (e instanceof TransportDisabledEvent) {
TransportDisabledEvent t = (TransportDisabledEvent) e;
removeTransportAsync(t.getTransportId());
}
}
@EventExecutor
private void addPendingContactAsync(PendingContact p) {
worker.execute(() -> {
addPendingContact(p);
poll(p.getId());
});
}
// Worker
private void poll(PendingContactId p) {
for (PluginState ps : pluginStates.values()) {
RendezvousEndpoint endpoint = ps.endpoints.get(p);
if (endpoint != null) {
TransportProperties props =
endpoint.getRemoteTransportProperties();
Handler h = new Handler(p, ps.plugin.getId(), false);
ps.plugin.poll(singletonList(new Pair<>(props, h)));
}
}
}
@EventExecutor
private void removePendingContactAsync(PendingContactId p) {
worker.execute(() -> removePendingContact(p));
}
@EventExecutor
private void addTransportAsync(TransportId t) {
Plugin p = pluginManager.getPlugin(t);
if (p instanceof DuplexPlugin) {
DuplexPlugin d = (DuplexPlugin) p;
if (d.supportsRendezvous())
worker.execute(() -> addTransport(d));
}
}
// Worker
private void addTransport(DuplexPlugin plugin) {
TransportId t = plugin.getId();
Map<PendingContactId, RendezvousEndpoint> endpoints = new HashMap<>();
for (Entry<PendingContactId, CryptoState> e : cryptoStates.entrySet()) {
RendezvousEndpoint endpoint =
createEndpoint(plugin, e.getKey(), e.getValue());
if (endpoint != null) endpoints.put(e.getKey(), endpoint);
}
requireNull(pluginStates.put(t, new PluginState(plugin, endpoints)));
}
@EventExecutor
private void removeTransportAsync(TransportId t) {
worker.execute(() -> removeTransport(t));
}
// Worker
private void removeTransport(TransportId t) {
PluginState ps = pluginStates.remove(t);
if (ps != null) {
for (RendezvousEndpoint endpoint : ps.endpoints.values()) {
tryToClose(endpoint, LOG, INFO);
}
}
}
private static class PluginState {
private final DuplexPlugin plugin;
private final Map<PendingContactId, RendezvousEndpoint> endpoints;
private PluginState(DuplexPlugin plugin,
Map<PendingContactId, RendezvousEndpoint> endpoints) {
this.plugin = plugin;
this.endpoints = endpoints;
}
}
private static class CryptoState {
private final SecretKey rendezvousKey;
private final boolean alice;
private CryptoState(SecretKey rendezvousKey, boolean alice) {
this.rendezvousKey = rendezvousKey;
this.alice = alice;
}
}
private class Handler implements ConnectionHandler {
private final PendingContactId pendingContactId;
private final TransportId transportId;
private final boolean incoming;
private Handler(PendingContactId pendingContactId,
TransportId transportId, boolean incoming) {
this.pendingContactId = pendingContactId;
this.transportId = transportId;
this.incoming = incoming;
}
@Override
public void handleConnection(DuplexTransportConnection c) {
if (incoming) {
connectionManager.manageIncomingConnection(