From f1e12c630f561a5e57003198ecb6e4f1e68062ba Mon Sep 17 00:00:00 2001 From: akwizgran <michael@briarproject.org> Date: Wed, 10 Apr 2013 00:09:09 +0100 Subject: [PATCH] Several bug fixes for DuplexConnection, logging for ConnectionRegistry. Some packet types weren't being generated or handled, connections weren't properly disposed of when exceptions occurred. --- .../messaging/duplex/DuplexConnection.java | 179 ++++++++++++++++-- .../simplex/OutgoingSimplexConnection.java | 3 +- .../transport/ConnectionRegistryImpl.java | 21 +- 3 files changed, 186 insertions(+), 17 deletions(-) diff --git a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java index 9e396f1103..575244faba 100644 --- a/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/duplex/DuplexConnection.java @@ -74,6 +74,10 @@ abstract class DuplexConnection implements DatabaseListener { public void run() {} }; + private static final Runnable DIE = new Runnable() { + public void run() {} + }; + protected final DatabaseComponent db; protected final ConnectionRegistry connRegistry; protected final ConnectionReaderFactory connReaderFactory; @@ -117,7 +121,7 @@ abstract class DuplexConnection implements DatabaseListener { contactId = ctx.getContactId(); transportId = ctx.getTransportId(); maxLatency = transport.getMaxLatency(); - canSendOffer = new AtomicBoolean(false); + canSendOffer = new AtomicBoolean(true); disposed = new AtomicBoolean(false); writerTasks = new LinkedBlockingQueue<Runnable>(); } @@ -170,18 +174,23 @@ abstract class DuplexConnection implements DatabaseListener { try { InputStream in = createConnectionReader().getInputStream(); PacketReader reader = packetReaderFactory.createPacketReader(in); + if(LOG.isLoggable(INFO)) LOG.info("Starting to read"); while(!reader.eof()) { if(reader.hasAck()) { Ack a = reader.readAck(); + if(LOG.isLoggable(INFO)) LOG.info("Received ack"); dbExecutor.execute(new ReceiveAck(a)); } else if(reader.hasMessage()) { UnverifiedMessage m = reader.readMessage(); + if(LOG.isLoggable(INFO)) LOG.info("Received message"); cryptoExecutor.execute(new VerifyMessage(m)); } else if(reader.hasOffer()) { Offer o = reader.readOffer(); + if(LOG.isLoggable(INFO)) LOG.info("Received offer"); dbExecutor.execute(new ReceiveOffer(o)); } else if(reader.hasRequest()) { Request r = reader.readRequest(); + if(LOG.isLoggable(INFO)) LOG.info("Received request"); // Retrieve the offered message IDs Collection<MessageId> offered = getOfferedMessageIds(); if(offered == null) throw new FormatException(); @@ -200,21 +209,44 @@ abstract class DuplexConnection implements DatabaseListener { dbExecutor.execute(new SetSeen(seen)); // Start sending the requested messages dbExecutor.execute(new GenerateBatches(requested)); + } else if(reader.hasRetentionAck()) { + RetentionAck a = reader.readRetentionAck(); + if(LOG.isLoggable(INFO)) LOG.info("Received retention ack"); + dbExecutor.execute(new ReceiveRetentionAck(a)); + } else if(reader.hasRetentionUpdate()) { + RetentionUpdate u = reader.readRetentionUpdate(); + if(LOG.isLoggable(INFO)) + LOG.info("Received retention update"); + dbExecutor.execute(new ReceiveRetentionUpdate(u)); + } else if(reader.hasSubscriptionAck()) { + SubscriptionAck a = reader.readSubscriptionAck(); + if(LOG.isLoggable(INFO)) + LOG.info("Received subscription ack"); + dbExecutor.execute(new ReceiveSubscriptionAck(a)); } else if(reader.hasSubscriptionUpdate()) { SubscriptionUpdate u = reader.readSubscriptionUpdate(); + if(LOG.isLoggable(INFO)) + LOG.info("Received subscription update"); dbExecutor.execute(new ReceiveSubscriptionUpdate(u)); + } else if(reader.hasTransportAck()) { + TransportAck a = reader.readTransportAck(); + if(LOG.isLoggable(INFO)) + LOG.info("Received transport ack"); + dbExecutor.execute(new ReceiveTransportAck(a)); } else if(reader.hasTransportUpdate()) { TransportUpdate u = reader.readTransportUpdate(); + if(LOG.isLoggable(INFO)) + LOG.info("Received transport update"); dbExecutor.execute(new ReceiveTransportUpdate(u)); } else { throw new FormatException(); } } - // The writer will dispose of the transport if no exceptions occur + if(LOG.isLoggable(INFO)) LOG.info("Finished reading"); writerTasks.add(CLOSE); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); - dispose(true, true); + writerTasks.add(DIE); } } @@ -236,6 +268,7 @@ abstract class DuplexConnection implements DatabaseListener { OutputStream out = createConnectionWriter().getOutputStream(); writer = packetWriterFactory.createPacketWriter(out, transport.shouldFlush()); + if(LOG.isLoggable(INFO)) LOG.info("Starting to write"); // Send the initial packets: updates, acks, offer dbExecutor.execute(new GenerateTransportAcks()); dbExecutor.execute(new GenerateTransportUpdates()); @@ -247,14 +280,22 @@ abstract class DuplexConnection implements DatabaseListener { if(canSendOffer.getAndSet(false)) dbExecutor.execute(new GenerateOffer()); // Main loop + Runnable task = null; while(true) { - Runnable task = writerTasks.take(); - if(task == CLOSE) break; + if(LOG.isLoggable(INFO)) + LOG.info("Waiting for something to write"); + task = writerTasks.take(); + if(task == CLOSE || task == DIE) break; task.run(); } - writer.flush(); - writer.close(); - dispose(false, true); + if(LOG.isLoggable(INFO)) LOG.info("Finished writing"); + if(task == CLOSE) { + writer.flush(); + writer.close(); + dispose(false, true); + } else { + dispose(true, true); + } } catch(InterruptedException e) { if(LOG.isLoggable(INFO)) LOG.info("Interrupted while waiting for task"); @@ -262,14 +303,15 @@ abstract class DuplexConnection implements DatabaseListener { } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); - } finally { - connRegistry.unregisterConnection(contactId, transportId); - db.removeListener(this); } + db.removeListener(this); + connRegistry.unregisterConnection(contactId, transportId); } private void dispose(boolean exception, boolean recognised) { if(disposed.getAndSet(true)) return; + if(LOG.isLoggable(INFO)) + LOG.info("Disposing: " + exception + ", " + recognised); ByteUtils.erase(ctx.getSecret()); try { transport.dispose(exception, recognised); @@ -290,6 +332,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { db.receiveAck(contactId, ack); + if(LOG.isLoggable(INFO)) LOG.info("DB received ack"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -308,6 +351,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { Message m = messageVerifier.verifyMessage(message); + if(LOG.isLoggable(INFO)) LOG.info("Verified message"); dbExecutor.execute(new ReceiveMessage(m)); } catch(GeneralSecurityException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -327,6 +371,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { db.receiveMessage(contactId, message); + if(LOG.isLoggable(INFO)) LOG.info("DB received message"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -345,6 +390,7 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { Request r = db.receiveOffer(contactId, offer); + if(LOG.isLoggable(INFO)) LOG.info("DB received offer"); writerTasks.add(new WriteRequest(r)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -365,6 +411,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeRequest(request); + if(LOG.isLoggable(INFO)) LOG.info("Sent request"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -384,6 +431,66 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { db.setSeen(contactId, seen); + if(LOG.isLoggable(INFO)) LOG.info("DB set seen"); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on a database thread + private class ReceiveRetentionAck implements Runnable { + + private final RetentionAck ack; + + private ReceiveRetentionAck(RetentionAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveRetentionAck(contactId, ack); + if(LOG.isLoggable(INFO)) LOG.info("DB received retention ack"); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on a database thread + private class ReceiveRetentionUpdate implements Runnable { + + private final RetentionUpdate update; + + private ReceiveRetentionUpdate(RetentionUpdate update) { + this.update = update; + } + + public void run() { + try { + db.receiveRetentionUpdate(contactId, update); + if(LOG.isLoggable(INFO)) + LOG.info("DB received retention update"); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on a database thread + private class ReceiveSubscriptionAck implements Runnable { + + private final SubscriptionAck ack; + + private ReceiveSubscriptionAck(SubscriptionAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveSubscriptionAck(contactId, ack); + if(LOG.isLoggable(INFO)) + LOG.info("DB received subscription ack"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -402,6 +509,27 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { db.receiveSubscriptionUpdate(contactId, update); + if(LOG.isLoggable(INFO)) + LOG.info("DB received subscription update"); + } catch(DbException e) { + if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); + } + } + } + + // This task runs on a database thread + private class ReceiveTransportAck implements Runnable { + + private final TransportAck ack; + + private ReceiveTransportAck(TransportAck ack) { + this.ack = ack; + } + + public void run() { + try { + db.receiveTransportAck(contactId, ack); + if(LOG.isLoggable(INFO)) LOG.info("DB received transport ack"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -420,6 +548,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { db.receiveTransportUpdate(contactId, update); + if(LOG.isLoggable(INFO)) + LOG.info("DB received transport update"); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); } @@ -434,6 +564,8 @@ abstract class DuplexConnection implements DatabaseListener { int maxMessages = writer.getMaxMessagesForAck(Long.MAX_VALUE); try { Ack a = db.generateAck(contactId, maxMessages); + if(LOG.isLoggable(INFO)) + LOG.info("Generated ack: " + (a != null)); if(a != null) writerTasks.add(new WriteAck(a)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -454,6 +586,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeAck(ack); + if(LOG.isLoggable(INFO)) LOG.info("Sent ack"); dbExecutor.execute(new GenerateAcks()); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -476,6 +609,8 @@ abstract class DuplexConnection implements DatabaseListener { try { Collection<byte[]> batch = db.generateBatch(contactId, MAX_PACKET_LENGTH, maxLatency, requested); + if(LOG.isLoggable(INFO)) + LOG.info("Generated batch: " + (batch != null)); if(batch == null) new GenerateOffer().run(); else writerTasks.add(new WriteBatch(batch, requested)); } catch(DbException e) { @@ -500,6 +635,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { for(byte[] raw : batch) writer.writeMessage(raw); + if(LOG.isLoggable(INFO)) LOG.info("Sent batch"); if(requested.isEmpty()) dbExecutor.execute(new GenerateOffer()); else dbExecutor.execute(new GenerateBatches(requested)); } catch(IOException e) { @@ -517,6 +653,8 @@ abstract class DuplexConnection implements DatabaseListener { int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE); try { Offer o = db.generateOffer(contactId, maxMessages); + if(LOG.isLoggable(INFO)) + LOG.info("Generated offer: " + (o != null)); if(o == null) { // No messages to offer - wait for some to be added canSendOffer.set(true); @@ -545,6 +683,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeOffer(offer); + if(LOG.isLoggable(INFO)) LOG.info("Sent offer"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -558,6 +697,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { RetentionAck a = db.generateRetentionAck(contactId); + if(LOG.isLoggable(INFO)) + LOG.info("Generated retention ack: " + (a != null)); if(a != null) writerTasks.add(new WriteRetentionAck(a)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -578,6 +719,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeRetentionAck(ack); + if(LOG.isLoggable(INFO)) LOG.info("Sent retention ack"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -592,6 +734,8 @@ abstract class DuplexConnection implements DatabaseListener { try { RetentionUpdate u = db.generateRetentionUpdate(contactId, maxLatency); + if(LOG.isLoggable(INFO)) + LOG.info("Generated retention update: " + (u != null)); if(u != null) writerTasks.add(new WriteRetentionUpdate(u)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -612,6 +756,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeRetentionUpdate(update); + if(LOG.isLoggable(INFO)) LOG.info("Sent retention update"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -625,6 +770,8 @@ abstract class DuplexConnection implements DatabaseListener { public void run() { try { SubscriptionAck a = db.generateSubscriptionAck(contactId); + if(LOG.isLoggable(INFO)) + LOG.info("Generated subscription ack: " + (a != null)); if(a != null) writerTasks.add(new WriteSubscriptionAck(a)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -645,6 +792,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeSubscriptionAck(ack); + if(LOG.isLoggable(INFO)) LOG.info("Sent subscription ack"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -659,6 +807,8 @@ abstract class DuplexConnection implements DatabaseListener { try { SubscriptionUpdate u = db.generateSubscriptionUpdate(contactId, maxLatency); + if(LOG.isLoggable(INFO)) + LOG.info("Generated subscription update: " + (u != null)); if(u != null) writerTasks.add(new WriteSubscriptionUpdate(u)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -679,6 +829,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { writer.writeSubscriptionUpdate(update); + if(LOG.isLoggable(INFO)) LOG.info("Sent subscription update"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -693,6 +844,8 @@ abstract class DuplexConnection implements DatabaseListener { try { Collection<TransportAck> acks = db.generateTransportAcks(contactId); + if(LOG.isLoggable(INFO)) + LOG.info("Generated transport acks: " + (acks != null)); if(acks != null) writerTasks.add(new WriteTransportAcks(acks)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -713,6 +866,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { for(TransportAck a : acks) writer.writeTransportAck(a); + if(LOG.isLoggable(INFO)) LOG.info("Sent transport acks"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); @@ -727,6 +881,8 @@ abstract class DuplexConnection implements DatabaseListener { try { Collection<TransportUpdate> t = db.generateTransportUpdates(contactId, maxLatency); + if(LOG.isLoggable(INFO)) + LOG.info("Generated transport updates: " + (t != null)); if(t != null) writerTasks.add(new WriteTransportUpdates(t)); } catch(DbException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); @@ -747,6 +903,7 @@ abstract class DuplexConnection implements DatabaseListener { assert writer != null; try { for(TransportUpdate u : updates) writer.writeTransportUpdate(u); + if(LOG.isLoggable(INFO)) LOG.info("Sent transport updates"); } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true, true); diff --git a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java index 759c085c32..4d30ba9732 100644 --- a/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java +++ b/briar-core/src/net/sf/briar/messaging/simplex/OutgoingSimplexConnection.java @@ -108,9 +108,8 @@ class OutgoingSimplexConnection { } catch(IOException e) { if(LOG.isLoggable(WARNING)) LOG.log(WARNING, e.toString(), e); dispose(true); - } finally { - connRegistry.unregisterConnection(contactId, transportId); } + connRegistry.unregisterConnection(contactId, transportId); } private boolean writeTransportAcks(ConnectionWriter conn, diff --git a/briar-core/src/net/sf/briar/transport/ConnectionRegistryImpl.java b/briar-core/src/net/sf/briar/transport/ConnectionRegistryImpl.java index fc919d6a83..276b296bf0 100644 --- a/briar-core/src/net/sf/briar/transport/ConnectionRegistryImpl.java +++ b/briar-core/src/net/sf/briar/transport/ConnectionRegistryImpl.java @@ -1,5 +1,7 @@ package net.sf.briar.transport; +import static java.util.logging.Level.INFO; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -7,6 +9,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Logger; import net.sf.briar.api.ContactId; import net.sf.briar.api.TransportId; @@ -15,6 +18,9 @@ import net.sf.briar.api.transport.ConnectionRegistry; class ConnectionRegistryImpl implements ConnectionRegistry { + private static final Logger LOG = + Logger.getLogger(ConnectionRegistryImpl.class.getName()); + // Locking: this private final Map<TransportId, Map<ContactId, Integer>> connections; // Locking: this @@ -36,6 +42,7 @@ class ConnectionRegistryImpl implements ConnectionRegistry { } public void registerConnection(ContactId c, TransportId t) { + if(LOG.isLoggable(INFO)) LOG.info("Connection registered"); boolean firstConnection = false; synchronized(this) { Map<ContactId, Integer> m = connections.get(t); @@ -54,11 +61,14 @@ class ConnectionRegistryImpl implements ConnectionRegistry { contactCounts.put(c, count + 1); } } - if(firstConnection) + if(firstConnection) { + if(LOG.isLoggable(INFO)) LOG.info("Contact connected"); for(ConnectionListener l : listeners) l.contactConnected(c); + } } public void unregisterConnection(ContactId c, TransportId t) { + if(LOG.isLoggable(INFO)) LOG.info("Connection unregistered"); boolean lastConnection = false; synchronized(this) { Map<ContactId, Integer> m = connections.get(t); @@ -79,16 +89,19 @@ class ConnectionRegistryImpl implements ConnectionRegistry { contactCounts.put(c, count - 1); } } - if(lastConnection) + if(lastConnection) { + if(LOG.isLoggable(INFO)) LOG.info("Contact disconnected"); for(ConnectionListener l : listeners) l.contactDisconnected(c); + } } public synchronized Collection<ContactId> getConnectedContacts( TransportId t) { Map<ContactId, Integer> m = connections.get(t); if(m == null) return Collections.emptyList(); - List<ContactId> keys = new ArrayList<ContactId>(m.keySet()); - return Collections.unmodifiableList(keys); + List<ContactId> ids = new ArrayList<ContactId>(m.keySet()); + if(LOG.isLoggable(INFO)) LOG.info(ids.size() + " contacts connected"); + return Collections.unmodifiableList(ids); } public synchronized boolean isConnected(ContactId c) { -- GitLab