From e2cb1027aff687c7c256de6fd19c973a33a062a3 Mon Sep 17 00:00:00 2001
From: akwizgran <akwizgran@users.sourceforge.net>
Date: Wed, 7 Dec 2011 21:33:14 +0000
Subject: [PATCH] Moved message verification into a separate thread pool.

---
 .../api/protocol/VerificationExecutor.java    | 15 ++++++
 .../net/sf/briar/db/DatabaseExecutorImpl.java |  9 +++-
 .../net/sf/briar/protocol/ProtocolModule.java |  6 +++
 .../protocol/VerificationExecutorImpl.java    | 47 +++++++++++++++++++
 .../batch/BatchConnectionFactoryImpl.java     | 16 ++++---
 .../batch/IncomingBatchConnection.java        | 35 ++++++++++----
 .../stream/IncomingStreamConnection.java      |  8 ++--
 .../stream/OutgoingStreamConnection.java      |  7 ++-
 .../transport/stream/StreamConnection.java    | 36 ++++++++++----
 .../stream/StreamConnectionFactoryImpl.java   | 13 +++--
 .../batch/BatchConnectionReadWriteTest.java   |  4 +-
 11 files changed, 160 insertions(+), 36 deletions(-)
 create mode 100644 api/net/sf/briar/api/protocol/VerificationExecutor.java
 create mode 100644 components/net/sf/briar/protocol/VerificationExecutorImpl.java

