Commit ed1cefa1 authored by akwizgran's avatar akwizgran

Use concurrent map for pending contact states.

parent 23354d65
...@@ -33,13 +33,12 @@ import org.briarproject.bramble.api.transport.KeyManager; ...@@ -33,13 +33,12 @@ import org.briarproject.bramble.api.transport.KeyManager;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe; import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject; import javax.inject.Inject;
...@@ -64,10 +63,8 @@ class ContactManagerImpl implements ContactManager, EventListener { ...@@ -64,10 +63,8 @@ class ContactManagerImpl implements ContactManager, EventListener {
private final EventBus eventBus; private final EventBus eventBus;
private final List<ContactHook> hooks = new CopyOnWriteArrayList<>(); private final List<ContactHook> hooks = new CopyOnWriteArrayList<>();
private final Object statesLock = new Object(); private final ConcurrentMap<PendingContactId, PendingContactState> states =
@GuardedBy("statesLock") new ConcurrentHashMap<>();
private final Map<PendingContactId, PendingContactState> states =
new HashMap<>();
@Inject @Inject
ContactManagerImpl(DatabaseComponent db, ContactManagerImpl(DatabaseComponent db,
...@@ -176,10 +173,7 @@ class ContactManagerImpl implements ContactManager, EventListener { ...@@ -176,10 +173,7 @@ class ContactManagerImpl implements ContactManager, EventListener {
List<Pair<PendingContact, PendingContactState>> pairs = List<Pair<PendingContact, PendingContactState>> pairs =
new ArrayList<>(pendingContacts.size()); new ArrayList<>(pendingContacts.size());
for (PendingContact p : pendingContacts) { for (PendingContact p : pendingContacts) {
PendingContactState state; PendingContactState state = states.get(p.getId());
synchronized (statesLock) {
state = states.get(p.getId());
}
if (state == null) state = WAITING_FOR_CONNECTION; if (state == null) state = WAITING_FOR_CONNECTION;
pairs.add(new Pair<>(p, state)); pairs.add(new Pair<>(p, state));
} }
...@@ -189,9 +183,7 @@ class ContactManagerImpl implements ContactManager, EventListener { ...@@ -189,9 +183,7 @@ class ContactManagerImpl implements ContactManager, EventListener {
@Override @Override
public void removePendingContact(PendingContactId p) throws DbException { public void removePendingContact(PendingContactId p) throws DbException {
db.transaction(false, txn -> db.removePendingContact(txn, p)); db.transaction(false, txn -> db.removePendingContact(txn, p));
synchronized (statesLock) { states.remove(p);
states.remove(p);
}
} }
@Override @Override
...@@ -296,29 +288,39 @@ class ContactManagerImpl implements ContactManager, EventListener { ...@@ -296,29 +288,39 @@ class ContactManagerImpl implements ContactManager, EventListener {
if (e instanceof RendezvousConnectionOpenedEvent) { if (e instanceof RendezvousConnectionOpenedEvent) {
RendezvousConnectionOpenedEvent r = RendezvousConnectionOpenedEvent r =
(RendezvousConnectionOpenedEvent) e; (RendezvousConnectionOpenedEvent) e;
setState(r.getPendingContactId(), ADDING_CONTACT); PendingContactId p = r.getPendingContactId();
setState(p, WAITING_FOR_CONNECTION, ADDING_CONTACT);
} else if (e instanceof RendezvousConnectionClosedEvent) { } else if (e instanceof RendezvousConnectionClosedEvent) {
RendezvousConnectionClosedEvent r = RendezvousConnectionClosedEvent r =
(RendezvousConnectionClosedEvent) e; (RendezvousConnectionClosedEvent) e;
// We're only interested in failures - if the rendezvous succeeds // We're only interested in failures - if the rendezvous succeeds
// the pending contact will be removed // the pending contact will be removed
if (!r.isSuccess()) if (!r.isSuccess()) {
setState(r.getPendingContactId(), WAITING_FOR_CONNECTION); PendingContactId p = r.getPendingContactId();
setState(p, ADDING_CONTACT, WAITING_FOR_CONNECTION);
}
} else if (e instanceof RendezvousFailedEvent) { } else if (e instanceof RendezvousFailedEvent) {
RendezvousFailedEvent r = (RendezvousFailedEvent) e; RendezvousFailedEvent r = (RendezvousFailedEvent) e;
setState(r.getPendingContactId(), FAILED); setState(r.getPendingContactId(), FAILED);
} }
} }
/* /**
* Sets the state of the given pending contact and broadcasts an event, * Sets the state of the given pending contact and broadcasts an event.
* unless the current state is FAILED.
*/ */
private void setState(PendingContactId p, PendingContactState state) { private void setState(PendingContactId p, PendingContactState state) {
synchronized (statesLock) { states.put(p, state);
if (states.get(p) == FAILED) return; eventBus.broadcast(new PendingContactStateChangedEvent(p, state));
states.put(p, state); }
/*
* Sets the state of the given pending contact and broadcasts an event if
* there is no current state or the current state equals {@code expected}.
*/
private void setState(PendingContactId p, PendingContactState expected,
PendingContactState state) {
PendingContactState old = states.putIfAbsent(p, state);
if (old == null || states.replace(p, expected, state))
eventBus.broadcast(new PendingContactStateChangedEvent(p, state)); eventBus.broadcast(new PendingContactStateChangedEvent(p, state));
}
} }
} }
...@@ -329,41 +329,53 @@ public class ContactManagerImplTest extends BrambleMockTestCase { ...@@ -329,41 +329,53 @@ public class ContactManagerImplTest extends BrambleMockTestCase {
} }
@Test @Test
public void testFailedStateIsNotReplaced() throws Exception { public void testPendingContactFailsBeforeConnection() {
Transaction txn = new Transaction(null, true); // The pending contact expires - the FAILED state is broadcast
context.checking(new Expectations() {{ context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == ADDING_CONTACT)));
oneOf(eventBus).broadcast(with(new PredicateMatcher<>( oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e -> PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED))); e.getPendingContactState() == FAILED)));
}}); }});
contactManager.eventOccurred(new RendezvousFailedEvent(
pendingContact.getId()));
// A rendezvous connection is opened, then the pending contact expires, // A rendezvous connection is opened - no state is broadcast
// then the rendezvous connection is closed
contactManager.eventOccurred(new RendezvousConnectionOpenedEvent( contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
pendingContact.getId())); pendingContact.getId()));
contactManager.eventOccurred(new RendezvousFailedEvent( context.assertIsSatisfied();
pendingContact.getId()));
// The rendezvous connection fails - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent( contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false)); pendingContact.getId(), false));
context.assertIsSatisfied(); context.assertIsSatisfied();
}
context.checking(new DbExpectations() {{ @Test
oneOf(db).transactionWithResult(with(true), withDbCallable(txn)); public void testPendingContactFailsDuringConnection() {
oneOf(db).getPendingContacts(txn); // A rendezvous connection is opened - the ADDING_CONTACT state is
will(returnValue(singletonList(pendingContact))); // broadcast
context.checking(new Expectations() {{
oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == ADDING_CONTACT)));
}}); }});
// The FAILED state should not have been overwritten contactManager.eventOccurred(new RendezvousConnectionOpenedEvent(
Collection<Pair<PendingContact, PendingContactState>> pairs = pendingContact.getId()));
contactManager.getPendingContacts(); context.assertIsSatisfied();
assertEquals(1, pairs.size());
Pair<PendingContact, PendingContactState> pair = // The pending contact expires - the FAILED state is broadcast
pairs.iterator().next(); context.checking(new Expectations() {{
assertEquals(pendingContact, pair.getFirst()); oneOf(eventBus).broadcast(with(new PredicateMatcher<>(
assertEquals(FAILED, pair.getSecond()); PendingContactStateChangedEvent.class, e ->
e.getPendingContactState() == FAILED)));
}});
contactManager.eventOccurred(new RendezvousFailedEvent(
pendingContact.getId()));
// The rendezvous connection fails - no state is broadcast
contactManager.eventOccurred(new RendezvousConnectionClosedEvent(
pendingContact.getId(), false));
context.assertIsSatisfied();
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment