From 3a1cef6933a1221f0ee4a86fc6af8da9f57a585f Mon Sep 17 00:00:00 2001
From: bontric <benjohnwie@gmail.com>
Date: Mon, 13 Aug 2018 18:15:25 +0200
Subject: [PATCH] Add functions/implement

---
 .../bramble/api/db/DatabaseComponent.java     |  6 ++-
 .../bramble/api/sync/SyncSession.java         |  6 ++-
 .../bramble/db/DatabaseComponentImpl.java     | 23 ++++++++++--
 .../bramble/plugin/ConnectionManagerImpl.java |  2 +-
 .../bramble/sync/DuplexOutgoingSession.java   |  2 +-
 .../bramble/sync/IncomingSession.java         |  5 +++
 .../bramble/sync/SimplexOutgoingSession.java  | 37 ++++++++++++++++---
 7 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
index f4161cee44..dc9bc9a0fd 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java
@@ -157,7 +157,7 @@ public interface DatabaseComponent {
 	 * sendable messages that fit in the given length.
 	 */
 	@Nullable
-	Collection<byte[]> generateBatch(Transaction txn, ContactId c,
+	Map<MessageId, byte[]> generateBatch(Transaction txn, ContactId c,
 			int maxLength, int maxLatency) throws DbException;
 
 	/**
@@ -460,6 +460,10 @@ public interface DatabaseComponent {
 	void receiveRequest(Transaction txn, ContactId c, Request r)
 			throws DbException;
 
+	void resetSentTimes(Transaction transaction,
+			Collection<MessageId> messageIds, ContactId c)
+			throws DbException;
+
 	/**
 	 * Removes a contact (and all associated state) from the database.
 	 */
diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java
index 656d5910a3..8cf55408a2 100644
--- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java
+++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java
@@ -16,5 +16,9 @@ public interface SyncSession {
 	 */
 	void interrupt();
 
-	void writerException(IOException e);
+	/**
+	 * Called if the sessions's writer throws an IOException on dispose()
+	 * @param e
+	 */
+	void handleWriterException(IOException e);
 }
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
index 2913c783f4..fdd2da8ad2 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java
@@ -59,6 +59,7 @@ import org.briarproject.bramble.api.transport.TransportKeys;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -307,16 +308,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
 
 	@Nullable
 	@Override
-	public Collection<byte[]> generateBatch(Transaction transaction,
+	public Map<MessageId, byte[]> generateBatch(Transaction transaction,
 			ContactId c, int maxLength, int maxLatency) throws DbException {
 		if (transaction.isReadOnly()) throw new IllegalArgumentException();
 		T txn = unbox(transaction);
 		if (!db.containsContact(txn, c))
 			throw new NoSuchContactException();
 		Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength);
-		List<byte[]> messages = new ArrayList<>(ids.size());
+		Map<MessageId, byte[]> messages = new HashMap<>(ids.size());
 		for (MessageId m : ids) {
-			messages.add(db.getRawMessage(txn, m));
+			messages.put(m, db.getRawMessage(txn, m));
 			db.updateExpiryTime(txn, c, m, maxLatency);
 		}
 		if (ids.isEmpty()) return null;
@@ -740,6 +741,22 @@ class DatabaseComponentImpl<T> implements DatabaseComponent {
 		if (requested) transaction.attach(new MessageRequestedEvent(c));
 	}
 
+	@Override
+	public void resetSentTimes(Transaction transaction,
+			Collection<MessageId> messageIds, ContactId c)
+			throws DbException{
+		if (transaction.isReadOnly()) throw new IllegalArgumentException();
+		T txn = unbox(transaction);
+		if (!db.containsContact(txn, c))
+			throw new NoSuchContactException();
+
+		for (MessageId m : messageIds) {
+			if (db.containsVisibleMessage(txn, c, m)) {
+				db.resetExpiryTime(txn, c, m);
+			}
+		}
+	}
+
 	@Override
 	public void removeContact(Transaction transaction, ContactId c)
 			throws DbException {
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
index 9062613a45..43c9bc20c8 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java
@@ -220,7 +220,7 @@ class ConnectionManagerImpl implements ConnectionManager {
 				writer.dispose(exception);
 			} catch (IOException e) {
 				logException(LOG, WARNING, e);
-				outgoingSession.writerException(e);
+				outgoingSession.handleWriterException(e);
 			}
 		}
 	}
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
index b092829002..73003db3de 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java
@@ -196,7 +196,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
 	}
 
 	@Override
-	public void writerException(IOException e) {
+	public void handleWriterException(IOException e) {
 		// No handling required for now
 	}
 
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
index 93ad805e1f..a2154325a6 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java
@@ -97,6 +97,11 @@ class IncomingSession implements SyncSession, EventListener {
 		interrupted = true;
 	}
 
+	@Override
+	public void handleWriterException(IOException e) {
+		// No handling required
+	}
+
 	@Override
 	public void eventOccurred(Event e) {
 		if (e instanceof ContactRemovedEvent) {
diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
index 1c33260c71..11213c6d3d 100644
--- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
+++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java
@@ -13,12 +13,15 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor;
 import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent;
 import org.briarproject.bramble.api.nullsafety.NotNullByDefault;
 import org.briarproject.bramble.api.sync.Ack;
+import org.briarproject.bramble.api.sync.MessageId;
 import org.briarproject.bramble.api.sync.SyncRecordWriter;
 import org.briarproject.bramble.api.sync.SyncSession;
 import org.briarproject.bramble.api.transport.StreamWriter;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,6 +61,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 	private final AtomicInteger outstandingQueries;
 	private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks;
 
+	private volatile Collection<MessageId> sessionMessages;
+
 	private volatile boolean interrupted = false;
 
 	SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor,
@@ -70,6 +75,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 		this.maxLatency = maxLatency;
 		this.streamWriter = streamWriter;
 		this.recordWriter = recordWriter;
+
+		sessionMessages = new LinkedList<>();
 		outstandingQueries = new AtomicInteger(2); // One per type of record
 		writerTasks = new LinkedBlockingQueue<>();
 	}
@@ -106,8 +113,19 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 	}
 
 	@Override
-	public void writerException(IOException e) {
-		// TODO: reset send times for messages
+	public void handleWriterException(IOException e) {
+		try {
+			Transaction txn = db.startTransaction(false);
+			try{
+				  db.resetSentTimes(txn, sessionMessages, contactId);
+				  db.commitTransaction(txn);
+			}finally{
+				db.endTransaction(txn);
+			}
+
+		} catch (DbException e1) {
+			e1.printStackTrace();
+		}
 	}
 
 	private void decrementOutstandingQueries() {
@@ -176,7 +194,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 		public void run() {
 			if (interrupted) return;
 			try {
-				Collection<byte[]> b;
+				Map<MessageId, byte[]> b;
 				Transaction txn = db.startTransaction(false);
 				try {
 					b = db.generateBatch(txn, contactId,
@@ -188,7 +206,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 				if (LOG.isLoggable(INFO))
 					LOG.info("Generated batch: " + (b != null));
 				if (b == null) decrementOutstandingQueries();
-				else writerTasks.add(new WriteBatch(b));
+				else {
+					sessionMessages.addAll(b.keySet());
+					writerTasks.add(new WriteBatch(b.values()));
+				}
 			} catch (DbException e) {
 				logException(LOG, WARNING, e);
 				interrupt();
@@ -208,7 +229,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
 		@Override
 		public void run() throws IOException {
 			if (interrupted) return;
-			for (byte[] raw : batch) recordWriter.writeMessage(raw);
+			try {
+				for (byte[] raw : batch)
+					recordWriter.writeMessage(raw);
+			}catch (IOException e) {
+				handleWriterException(e);
+				throw e;
+			}
 			LOG.info("Sent batch");
 			dbExecutor.execute(new GenerateBatch());
 		}
-- 
GitLab