diff --git a/api/net/sf/briar/api/protocol/VerificationExecutor.java b/api/net/sf/briar/api/protocol/VerificationExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..90a20d7c53e82280e8e24973887a142d20b04492 --- /dev/null +++ b/api/net/sf/briar/api/protocol/VerificationExecutor.java @@ -0,0 +1,15 @@ +package net.sf.briar.api.protocol; + +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import com.google.inject.BindingAnnotation; + +/** Annotation for injecting the executor for message verification tasks. */ +@BindingAnnotation +@Target({ PARAMETER }) +@Retention(RUNTIME) +public @interface VerificationExecutor {} diff --git a/components/net/sf/briar/db/DatabaseExecutorImpl.java b/components/net/sf/briar/db/DatabaseExecutorImpl.java index 20cbf810a48bf21456032fffd1682b30eceeb389..8692b3e927656fb76514445a4e18540fff452c6a 100644 --- a/components/net/sf/briar/db/DatabaseExecutorImpl.java +++ b/components/net/sf/briar/db/DatabaseExecutorImpl.java @@ -6,6 +6,10 @@ import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +/** + * An executor that limits the number of concurrent database tasks and the + * number of tasks queued for execution. + */ class DatabaseExecutorImpl implements Executor { // FIXME: Determine suitable values for these constants empirically @@ -28,14 +32,15 @@ class DatabaseExecutorImpl implements Executor { this(MAX_QUEUED_TASKS, MIN_THREADS, MAX_THREADS); } - DatabaseExecutorImpl(int maxQueuedTasks, int minThreads, int maxThreads) { - queue = new ArrayBlockingQueue<Runnable>(maxQueuedTasks); + DatabaseExecutorImpl(int maxQueued, int minThreads, int maxThreads) { + queue = new ArrayBlockingQueue<Runnable>(maxQueued); new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS, queue); } public void execute(Runnable r) { try { + // Block until there's space in the queue queue.put(r); } catch(InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/components/net/sf/briar/protocol/ProtocolModule.java b/components/net/sf/briar/protocol/ProtocolModule.java index cf83575c9363863d040073d0360c4a2d324eef27..d3eda315b527d01f430cfbc71e1833fb64b0551a 100644 --- a/components/net/sf/briar/protocol/ProtocolModule.java +++ b/components/net/sf/briar/protocol/ProtocolModule.java @@ -1,5 +1,7 @@ package net.sf.briar.protocol; +import java.util.concurrent.Executor; + import net.sf.briar.api.crypto.CryptoComponent; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.Author; @@ -15,10 +17,12 @@ import net.sf.briar.api.protocol.Request; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.serial.ObjectReader; import com.google.inject.AbstractModule; import com.google.inject.Provides; +import com.google.inject.Singleton; public class ProtocolModule extends AbstractModule { @@ -31,6 +35,8 @@ public class ProtocolModule extends AbstractModule { bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class); bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class); bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class); + bind(Executor.class).annotatedWith(VerificationExecutor.class).to( + VerificationExecutorImpl.class).in(Singleton.class); } @Provides diff --git a/components/net/sf/briar/protocol/VerificationExecutorImpl.java b/components/net/sf/briar/protocol/VerificationExecutorImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..203ae33ffd2681223327f4fd18fe597de837f040 --- /dev/null +++ b/components/net/sf/briar/protocol/VerificationExecutorImpl.java @@ -0,0 +1,47 @@ +package net.sf.briar.protocol; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * An executor that limits the number of concurrent message verification tasks + * and the number of tasks queued for execution. + */ +class VerificationExecutorImpl implements Executor { + + // FIXME: Determine suitable values for these constants empirically + + /** + * The maximum number of tasks that can be queued for execution + * before attempting to execute another task will block. + */ + private static final int MAX_QUEUED_TASKS = 10; + + /** The number of idle threads to keep in the pool. */ + private static final int MIN_THREADS = 1; + + private final BlockingQueue<Runnable> queue; + + VerificationExecutorImpl() { + this(MAX_QUEUED_TASKS, MIN_THREADS, + Runtime.getRuntime().availableProcessors()); + } + + VerificationExecutorImpl(int maxQueued, int minThreads, int maxThreads) { + queue = new ArrayBlockingQueue<Runnable>(maxQueued); + new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS, + queue); + } + + public void execute(Runnable r) { + try { + // Block until there's space in the queue + queue.put(r); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} diff --git a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java index 96d1d35e16327ce16a35319064283a251ce09fa0..3afe1a4fa38f91da25c6a940d22cae40937877a7 100644 --- a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java +++ b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java @@ -4,9 +4,11 @@ import java.util.concurrent.Executor; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.BatchConnectionFactory; import net.sf.briar.api.transport.BatchTransportReader; import net.sf.briar.api.transport.BatchTransportWriter; @@ -18,7 +20,7 @@ import com.google.inject.Inject; class BatchConnectionFactoryImpl implements BatchConnectionFactory { - private final Executor executor; + private final Executor dbExecutor, verificationExecutor; private final DatabaseComponent db; private final ConnectionReaderFactory connReaderFactory; private final ConnectionWriterFactory connWriterFactory; @@ -26,12 +28,14 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory { private final ProtocolWriterFactory protoWriterFactory; @Inject - BatchConnectionFactoryImpl(Executor executor, DatabaseComponent db, - ConnectionReaderFactory connReaderFactory, + BatchConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory) { - this.executor = executor; + this.dbExecutor = dbExecutor; + this.verificationExecutor = verificationExecutor; this.db = db; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; @@ -42,8 +46,8 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory { public void createIncomingConnection(ConnectionContext ctx, BatchTransportReader r, byte[] tag) { final IncomingBatchConnection conn = new IncomingBatchConnection( - executor, db, connReaderFactory, protoReaderFactory, ctx, r, - tag); + dbExecutor, verificationExecutor, db, connReaderFactory, + protoReaderFactory, ctx, r, tag); Runnable read = new Runnable() { public void run() { conn.read(); diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java index 2d4c0d810acab8d5016d5232155ecfa0d6a289c5..074245db4ac1a6d5b31898aea239da994a7525e1 100644 --- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java @@ -13,11 +13,13 @@ import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.db.DbException; import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.ProtocolReader; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.BatchTransportReader; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; @@ -28,7 +30,7 @@ class IncomingBatchConnection { private static final Logger LOG = Logger.getLogger(IncomingBatchConnection.class.getName()); - private final Executor dbExecutor; + private final Executor dbExecutor, verificationExecutor; private final ConnectionReaderFactory connFactory; private final DatabaseComponent db; private final ProtocolReaderFactory protoFactory; @@ -38,10 +40,12 @@ class IncomingBatchConnection { private final ContactId contactId; IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, DatabaseComponent db, ConnectionReaderFactory connFactory, ProtocolReaderFactory protoFactory, ConnectionContext ctx, BatchTransportReader transport, byte[] tag) { this.dbExecutor = dbExecutor; + this.verificationExecutor = verificationExecutor; this.connFactory = connFactory; this.db = db; this.protoFactory = protoFactory; @@ -64,7 +68,7 @@ class IncomingBatchConnection { dbExecutor.execute(new ReceiveAck(a)); } else if(reader.hasBatch()) { UnverifiedBatch b = reader.readBatch(); - dbExecutor.execute(new ReceiveBatch(b)); + verificationExecutor.execute(new VerifyBatch(b)); } else if(reader.hasSubscriptionUpdate()) { SubscriptionUpdate s = reader.readSubscriptionUpdate(); dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); @@ -99,26 +103,41 @@ class IncomingBatchConnection { } } - private class ReceiveBatch implements Runnable { + private class VerifyBatch implements Runnable { private final UnverifiedBatch batch; - private ReceiveBatch(UnverifiedBatch batch) { + private VerifyBatch(UnverifiedBatch batch) { this.batch = batch; } public void run() { try { - // FIXME: Don't verify on the DB thread - db.receiveBatch(contactId, batch.verify()); - } catch(DbException e) { - if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + Batch b = batch.verify(); + dbExecutor.execute(new ReceiveBatch(b)); } catch(GeneralSecurityException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } } } + private class ReceiveBatch implements Runnable { + + private final Batch batch; + + private ReceiveBatch(Batch batch) { + this.batch = batch; + } + + public void run() { + try { + db.receiveBatch(contactId, batch); + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + private class ReceiveSubscriptionUpdate implements Runnable { private final SubscriptionUpdate update; diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java index cfdf931ea02c8405d796a3f93ea9bb82a0e4afd9..6fcf17e6a221ee8b036c781d244a7a1c6304a915 100644 --- a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java @@ -7,6 +7,7 @@ import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -20,15 +21,16 @@ class IncomingStreamConnection extends StreamConnection { private final byte[] tag; IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ConnectionContext ctx, StreamTransportConnection connection, byte[] tag) { - super(dbExecutor, db, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, ctx.getContactId(), - connection); + super(dbExecutor, verificationExecutor, db, connReaderFactory, + connWriterFactory, protoReaderFactory, protoWriterFactory, + ctx.getContactId(), connection); this.ctx = ctx; this.tag = tag; } diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java index a24bce3400d41c73beb5ec4d75de3915439d04a5..d7c41336da8e1b2f6ce2acd505baa03d54b091d2 100644 --- a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java @@ -10,6 +10,7 @@ import net.sf.briar.api.db.DbException; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -24,14 +25,16 @@ class OutgoingStreamConnection extends StreamConnection { private ConnectionContext ctx = null; // Locking: this OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, TransportIndex transportIndex, StreamTransportConnection connection) { - super(dbExecutor, db, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, contactId, connection); + super(dbExecutor, verificationExecutor, db, connReaderFactory, + connWriterFactory, protoReaderFactory, protoWriterFactory, + contactId, connection); this.transportIndex = transportIndex; } diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index 7544d729a6ac6a49370a14af71f598ffcb4fd9e2..44f16211eb3735d8326c675a9def1968eb186a60 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -28,6 +28,7 @@ import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent; import net.sf.briar.api.db.event.MessagesAddedEvent; import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent; import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.Batch; import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.Offer; import net.sf.briar.api.protocol.ProtocolReader; @@ -39,6 +40,7 @@ import net.sf.briar.api.protocol.Request; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriter; @@ -50,7 +52,6 @@ abstract class StreamConnection implements DatabaseListener { private static final Logger LOG = Logger.getLogger(StreamConnection.class.getName()); - protected final Executor dbExecutor; protected final DatabaseComponent db; protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionWriterFactory connWriterFactory; @@ -59,6 +60,7 @@ abstract class StreamConnection implements DatabaseListener { protected final ContactId contactId; protected final StreamTransportConnection transport; + private final Executor dbExecutor, verificationExecutor; private final AtomicBoolean canSendOffer; private final LinkedList<Runnable> writerTasks; // Locking: this @@ -68,12 +70,14 @@ abstract class StreamConnection implements DatabaseListener { private volatile boolean closed = false; StreamConnection(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, StreamTransportConnection transport) { this.dbExecutor = dbExecutor; + this.verificationExecutor = verificationExecutor; this.db = db; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; @@ -121,7 +125,7 @@ abstract class StreamConnection implements DatabaseListener { dbExecutor.execute(new ReceiveAck(a)); } else if(reader.hasBatch()) { UnverifiedBatch b = reader.readBatch(); - dbExecutor.execute(new ReceiveBatch(b)); + verificationExecutor.execute(new VerifyBatch(b)); } else if(reader.hasOffer()) { Offer o = reader.readOffer(); dbExecutor.execute(new ReceiveOffer(o)); @@ -232,23 +236,39 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a verification thread + private class VerifyBatch implements Runnable { + + private final UnverifiedBatch batch; + + private VerifyBatch(UnverifiedBatch batch) { + this.batch = batch; + } + + public void run() { + try { + Batch b = batch.verify(); + dbExecutor.execute(new ReceiveBatch(b)); + } catch(GeneralSecurityException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + // This task runs on a database thread private class ReceiveBatch implements Runnable { - private final UnverifiedBatch batch; + private final Batch batch; - private ReceiveBatch(UnverifiedBatch batch) { + private ReceiveBatch(Batch batch) { this.batch = batch; } public void run() { try { - // FIXME: Don't verify on the DB thread - db.receiveBatch(contactId, batch.verify()); + db.receiveBatch(contactId, batch); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - } catch(GeneralSecurityException e) { - if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); } } } diff --git a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java index 67787c8207a373406948e5db0db90868375c0469..b6c100996bf170a9f67609ac44d464e2640291c0 100644 --- a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java +++ b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java @@ -8,6 +8,7 @@ import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; +import net.sf.briar.api.protocol.VerificationExecutor; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriterFactory; @@ -18,7 +19,7 @@ import com.google.inject.Inject; class StreamConnectionFactoryImpl implements StreamConnectionFactory { - private final Executor dbExecutor; + private final Executor dbExecutor, verificationExecutor; private final DatabaseComponent db; private final ConnectionReaderFactory connReaderFactory; private final ConnectionWriterFactory connWriterFactory; @@ -27,11 +28,13 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { @Inject StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor, + @VerificationExecutor Executor verificationExecutor, DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory) { this.dbExecutor = dbExecutor; + this.verificationExecutor = verificationExecutor; this.db = db; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; @@ -42,8 +45,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createIncomingConnection(ConnectionContext ctx, StreamTransportConnection s, byte[] tag) { final StreamConnection conn = new IncomingStreamConnection(dbExecutor, - db, connReaderFactory, connWriterFactory, protoReaderFactory, - protoWriterFactory, ctx, s, tag); + verificationExecutor, db, connReaderFactory, connWriterFactory, + protoReaderFactory, protoWriterFactory, ctx, s, tag); Runnable write = new Runnable() { public void run() { conn.write(); @@ -61,8 +64,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createOutgoingConnection(ContactId c, TransportIndex i, StreamTransportConnection s) { final StreamConnection conn = new OutgoingStreamConnection(dbExecutor, - db, connReaderFactory, connWriterFactory, protoReaderFactory, - protoWriterFactory, c, i, s); + verificationExecutor, db, connReaderFactory, connWriterFactory, + protoReaderFactory, protoWriterFactory, c, i, s); Runnable write = new Runnable() { public void run() { conn.write(); diff --git a/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java b/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java index 89c03178577ce577170ddce4674ea7ae0b3f97a4..6895b52a48d58342c6ff6f0de81659eb78661b55 100644 --- a/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java +++ b/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java @@ -186,8 +186,8 @@ public class BatchConnectionReadWriteTest extends TestCase { bob.getInstance(ProtocolReaderFactory.class); BatchTransportReader reader = new TestBatchTransportReader(in); IncomingBatchConnection batchIn = new IncomingBatchConnection( - new ImmediateExecutor(), db, connFactory, protoFactory, ctx, - reader, tag); + new ImmediateExecutor(), new ImmediateExecutor(), db, + connFactory, protoFactory, ctx, reader, tag); // No messages should have been added yet assertFalse(listener.messagesAdded); // Read whatever needs to be read