diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java index f4161cee44ef19c37a4604c5ad6c24a585e36f09..dc9bc9a0fdbe82a7c0e084a39d3e763e4c868261 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/db/DatabaseComponent.java @@ -157,7 +157,7 @@ public interface DatabaseComponent { * sendable messages that fit in the given length. */ @Nullable - Collection<byte[]> generateBatch(Transaction txn, ContactId c, + Map<MessageId, byte[]> generateBatch(Transaction txn, ContactId c, int maxLength, int maxLatency) throws DbException; /** @@ -460,6 +460,10 @@ public interface DatabaseComponent { void receiveRequest(Transaction txn, ContactId c, Request r) throws DbException; + void resetSentTimes(Transaction transaction, + Collection<MessageId> messageIds, ContactId c) + throws DbException; + /** * Removes a contact (and all associated state) from the database. */ diff --git a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java index 656d5910a3783790f2e6a3b4d916e9216abcaec2..8cf55408a2389a455d6141be741c67d73d3fd2c1 100644 --- a/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java +++ b/bramble-api/src/main/java/org/briarproject/bramble/api/sync/SyncSession.java @@ -16,5 +16,9 @@ public interface SyncSession { */ void interrupt(); - void writerException(IOException e); + /** + * Called if the sessions's writer throws an IOException on dispose() + * @param e + */ + void handleWriterException(IOException e); } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java index 2913c783f4407344a04ab9e63d8b87bafbd2863f..fdd2da8ad2766eac6d0a4b1932a79ddb02f40114 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/db/DatabaseComponentImpl.java @@ -59,6 +59,7 @@ import org.briarproject.bramble.api.transport.TransportKeys; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -307,16 +308,16 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { @Nullable @Override - public Collection<byte[]> generateBatch(Transaction transaction, + public Map<MessageId, byte[]> generateBatch(Transaction transaction, ContactId c, int maxLength, int maxLatency) throws DbException { if (transaction.isReadOnly()) throw new IllegalArgumentException(); T txn = unbox(transaction); if (!db.containsContact(txn, c)) throw new NoSuchContactException(); Collection<MessageId> ids = db.getMessagesToSend(txn, c, maxLength); - List<byte[]> messages = new ArrayList<>(ids.size()); + Map<MessageId, byte[]> messages = new HashMap<>(ids.size()); for (MessageId m : ids) { - messages.add(db.getRawMessage(txn, m)); + messages.put(m, db.getRawMessage(txn, m)); db.updateExpiryTime(txn, c, m, maxLatency); } if (ids.isEmpty()) return null; @@ -740,6 +741,22 @@ class DatabaseComponentImpl<T> implements DatabaseComponent { if (requested) transaction.attach(new MessageRequestedEvent(c)); } + @Override + public void resetSentTimes(Transaction transaction, + Collection<MessageId> messageIds, ContactId c) + throws DbException{ + if (transaction.isReadOnly()) throw new IllegalArgumentException(); + T txn = unbox(transaction); + if (!db.containsContact(txn, c)) + throw new NoSuchContactException(); + + for (MessageId m : messageIds) { + if (db.containsVisibleMessage(txn, c, m)) { + db.resetExpiryTime(txn, c, m); + } + } + } + @Override public void removeContact(Transaction transaction, ContactId c) throws DbException { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java index 9062613a45b0c4c3388954b2d931be9e401e9738..43c9bc20c879981d14f7fb4d83e572463f40c6fe 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/plugin/ConnectionManagerImpl.java @@ -220,7 +220,7 @@ class ConnectionManagerImpl implements ConnectionManager { writer.dispose(exception); } catch (IOException e) { logException(LOG, WARNING, e); - outgoingSession.writerException(e); + outgoingSession.handleWriterException(e); } } } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java index b09282900279e5fe72e52188753123c7ac134d85..73003db3deb86bce6b66cb6a8c0b1a28e139106b 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/DuplexOutgoingSession.java @@ -196,7 +196,7 @@ class DuplexOutgoingSession implements SyncSession, EventListener { } @Override - public void writerException(IOException e) { + public void handleWriterException(IOException e) { // No handling required for now } diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java index 93ad805e1fdf11f68ea9ac9318dc26399062111c..a2154325a64248a59cbbdb7a298c6a5d237f0cd7 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/IncomingSession.java @@ -97,6 +97,11 @@ class IncomingSession implements SyncSession, EventListener { interrupted = true; } + @Override + public void handleWriterException(IOException e) { + // No handling required + } + @Override public void eventOccurred(Event e) { if (e instanceof ContactRemovedEvent) { diff --git a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java index 1c33260c713526d665944af72e79be64bb96eb48..11213c6d3d8a1eccb52f59d5fe53078ebf2cc384 100644 --- a/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java +++ b/bramble-core/src/main/java/org/briarproject/bramble/sync/SimplexOutgoingSession.java @@ -13,12 +13,15 @@ import org.briarproject.bramble.api.lifecycle.IoExecutor; import org.briarproject.bramble.api.lifecycle.event.LifecycleEvent; import org.briarproject.bramble.api.nullsafety.NotNullByDefault; import org.briarproject.bramble.api.sync.Ack; +import org.briarproject.bramble.api.sync.MessageId; import org.briarproject.bramble.api.sync.SyncRecordWriter; import org.briarproject.bramble.api.sync.SyncSession; import org.briarproject.bramble.api.transport.StreamWriter; import java.io.IOException; import java.util.Collection; +import java.util.LinkedList; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; @@ -58,6 +61,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { private final AtomicInteger outstandingQueries; private final BlockingQueue<ThrowingRunnable<IOException>> writerTasks; + private volatile Collection<MessageId> sessionMessages; + private volatile boolean interrupted = false; SimplexOutgoingSession(DatabaseComponent db, Executor dbExecutor, @@ -70,6 +75,8 @@ class SimplexOutgoingSession implements SyncSession, EventListener { this.maxLatency = maxLatency; this.streamWriter = streamWriter; this.recordWriter = recordWriter; + + sessionMessages = new LinkedList<>(); outstandingQueries = new AtomicInteger(2); // One per type of record writerTasks = new LinkedBlockingQueue<>(); } @@ -106,8 +113,19 @@ class SimplexOutgoingSession implements SyncSession, EventListener { } @Override - public void writerException(IOException e) { - // TODO: reset send times for messages + public void handleWriterException(IOException e) { + try { + Transaction txn = db.startTransaction(false); + try{ + db.resetSentTimes(txn, sessionMessages, contactId); + db.commitTransaction(txn); + }finally{ + db.endTransaction(txn); + } + + } catch (DbException e1) { + e1.printStackTrace(); + } } private void decrementOutstandingQueries() { @@ -176,7 +194,7 @@ class SimplexOutgoingSession implements SyncSession, EventListener { public void run() { if (interrupted) return; try { - Collection<byte[]> b; + Map<MessageId, byte[]> b; Transaction txn = db.startTransaction(false); try { b = db.generateBatch(txn, contactId, @@ -188,7 +206,10 @@ class SimplexOutgoingSession implements SyncSession, EventListener { if (LOG.isLoggable(INFO)) LOG.info("Generated batch: " + (b != null)); if (b == null) decrementOutstandingQueries(); - else writerTasks.add(new WriteBatch(b)); + else { + sessionMessages.addAll(b.keySet()); + writerTasks.add(new WriteBatch(b.values())); + } } catch (DbException e) { logException(LOG, WARNING, e); interrupt(); @@ -208,7 +229,13 @@ class SimplexOutgoingSession implements SyncSession, EventListener { @Override public void run() throws IOException { if (interrupted) return; - for (byte[] raw : batch) recordWriter.writeMessage(raw); + try { + for (byte[] raw : batch) + recordWriter.writeMessage(raw); + }catch (IOException e) { + handleWriterException(e); + throw e; + } LOG.info("Sent batch"); dbExecutor.execute(new GenerateBatch()); }