From f474ed229db8aa6d61dd6b83e2b2a0bd2f20abc4 Mon Sep 17 00:00:00 2001
From: akwizgran <akwizgran@users.sourceforge.net>
Date: Mon, 5 Dec 2011 23:16:27 +0000
Subject: [PATCH] Limit the number of waiting database writes to avoid running
 out of memory.

---
 .../batch/IncomingBatchConnection.java         | 15 +++++++++++++++
 .../transport/stream/StreamConnection.java     | 18 ++++++++++++++++++
 2 files changed, 33 insertions(+)

diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
index c8d63980e6..cf8e00d04e 100644
--- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
+++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java
@@ -3,6 +3,7 @@ package net.sf.briar.transport.batch;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -23,6 +24,8 @@ import net.sf.briar.api.transport.ConnectionReaderFactory;
 
 class IncomingBatchConnection {
 
+	private static final int MAX_WAITING_DB_WRITES = 5;
+
 	private static final Logger LOG =
 		Logger.getLogger(IncomingBatchConnection.class.getName());
 
@@ -33,6 +36,7 @@ class IncomingBatchConnection {
 	private final ConnectionContext ctx;
 	private final BatchTransportReader reader;
 	private final byte[] tag;
+	private final Semaphore semaphore;
 
 	IncomingBatchConnection(Executor executor,
 			ConnectionReaderFactory connFactory,
@@ -45,6 +49,7 @@ class IncomingBatchConnection {
 		this.ctx = ctx;
 		this.reader = reader;
 		this.tag = tag;
+		semaphore = new Semaphore(MAX_WAITING_DB_WRITES);
 	}
 
 	void read() {
@@ -59,6 +64,7 @@ class IncomingBatchConnection {
 				if(proto.hasAck()) {
 					final Ack a = proto.readAck();
 					// Store the ack on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -67,11 +73,13 @@ class IncomingBatchConnection {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasBatch()) {
 					final UnverifiedBatch b = proto.readBatch();
 					// Verify and store the batch on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -83,11 +91,13 @@ class IncomingBatchConnection {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasSubscriptionUpdate()) {
 					final SubscriptionUpdate s = proto.readSubscriptionUpdate();
 					// Store the update on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -96,11 +106,13 @@ class IncomingBatchConnection {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasTransportUpdate()) {
 					final TransportUpdate t = proto.readTransportUpdate();
 					// Store the update on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -109,12 +121,15 @@ class IncomingBatchConnection {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else {
 					throw new FormatException();
 				}
 			}
+		} catch(InterruptedException e) {
+			Thread.currentThread().interrupt();
 		} catch(IOException e) {
 			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
 			reader.dispose(false);
diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java
index 071868574c..17839b393b 100644
--- a/components/net/sf/briar/transport/stream/StreamConnection.java
+++ b/components/net/sf/briar/transport/stream/StreamConnection.java
@@ -11,6 +11,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Semaphore;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -49,6 +50,8 @@ import net.sf.briar.api.transport.StreamTransportConnection;
 
 abstract class StreamConnection implements DatabaseListener {
 
+	private static final int MAX_WAITING_DB_WRITES = 5;
+
 	private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES };
 
 	private static final Logger LOG =
@@ -63,6 +66,8 @@ abstract class StreamConnection implements DatabaseListener {
 	protected final ContactId contactId;
 	protected final StreamTransportConnection connection;
 
+	private final Semaphore semaphore;
+
 	private int writerFlags = 0; // Locking: this
 	private Collection<MessageId> offered = null; // Locking: this
 	private LinkedList<MessageId> requested = null; // Locking: this
@@ -82,6 +87,7 @@ abstract class StreamConnection implements DatabaseListener {
 		this.protoWriterFactory = protoWriterFactory;
 		this.contactId = contactId;
 		this.connection = connection;
+		semaphore = new Semaphore(MAX_WAITING_DB_WRITES);
 	}
 
 	protected abstract ConnectionReader createConnectionReader()
@@ -126,6 +132,7 @@ abstract class StreamConnection implements DatabaseListener {
 				if(proto.hasAck()) {
 					final Ack a = proto.readAck();
 					// Store the ack on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -134,11 +141,13 @@ abstract class StreamConnection implements DatabaseListener {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasBatch()) {
 					final UnverifiedBatch b = proto.readBatch();
 					// Verify and store the batch on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -150,6 +159,7 @@ abstract class StreamConnection implements DatabaseListener {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasOffer()) {
@@ -182,6 +192,7 @@ abstract class StreamConnection implements DatabaseListener {
 					// Mark the unrequested messages as seen on another thread
 					final List<MessageId> l =
 						Collections.unmodifiableList(seen);
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -190,6 +201,7 @@ abstract class StreamConnection implements DatabaseListener {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 					// Store the requested message IDs and notify the writer
@@ -203,6 +215,7 @@ abstract class StreamConnection implements DatabaseListener {
 				} else if(proto.hasSubscriptionUpdate()) {
 					final SubscriptionUpdate s = proto.readSubscriptionUpdate();
 					// Store the update on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -211,11 +224,13 @@ abstract class StreamConnection implements DatabaseListener {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else if(proto.hasTransportUpdate()) {
 					final TransportUpdate t = proto.readTransportUpdate();
 					// Store the update on another thread
+					semaphore.acquire();
 					executor.execute(new Runnable() {
 						public void run() {
 							try {
@@ -224,6 +239,7 @@ abstract class StreamConnection implements DatabaseListener {
 								if(LOG.isLoggable(Level.WARNING))
 									LOG.warning(e.getMessage());
 							}
+							semaphore.release();
 						}
 					});
 				} else {
@@ -233,6 +249,8 @@ abstract class StreamConnection implements DatabaseListener {
 		} catch(DbException e) {
 			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
 			connection.dispose(false);
+		} catch(InterruptedException e) {
+			Thread.currentThread().interrupt();
 		} catch(IOException e) {
 			if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
 			connection.dispose(false);
-- 
GitLab