diff --git a/api/net/sf/briar/api/protocol/VerificationExecutor.java b/api/net/sf/briar/api/protocol/VerificationExecutor.java
new file mode 100644
index 0000000000..90a20d7c53
--- /dev/null
+++ b/api/net/sf/briar/api/protocol/VerificationExecutor.java
@@ -0,0 +1,15 @@
+package net.sf.briar.api.protocol;
+
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import com.google.inject.BindingAnnotation;
+
+/** Annotation for injecting the executor for message verification tasks. */
+@BindingAnnotation
+@Target({ PARAMETER })
+@Retention(RUNTIME)
+public @interface VerificationExecutor {}
diff --git a/components/net/sf/briar/db/DatabaseExecutorImpl.java b/components/net/sf/briar/db/DatabaseExecutorImpl.java
index 20cbf810a4..8692b3e927 100644
--- a/components/net/sf/briar/db/DatabaseExecutorImpl.java
+++ b/components/net/sf/briar/db/DatabaseExecutorImpl.java
@@ -6,6 +6,10 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * An executor that limits the number of concurrent database tasks and the
+ * number of tasks queued for execution.
+ */
 class DatabaseExecutorImpl implements Executor {
 
 	// FIXME: Determine suitable values for these constants empirically
@@ -28,14 +32,15 @@ class DatabaseExecutorImpl implements Executor {
 		this(MAX_QUEUED_TASKS, MIN_THREADS, MAX_THREADS);
 	}
 
-	DatabaseExecutorImpl(int maxQueuedTasks, int minThreads, int maxThreads) {
-		queue = new ArrayBlockingQueue<Runnable>(maxQueuedTasks);
+	DatabaseExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
+		queue = new ArrayBlockingQueue<Runnable>(maxQueued);
 		new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
 				queue);
 	}
 
 	public void execute(Runnable r) {
 		try {
+			// Block until there's space in the queue
 			queue.put(r);
 		} catch(InterruptedException e) {
 			Thread.currentThread().interrupt();
diff --git a/components/net/sf/briar/protocol/ProtocolModule.java b/components/net/sf/briar/protocol/ProtocolModule.java
index cf83575c93..d3eda315b5 100644
--- a/components/net/sf/briar/protocol/ProtocolModule.java
+++ b/components/net/sf/briar/protocol/ProtocolModule.java
@@ -1,5 +1,7 @@
 package net.sf.briar.protocol;
 
+import java.util.concurrent.Executor;
+
 import net.sf.briar.api.crypto.CryptoComponent;
 import net.sf.briar.api.protocol.Ack;
 import net.sf.briar.api.protocol.Author;
@@ -15,10 +17,12 @@ import net.sf.briar.api.protocol.Request;
 import net.sf.briar.api.protocol.SubscriptionUpdate;
 import net.sf.briar.api.protocol.TransportUpdate;
 import net.sf.briar.api.protocol.UnverifiedBatch;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.serial.ObjectReader;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
+import com.google.inject.Singleton;
 
 public class ProtocolModule extends AbstractModule {
 
@@ -31,6 +35,8 @@ public class ProtocolModule extends AbstractModule {
 		bind(ProtocolReaderFactory.class).to(ProtocolReaderFactoryImpl.class);
 		bind(ProtocolWriterFactory.class).to(ProtocolWriterFactoryImpl.class);
 		bind(UnverifiedBatchFactory.class).to(UnverifiedBatchFactoryImpl.class);
+		bind(Executor.class).annotatedWith(VerificationExecutor.class).to(
+				VerificationExecutorImpl.class).in(Singleton.class);
 	}
 
 	@Provides
diff --git a/components/net/sf/briar/protocol/VerificationExecutorImpl.java b/components/net/sf/briar/protocol/VerificationExecutorImpl.java
new file mode 100644
index 0000000000..203ae33ffd
--- /dev/null
+++ b/components/net/sf/briar/protocol/VerificationExecutorImpl.java
@@ -0,0 +1,47 @@
+package net.sf.briar.protocol;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An executor that limits the number of concurrent message verification tasks
+ * and the number of tasks queued for execution.
+ */
+class VerificationExecutorImpl implements Executor {
+
+	// FIXME: Determine suitable values for these constants empirically
+
+	/**
+	 * The maximum number of tasks that can be queued for execution
+	 * before attempting to execute another task will block.
+	 */
+	private static final int MAX_QUEUED_TASKS = 10;
+
+	/** The number of idle threads to keep in the pool. */
+	private static final int MIN_THREADS = 1;
+
+	private final BlockingQueue<Runnable> queue;
+
+	VerificationExecutorImpl() {
+		this(MAX_QUEUED_TASKS, MIN_THREADS,
+				Runtime.getRuntime().availableProcessors());
+	}
+
+	VerificationExecutorImpl(int maxQueued, int minThreads, int maxThreads) {
+		queue = new ArrayBlockingQueue<Runnable>(maxQueued);
+		new ThreadPoolExecutor(minThreads, maxThreads, 60, TimeUnit.SECONDS,
+				queue);
+	}
+
+	public void execute(Runnable r) {
+		try {
+			// Block until there's space in the queue
+			queue.put(r);
+		} catch(InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
+	}
+}
diff --git a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
index 96d1d35e16..3afe1a4fa3 100644
--- a/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
+++ b/components/net/sf/briar/transport/batch/BatchConnectionFactoryImpl.java
@@ -4,9 +4,11 @@ import java.util.concurrent.Executor;
 
 import net.sf.briar.api.ContactId;
 import net.sf.briar.api.db.DatabaseComponent;
+import net.sf.briar.api.db.DatabaseExecutor;
 import net.sf.briar.api.protocol.ProtocolReaderFactory;
 import net.sf.briar.api.protocol.ProtocolWriterFactory;
 import net.sf.briar.api.protocol.TransportIndex;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.BatchConnectionFactory;
 import net.sf.briar.api.transport.BatchTransportReader;
 import net.sf.briar.api.transport.BatchTransportWriter;
@@ -18,7 +20,7 @@ import com.google.inject.Inject;
 
 class BatchConnectionFactoryImpl implements BatchConnectionFactory {
 
-	private final Executor executor;
+	private final Executor dbExecutor, verificationExecutor;
 	private final DatabaseComponent db;
 	private final ConnectionReaderFactory connReaderFactory;
 	private final ConnectionWriterFactory connWriterFactory;
@@ -26,12 +28,14 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
 	private final ProtocolWriterFactory protoWriterFactory;
 
 	@Inject
-	BatchConnectionFactoryImpl(Executor executor, DatabaseComponent db,
-			ConnectionReaderFactory connReaderFactory,
+	BatchConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
+			DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
 			ConnectionWriterFactory connWriterFactory,
 			ProtocolReaderFactory protoReaderFactory,
 			ProtocolWriterFactory protoWriterFactory) {
-		this.executor = executor;
+		this.dbExecutor = dbExecutor;
+		this.verificationExecutor = verificationExecutor;
 		this.db = db;
 		this.connReaderFactory = connReaderFactory;
 		this.connWriterFactory = connWriterFactory;
@@ -42,8 +46,8 @@ class BatchConnectionFactoryImpl implements BatchConnectionFactory {
 	public void createIncomingConnection(ConnectionContext ctx,
 			BatchTransportReader r, byte[] tag) {
 		final IncomingBatchConnection conn = new IncomingBatchConnection(
-				executor, db, connReaderFactory, protoReaderFactory, ctx, r,
-				tag);
+				dbExecutor, verificationExecutor, db, connReaderFactory,
+				protoReaderFactory, ctx, r, tag);
 		Runnable read = new Runnable() {
 			public void run() {
 				conn.read();
diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
index 2d4c0d810a..074245db4a 100644
--- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
+++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
@@ -13,11 +13,13 @@ import net.sf.briar.api.db.DatabaseComponent;
 import net.sf.briar.api.db.DatabaseExecutor;
 import net.sf.briar.api.db.DbException;
 import net.sf.briar.api.protocol.Ack;
+import net.sf.briar.api.protocol.Batch;
 import net.sf.briar.api.protocol.ProtocolReader;
 import net.sf.briar.api.protocol.ProtocolReaderFactory;
 import net.sf.briar.api.protocol.SubscriptionUpdate;
 import net.sf.briar.api.protocol.TransportUpdate;
 import net.sf.briar.api.protocol.UnverifiedBatch;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.BatchTransportReader;
 import net.sf.briar.api.transport.ConnectionContext;
 import net.sf.briar.api.transport.ConnectionReader;
@@ -28,7 +30,7 @@ class IncomingBatchConnection {
 	private static final Logger LOG =
 		Logger.getLogger(IncomingBatchConnection.class.getName());
 
-	private final Executor dbExecutor;
+	private final Executor dbExecutor, verificationExecutor;
 	private final ConnectionReaderFactory connFactory;
 	private final DatabaseComponent db;
 	private final ProtocolReaderFactory protoFactory;
@@ -38,10 +40,12 @@ class IncomingBatchConnection {
 	private final ContactId contactId;
 
 	IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
 			DatabaseComponent db, ConnectionReaderFactory connFactory,
 			ProtocolReaderFactory protoFactory, ConnectionContext ctx,
 			BatchTransportReader transport, byte[] tag) {
 		this.dbExecutor = dbExecutor;
+		this.verificationExecutor = verificationExecutor;
 		this.connFactory = connFactory;
 		this.db = db;
 		this.protoFactory = protoFactory;
@@ -64,7 +68,7 @@ class IncomingBatchConnection {
 					dbExecutor.execute(new ReceiveAck(a));
 				} else if(reader.hasBatch()) {
 					UnverifiedBatch b = reader.readBatch();
-					dbExecutor.execute(new ReceiveBatch(b));
+					verificationExecutor.execute(new VerifyBatch(b));
 				} else if(reader.hasSubscriptionUpdate()) {
 					SubscriptionUpdate s = reader.readSubscriptionUpdate();
 					dbExecutor.execute(new ReceiveSubscriptionUpdate(s));
@@ -99,26 +103,41 @@ class IncomingBatchConnection {
 		}
 	}
 
-	private class ReceiveBatch implements Runnable {
+	private class VerifyBatch implements Runnable {
 
 		private final UnverifiedBatch batch;
 
-		private ReceiveBatch(UnverifiedBatch batch) {
+		private VerifyBatch(UnverifiedBatch batch) {
 			this.batch = batch;
 		}
 
 		public void run() {
 			try {
-				// FIXME: Don't verify on the DB thread
-				db.receiveBatch(contactId, batch.verify());
-			} catch(DbException e) {
-				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+				Batch b = batch.verify();
+				dbExecutor.execute(new ReceiveBatch(b));
 			} catch(GeneralSecurityException e) {
 				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
 			}
 		}
 	}
 
+	private class ReceiveBatch implements Runnable {
+
+		private final Batch batch;
+
+		private ReceiveBatch(Batch batch) {
+			this.batch = batch;
+		}
+
+		public void run() {
+			try {
+				db.receiveBatch(contactId, batch);
+			} catch(DbException e) {
+				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			}
+		}
+	}
+
 	private class ReceiveSubscriptionUpdate implements Runnable {
 
 		private final SubscriptionUpdate update;
diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java
index cfdf931ea0..6fcf17e6a2 100644
--- a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java
+++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java
@@ -7,6 +7,7 @@ import net.sf.briar.api.db.DatabaseComponent;
 import net.sf.briar.api.db.DatabaseExecutor;
 import net.sf.briar.api.protocol.ProtocolReaderFactory;
 import net.sf.briar.api.protocol.ProtocolWriterFactory;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.ConnectionContext;
 import net.sf.briar.api.transport.ConnectionReader;
 import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -20,15 +21,16 @@ class IncomingStreamConnection extends StreamConnection {
 	private final byte[] tag;
 
 	IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
 			DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
 			ConnectionWriterFactory connWriterFactory,
 			ProtocolReaderFactory protoReaderFactory,
 			ProtocolWriterFactory protoWriterFactory,
 			ConnectionContext ctx, StreamTransportConnection connection,
 			byte[] tag) {
-		super(dbExecutor, db, connReaderFactory, connWriterFactory,
-				protoReaderFactory, protoWriterFactory, ctx.getContactId(),
-				connection);
+		super(dbExecutor, verificationExecutor, db, connReaderFactory,
+				connWriterFactory, protoReaderFactory, protoWriterFactory,
+				ctx.getContactId(), connection);
 		this.ctx = ctx;
 		this.tag = tag;
 	}
diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
index a24bce3400..d7c41336da 100644
--- a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
+++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java
@@ -10,6 +10,7 @@ import net.sf.briar.api.db.DbException;
 import net.sf.briar.api.protocol.ProtocolReaderFactory;
 import net.sf.briar.api.protocol.ProtocolWriterFactory;
 import net.sf.briar.api.protocol.TransportIndex;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.ConnectionContext;
 import net.sf.briar.api.transport.ConnectionReader;
 import net.sf.briar.api.transport.ConnectionReaderFactory;
@@ -24,14 +25,16 @@ class OutgoingStreamConnection extends StreamConnection {
 	private ConnectionContext ctx = null; // Locking: this
 
 	OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
 			DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
 			ConnectionWriterFactory connWriterFactory,
 			ProtocolReaderFactory protoReaderFactory,
 			ProtocolWriterFactory protoWriterFactory, ContactId contactId,
 			TransportIndex transportIndex,
 			StreamTransportConnection connection) {
-		super(dbExecutor, db, connReaderFactory, connWriterFactory,
-				protoReaderFactory, protoWriterFactory, contactId, connection);
+		super(dbExecutor, verificationExecutor, db, connReaderFactory,
+				connWriterFactory, protoReaderFactory, protoWriterFactory,
+				contactId, connection);
 		this.transportIndex = transportIndex;
 	}
 
diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java
index 7544d729a6..44f16211eb 100644
--- a/components/net/sf/briar/transport/stream/StreamConnection.java
+++ b/components/net/sf/briar/transport/stream/StreamConnection.java
@@ -28,6 +28,7 @@ import net.sf.briar.api.db.event.LocalTransportsUpdatedEvent;
 import net.sf.briar.api.db.event.MessagesAddedEvent;
 import net.sf.briar.api.db.event.SubscriptionsUpdatedEvent;
 import net.sf.briar.api.protocol.Ack;
+import net.sf.briar.api.protocol.Batch;
 import net.sf.briar.api.protocol.MessageId;
 import net.sf.briar.api.protocol.Offer;
 import net.sf.briar.api.protocol.ProtocolReader;
@@ -39,6 +40,7 @@ import net.sf.briar.api.protocol.Request;
 import net.sf.briar.api.protocol.SubscriptionUpdate;
 import net.sf.briar.api.protocol.TransportUpdate;
 import net.sf.briar.api.protocol.UnverifiedBatch;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.ConnectionReader;
 import net.sf.briar.api.transport.ConnectionReaderFactory;
 import net.sf.briar.api.transport.ConnectionWriter;
@@ -50,7 +52,6 @@ abstract class StreamConnection implements DatabaseListener {
 	private static final Logger LOG =
 		Logger.getLogger(StreamConnection.class.getName());
 
-	protected final Executor dbExecutor;
 	protected final DatabaseComponent db;
 	protected final ConnectionReaderFactory connReaderFactory;
 	protected final ConnectionWriterFactory connWriterFactory;
@@ -59,6 +60,7 @@ abstract class StreamConnection implements DatabaseListener {
 	protected final ContactId contactId;
 	protected final StreamTransportConnection transport;
 
+	private final Executor dbExecutor, verificationExecutor;
 	private final AtomicBoolean canSendOffer;
 	private final LinkedList<Runnable> writerTasks; // Locking: this
 
@@ -68,12 +70,14 @@ abstract class StreamConnection implements DatabaseListener {
 	private volatile boolean closed = false;
 
 	StreamConnection(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
 			DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
 			ConnectionWriterFactory connWriterFactory,
 			ProtocolReaderFactory protoReaderFactory,
 			ProtocolWriterFactory protoWriterFactory, ContactId contactId,
 			StreamTransportConnection transport) {
 		this.dbExecutor = dbExecutor;
+		this.verificationExecutor = verificationExecutor;
 		this.db = db;
 		this.connReaderFactory = connReaderFactory;
 		this.connWriterFactory = connWriterFactory;
@@ -121,7 +125,7 @@ abstract class StreamConnection implements DatabaseListener {
 					dbExecutor.execute(new ReceiveAck(a));
 				} else if(reader.hasBatch()) {
 					UnverifiedBatch b = reader.readBatch();
-					dbExecutor.execute(new ReceiveBatch(b));
+					verificationExecutor.execute(new VerifyBatch(b));
 				} else if(reader.hasOffer()) {
 					Offer o = reader.readOffer();
 					dbExecutor.execute(new ReceiveOffer(o));
@@ -232,23 +236,39 @@ abstract class StreamConnection implements DatabaseListener {
 		}
 	}
 
+	// This task runs on a verification thread
+	private class VerifyBatch implements Runnable {
+
+		private final UnverifiedBatch batch;
+
+		private VerifyBatch(UnverifiedBatch batch) {
+			this.batch = batch;
+		}
+
+		public void run() {
+			try {
+				Batch b = batch.verify();
+				dbExecutor.execute(new ReceiveBatch(b));
+			} catch(GeneralSecurityException e) {
+				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
+			}
+		}
+	}
+
 	// This task runs on a database thread
 	private class ReceiveBatch implements Runnable {
 
-		private final UnverifiedBatch batch;
+		private final Batch batch;
 
-		private ReceiveBatch(UnverifiedBatch batch) {
+		private ReceiveBatch(Batch batch) {
 			this.batch = batch;
 		}
 
 		public void run() {
 			try {
-				// FIXME: Don't verify on the DB thread
-				db.receiveBatch(contactId, batch.verify());
+				db.receiveBatch(contactId, batch);
 			} catch(DbException e) {
 				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
-			} catch(GeneralSecurityException e) {
-				if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
 			}
 		}
 	}
diff --git a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java
index 67787c8207..b6c100996b 100644
--- a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java
+++ b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java
@@ -8,6 +8,7 @@ import net.sf.briar.api.db.DatabaseExecutor;
 import net.sf.briar.api.protocol.ProtocolReaderFactory;
 import net.sf.briar.api.protocol.ProtocolWriterFactory;
 import net.sf.briar.api.protocol.TransportIndex;
+import net.sf.briar.api.protocol.VerificationExecutor;
 import net.sf.briar.api.transport.ConnectionContext;
 import net.sf.briar.api.transport.ConnectionReaderFactory;
 import net.sf.briar.api.transport.ConnectionWriterFactory;
@@ -18,7 +19,7 @@ import com.google.inject.Inject;
 
 class StreamConnectionFactoryImpl implements StreamConnectionFactory {
 
-	private final Executor dbExecutor;
+	private final Executor dbExecutor, verificationExecutor;
 	private final DatabaseComponent db;
 	private final ConnectionReaderFactory connReaderFactory;
 	private final ConnectionWriterFactory connWriterFactory;
@@ -27,11 +28,13 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
 
 	@Inject
 	StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor,
+			@VerificationExecutor Executor verificationExecutor,
 			DatabaseComponent db, ConnectionReaderFactory connReaderFactory,
 			ConnectionWriterFactory connWriterFactory,
 			ProtocolReaderFactory protoReaderFactory,
 			ProtocolWriterFactory protoWriterFactory) {
 		this.dbExecutor = dbExecutor;
+		this.verificationExecutor = verificationExecutor;
 		this.db = db;
 		this.connReaderFactory = connReaderFactory;
 		this.connWriterFactory = connWriterFactory;
@@ -42,8 +45,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
 	public void createIncomingConnection(ConnectionContext ctx,
 			StreamTransportConnection s, byte[] tag) {
 		final StreamConnection conn = new IncomingStreamConnection(dbExecutor,
-				db, connReaderFactory, connWriterFactory, protoReaderFactory,
-				protoWriterFactory, ctx, s, tag);
+				verificationExecutor, db, connReaderFactory, connWriterFactory,
+				protoReaderFactory, protoWriterFactory, ctx, s, tag);
 		Runnable write = new Runnable() {
 			public void run() {
 				conn.write();
@@ -61,8 +64,8 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory {
 	public void createOutgoingConnection(ContactId c, TransportIndex i,
 			StreamTransportConnection s) {
 		final StreamConnection conn = new OutgoingStreamConnection(dbExecutor,
-				db, connReaderFactory, connWriterFactory, protoReaderFactory,
-				protoWriterFactory, c, i, s);
+				verificationExecutor, db, connReaderFactory, connWriterFactory,
+				protoReaderFactory, protoWriterFactory, c, i, s);
 		Runnable write = new Runnable() {
 			public void run() {
 				conn.write();
diff --git a/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java b/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java
index 89c0317857..6895b52a48 100644
--- a/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java
+++ b/test/net/sf/briar/transport/batch/BatchConnectionReadWriteTest.java
@@ -186,8 +186,8 @@ public class BatchConnectionReadWriteTest extends TestCase {
 			bob.getInstance(ProtocolReaderFactory.class);
 		BatchTransportReader reader = new TestBatchTransportReader(in);
 		IncomingBatchConnection batchIn = new IncomingBatchConnection(
-				new ImmediateExecutor(), db, connFactory, protoFactory, ctx,
-				reader, tag);
+				new ImmediateExecutor(), new ImmediateExecutor(), db,
+				connFactory, protoFactory, ctx, reader, tag);
 		// No messages should have been added yet
 		assertFalse(listener.messagesAdded);
 		// Read whatever needs to be read
-- 
GitLab