Commit a650d812 authored by Torsten Grote's avatar Torsten Grote

Merge branch '1571-connection-manager-pending-contacts' into 'master'

Add rendezvous connection support to connection manager

Closes #1571

See merge request !1120
parents 1c56068b c536782e
Pipeline #3448 passed with stage
in 7 minutes and 42 seconds
package org.briarproject.bramble.api.plugin;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
import org.briarproject.bramble.api.plugin.duplex.DuplexTransportConnection;
@NotNullByDefault
public interface ConnectionManager {
/**
* Manages an incoming connection from a contact over a simplex transport.
*/
void manageIncomingConnection(TransportId t, TransportConnectionReader r);
/**
* Manages an incoming connection from a contact over a duplex transport.
*/
void manageIncomingConnection(TransportId t, DuplexTransportConnection d);
/**
* Manages an incoming handshake connection from a pending contact over a
* duplex transport.
*/
void manageIncomingConnection(PendingContactId p, TransportId t,
DuplexTransportConnection d);
/**
* Manages an outgoing connection to a contact over a simplex transport.
*/
void manageOutgoingConnection(ContactId c, TransportId t,
TransportConnectionWriter w);
/**
* Manages an outgoing connection to a contact over a duplex transport.
*/
void manageOutgoingConnection(ContactId c, TransportId t,
DuplexTransportConnection d);
/**
* Manages an outgoing handshake connection to a pending contact over a
* duplex transport.
*/
void manageOutgoingConnection(PendingContactId p, TransportId t,
DuplexTransportConnection d);
}
package org.briarproject.bramble.plugin;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactExchangeManager;
import org.briarproject.bramble.api.contact.ContactId;
import org.briarproject.bramble.api.contact.HandshakeManager;
import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult;
import org.briarproject.bramble.api.contact.PendingContactId;
import org.briarproject.bramble.api.db.DbException;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
......@@ -44,6 +49,8 @@ class ConnectionManagerImpl implements ConnectionManager {
private final StreamReaderFactory streamReaderFactory;
private final StreamWriterFactory streamWriterFactory;
private final SyncSessionFactory syncSessionFactory;
private final HandshakeManager handshakeManager;
private final ContactExchangeManager contactExchangeManager;
private final ConnectionRegistry connectionRegistry;
@Inject
......@@ -51,12 +58,16 @@ class ConnectionManagerImpl implements ConnectionManager {
KeyManager keyManager, StreamReaderFactory streamReaderFactory,
StreamWriterFactory streamWriterFactory,
SyncSessionFactory syncSessionFactory,
HandshakeManager handshakeManager,
ContactExchangeManager contactExchangeManager,
ConnectionRegistry connectionRegistry) {
this.ioExecutor = ioExecutor;
this.keyManager = keyManager;
this.streamReaderFactory = streamReaderFactory;
this.streamWriterFactory = streamWriterFactory;
this.syncSessionFactory = syncSessionFactory;
this.handshakeManager = handshakeManager;
this.contactExchangeManager = contactExchangeManager;
this.connectionRegistry = connectionRegistry;
}
......@@ -72,6 +83,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new ManageIncomingDuplexConnection(t, d));
}
@Override
public void manageIncomingConnection(PendingContactId p, TransportId t,
DuplexTransportConnection d) {
ioExecutor.execute(new ManageIncomingHandshakeConnection(p, t, d));
}
@Override
public void manageOutgoingConnection(ContactId c, TransportId t,
TransportConnectionWriter w) {
......@@ -84,6 +101,12 @@ class ConnectionManagerImpl implements ConnectionManager {
ioExecutor.execute(new ManageOutgoingDuplexConnection(c, t, d));
}
@Override
public void manageOutgoingConnection(PendingContactId p, TransportId t,
DuplexTransportConnection d) {
ioExecutor.execute(new ManageOutgoingHandshakeConnection(p, t, d));
}
private byte[] readTag(InputStream in) throws IOException {
byte[] tag = new byte[TAG_LENGTH];
read(in, tag);
......@@ -467,4 +490,205 @@ class ConnectionManagerImpl implements ConnectionManager {
disposeOnError(writer);
}
}
private class ManageIncomingHandshakeConnection implements Runnable {
private final PendingContactId pendingContactId;
private final TransportId transportId;
private final DuplexTransportConnection connection;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private ManageIncomingHandshakeConnection(
PendingContactId pendingContactId, TransportId transportId,
DuplexTransportConnection connection) {
this.pendingContactId = pendingContactId;
this.transportId = transportId;
this.connection = connection;
reader = connection.getReader();
writer = connection.getWriter();
}
@Override
public void run() {
// Read and recognise the tag
StreamContext ctxIn;
try {
byte[] tag = readTag(reader.getInputStream());
ctxIn = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
onError(false);
return;
}
if (ctxIn == null) {
LOG.info("Unrecognised tag");
onError(false);
return;
}
PendingContactId inPendingContactId = ctxIn.getPendingContactId();
if (inPendingContactId == null) {
LOG.warning("Expected rendezvous tag, got contact tag");
onError(true);
return;
}
// Allocate the outgoing stream context
StreamContext ctxOut;
try {
ctxOut = keyManager.getStreamContext(pendingContactId,
transportId);
} catch (DbException e) {
logException(LOG, WARNING, e);
onError(true);
return;
}
if (ctxOut == null) {
LOG.warning("Could not allocate stream context");
onError(true);
return;
}
// Close the connection if it's redundant
if (!connectionRegistry.registerConnection(pendingContactId)) {
LOG.info("Redundant rendezvous connection");
onError(true);
return;
}
// Handshake and exchange contacts
try {
InputStream in = streamReaderFactory.createStreamReader(
reader.getInputStream(), ctxIn);
// Flush the output stream to send the outgoing stream header
StreamWriter out = streamWriterFactory.createStreamWriter(
writer.getOutputStream(), ctxOut);
out.getOutputStream().flush();
HandshakeResult result = handshakeManager.handshake(
pendingContactId, in, out);
Contact contact = contactExchangeManager.exchangeContacts(
pendingContactId, connection, result.getMasterKey(),
result.isAlice(), false);
connectionRegistry.unregisterConnection(pendingContactId, true);
// Reuse the connection as a transport connection
manageOutgoingConnection(contact.getId(), transportId,
connection);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
onError(true);
connectionRegistry.unregisterConnection(pendingContactId,
false);
}
}
private void onError(boolean recognised) {
disposeOnError(reader, recognised);
disposeOnError(writer);
}
}
private class ManageOutgoingHandshakeConnection implements Runnable {
private final PendingContactId pendingContactId;
private final TransportId transportId;
private final DuplexTransportConnection connection;
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
private ManageOutgoingHandshakeConnection(
PendingContactId pendingContactId, TransportId transportId,
DuplexTransportConnection connection) {
this.pendingContactId = pendingContactId;
this.transportId = transportId;
this.connection = connection;
reader = connection.getReader();
writer = connection.getWriter();
}
@Override
public void run() {
// Allocate the outgoing stream context
StreamContext ctxOut;
try {
ctxOut = keyManager.getStreamContext(pendingContactId,
transportId);
} catch (DbException e) {
logException(LOG, WARNING, e);
onError();
return;
}
if (ctxOut == null) {
LOG.warning("Could not allocate stream context");
onError();
return;
}
// Flush the output stream to send the outgoing stream header
StreamWriter out;
try {
out = streamWriterFactory.createStreamWriter(
writer.getOutputStream(), ctxOut);
out.getOutputStream().flush();
} catch (IOException e) {
logException(LOG, WARNING, e);
onError();
return;
}
// Read and recognise the tag
StreamContext ctxIn;
try {
byte[] tag = readTag(reader.getInputStream());
ctxIn = keyManager.getStreamContext(transportId, tag);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
onError();
return;
}
// Unrecognised tags are suspicious in this case
if (ctxIn == null) {
LOG.warning("Unrecognised tag for returning stream");
onError();
return;
}
// Check that the stream comes from the expected pending contact
PendingContactId inPendingContactId = ctxIn.getPendingContactId();
if (inPendingContactId == null) {
LOG.warning("Expected rendezvous tag, got contact tag");
onError();
return;
}
if (!inPendingContactId.equals(pendingContactId)) {
LOG.warning("Wrong pending contact ID for returning stream");
onError();
return;
}
// Close the connection if it's redundant
if (!connectionRegistry.registerConnection(pendingContactId)) {
LOG.info("Redundant rendezvous connection");
onError();
return;
}
// Handshake and exchange contacts
try {
InputStream in = streamReaderFactory.createStreamReader(
reader.getInputStream(), ctxIn);
HandshakeResult result = handshakeManager.handshake(
pendingContactId, in, out);
Contact contact = contactExchangeManager.exchangeContacts(
pendingContactId, connection, result.getMasterKey(),
result.isAlice(), false);
connectionRegistry.unregisterConnection(pendingContactId, true);
// Reuse the connection as a transport connection
manageOutgoingConnection(contact.getId(), transportId,
connection);
} catch (IOException | DbException e) {
logException(LOG, WARNING, e);
onError();
connectionRegistry.unregisterConnection(pendingContactId,
false);
}
}
private void onError() {
// 'Recognised' is always true for outgoing connections
disposeOnError(reader, true);
disposeOnError(writer);
}
}
}
......@@ -3,9 +3,9 @@ package org.briarproject.bramble.contact;
import org.briarproject.bramble.api.Pair;
import org.briarproject.bramble.api.contact.Contact;
import org.briarproject.bramble.api.contact.ContactManager;
import org.briarproject.bramble.api.contact.HandshakeManager.HandshakeResult;
import org.briarproject.bramble.api.contact.PendingContact;
import org.briarproject.bramble.api.contact.PendingContactState;
import org.briarproject.bramble.api.contact.event.ContactAddedEvent;
import org.briarproject.bramble.api.crypto.PublicKey;
import org.briarproject.bramble.api.crypto.SecretKey;
import org.briarproject.bramble.api.identity.Identity;
......@@ -14,19 +14,14 @@ import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.test.BrambleTestCase;
import org.briarproject.bramble.test.TestDatabaseConfigModule;
import org.briarproject.bramble.test.TestDuplexTransportConnection;
import org.briarproject.bramble.test.TestStreamWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static junit.framework.TestCase.assertNotNull;
......@@ -34,12 +29,12 @@ import static junit.framework.TestCase.assertNull;
import static junit.framework.TestCase.fail;
import static org.briarproject.bramble.api.contact.PendingContactState.WAITING_FOR_CONNECTION;
import static org.briarproject.bramble.test.TestDuplexTransportConnection.createPair;
import static org.briarproject.bramble.test.TestPluginConfigModule.DUPLEX_TRANSPORT_ID;
import static org.briarproject.bramble.test.TestUtils.deleteTestDirectory;
import static org.briarproject.bramble.test.TestUtils.getSecretKey;
import static org.briarproject.bramble.test.TestUtils.getTestDirectory;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class ContactExchangeIntegrationTest extends BrambleTestCase {
......@@ -116,8 +111,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase {
fail();
}
});
aliceFinished.await(TIMEOUT, MILLISECONDS);
bobFinished.await(TIMEOUT, MILLISECONDS);
assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS));
assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS));
assertContacts(verified, false);
assertNoPendingContacts();
}
......@@ -155,8 +150,8 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase {
fail();
}
});
aliceFinished.await(TIMEOUT, MILLISECONDS);
bobFinished.await(TIMEOUT, MILLISECONDS);
assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS));
assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS));
assertContacts(verified, true);
assertNoPendingContacts();
}
......@@ -168,54 +163,25 @@ public class ContactExchangeIntegrationTest extends BrambleTestCase {
PendingContact aliceFromBob = addPendingContact(bob, alice);
assertPendingContacts();
PipedInputStream aliceHandshakeIn = new PipedInputStream();
PipedInputStream bobHandshakeIn = new PipedInputStream();
OutputStream aliceHandshakeOut = new PipedOutputStream(bobHandshakeIn);
OutputStream bobHandshakeOut = new PipedOutputStream(aliceHandshakeIn);
AtomicReference<HandshakeResult> aliceResult = new AtomicReference<>();
AtomicReference<HandshakeResult> bobResult = new AtomicReference<>();
TestDuplexTransportConnection[] pair = createPair();
TestDuplexTransportConnection aliceConnection = pair[0];
TestDuplexTransportConnection bobConnection = pair[1];
CountDownLatch aliceFinished = new CountDownLatch(1);
CountDownLatch bobFinished = new CountDownLatch(1);
boolean verified = random.nextBoolean();
alice.getIoExecutor().execute(() -> {
try {
HandshakeResult result = alice.getHandshakeManager().handshake(
bobFromAlice.getId(), aliceHandshakeIn,
new TestStreamWriter(aliceHandshakeOut));
aliceResult.set(result);
alice.getContactExchangeManager().exchangeContacts(
bobFromAlice.getId(), aliceConnection,
result.getMasterKey(), result.isAlice(), verified);
aliceFinished.countDown();
} catch (Exception e) {
fail();
}
alice.getEventBus().addListener(e -> {
if (e instanceof ContactAddedEvent) aliceFinished.countDown();
});
bob.getIoExecutor().execute(() -> {
try {
HandshakeResult result = bob.getHandshakeManager().handshake(
aliceFromBob.getId(), bobHandshakeIn,
new TestStreamWriter(bobHandshakeOut));
bobResult.set(result);
bob.getContactExchangeManager().exchangeContacts(
aliceFromBob.getId(), bobConnection,
result.getMasterKey(), result.isAlice(), verified);
bobFinished.countDown();
} catch (Exception e) {
fail();
}
alice.getConnectionManager().manageOutgoingConnection(
bobFromAlice.getId(), DUPLEX_TRANSPORT_ID, aliceConnection);
bob.getEventBus().addListener(e -> {
if (e instanceof ContactAddedEvent) bobFinished.countDown();
});
aliceFinished.await(TIMEOUT, MILLISECONDS);
bobFinished.await(TIMEOUT, MILLISECONDS);
assertArrayEquals(aliceResult.get().getMasterKey().getBytes(),
bobResult.get().getMasterKey().getBytes());
assertNotEquals(aliceResult.get().isAlice(), bobResult.get().isAlice());
assertContacts(verified, true);
bob.getConnectionManager().manageIncomingConnection(
aliceFromBob.getId(), DUPLEX_TRANSPORT_ID, bobConnection);
assertTrue(aliceFinished.await(TIMEOUT, MILLISECONDS));
assertTrue(bobFinished.await(TIMEOUT, MILLISECONDS));
assertContacts(false, true);
assertNoPendingContacts();
}
......
......@@ -4,10 +4,11 @@ import org.briarproject.bramble.BrambleCoreEagerSingletons;
import org.briarproject.bramble.BrambleCoreModule;
import org.briarproject.bramble.api.contact.ContactExchangeManager;
import org.briarproject.bramble.api.contact.ContactManager;
import org.briarproject.bramble.api.contact.HandshakeManager;
import org.briarproject.bramble.api.event.EventBus;
import org.briarproject.bramble.api.identity.IdentityManager;
import org.briarproject.bramble.api.lifecycle.IoExecutor;
import org.briarproject.bramble.api.lifecycle.LifecycleManager;
import org.briarproject.bramble.api.plugin.ConnectionManager;
import org.briarproject.bramble.test.BrambleCoreIntegrationTestModule;
import java.util.concurrent.Executor;
......@@ -24,11 +25,13 @@ import dagger.Component;
interface ContactExchangeIntegrationTestComponent
extends BrambleCoreEagerSingletons {
ConnectionManager getConnectionManager();
ContactExchangeManager getContactExchangeManager();
ContactManager getContactManager();
HandshakeManager getHandshakeManager();
EventBus getEventBus();
IdentityManager getIdentityManager();
......
......@@ -21,6 +21,7 @@ public class TestDuplexTransportConnection
private final TransportConnectionReader reader;
private final TransportConnectionWriter writer;
@SuppressWarnings("WeakerAccess")
public TestDuplexTransportConnection(InputStream in, OutputStream out) {
reader = new TestTransportConnectionReader(in);
writer = new TestTransportConnectionWriter(out);
......@@ -42,8 +43,9 @@ public class TestDuplexTransportConnection
*/
public static TestDuplexTransportConnection[] createPair()
throws IOException {
PipedInputStream aliceIn = new PipedInputStream();
PipedInputStream bobIn = new PipedInputStream();
// Use 64k buffers to prevent deadlock
PipedInputStream aliceIn = new PipedInputStream(1 << 16);
PipedInputStream bobIn = new PipedInputStream(1 << 16);
PipedOutputStream aliceOut = new PipedOutputStream(bobIn);
PipedOutputStream bobOut = new PipedOutputStream(aliceIn);
TestDuplexTransportConnection alice =
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment