From e214c40b1189e46cd3325793fd50a894d1ad4da1 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Fri, 14 Oct 2011 22:48:16 +0100 Subject: [PATCH] Code cleanup and comments. --- .../transport/stream/StreamConnection.java | 85 +++++++++++-------- 1 file changed, 51 insertions(+), 34 deletions(-) diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index b0a5325df0..d901ccdc43 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -52,9 +52,9 @@ abstract class StreamConnection implements DatabaseListener { protected final ContactId contactId; protected final StreamTransportConnection connection; - // The following fields must only be accessed with this's lock held + // These fields must only be accessed with this's lock held private int writerFlags = 0; - private Collection<MessageId> outgoingOffer = null; + private Collection<MessageId> offered = null; private Collection<MessageId> requested = null; private Offer incomingOffer = null; @@ -107,6 +107,7 @@ abstract class StreamConnection implements DatabaseListener { db.receiveBatch(contactId, b); } else if(proto.hasOffer()) { Offer o = proto.readOffer(); + // Store the incoming offer and notify the writer synchronized(this) { writerFlags |= Flags.OFFER_RECEIVED; incomingOffer = o; @@ -114,31 +115,33 @@ abstract class StreamConnection implements DatabaseListener { } } else if(proto.hasRequest()) { Request r = proto.readRequest(); - Collection<MessageId> offered, seen, unseen; + // Retrieve the offered message IDs + Collection<MessageId> off; synchronized(this) { - if(outgoingOffer == null) + if(offered == null) throw new IOException("Unexpected request packet"); - offered = outgoingOffer; + off = offered; + offered = null; } // Work out which messages were requested BitSet b = r.getBitmap(); - seen = new ArrayList<MessageId>(); - unseen = new LinkedList<MessageId>(); + Collection<MessageId> req = new LinkedList<MessageId>(); + Collection<MessageId> seen = new ArrayList<MessageId>(); int i = 0; - for(MessageId m : offered) { - if(b.get(i++)) unseen.add(m); + for(MessageId m : off) { + if(b.get(i++)) req.add(m); else seen.add(m); } + // Mark the unrequested messages as seen + db.setSeen(contactId, seen); + // Store the requested message IDs and notify the writer synchronized(this) { - assert outgoingOffer != null; - if(requested != null && !requested.isEmpty()) + if(requested != null) throw new IOException("Unexpected request packet"); - outgoingOffer = null; - requested = unseen; + requested = req; writerFlags |= Flags.REQUEST_RECEIVED; notifyAll(); } - db.setSeen(contactId, seen); } else if(proto.hasSubscriptionUpdate()) { SubscriptionUpdate s = proto.readSubscriptionUpdate(); db.receiveSubscriptionUpdate(contactId, s); @@ -179,12 +182,12 @@ abstract class StreamConnection implements DatabaseListener { sendAcks(ackWriter); State state = State.SEND_OFFER; // Main loop - boolean close = false; - while(!close) { + while(true) { int flags = 0; switch(state) { case SEND_OFFER: + // Try to send an offer if(sendOffer(offerWriter)) state = State.AWAIT_REQUEST; else state = State.IDLE; break; @@ -203,8 +206,8 @@ abstract class StreamConnection implements DatabaseListener { // Handle the flags in approximate order of urgency if((flags & Flags.CONTACTS_UPDATED) != 0) { if(!db.getContacts().contains(contactId)) { - close = true; - break; + connection.dispose(true); + return; } } if((flags & Flags.TRANSPORTS_UPDATED) != 0) { @@ -220,6 +223,7 @@ abstract class StreamConnection implements DatabaseListener { sendRequest(requestWriter); } if((flags & Flags.REQUEST_RECEIVED) != 0) { + // Should only be received in state AWAIT_REQUEST throw new IOException("Unexpected request packet"); } if((flags & Flags.MESSAGES_ADDED) != 0) { @@ -241,8 +245,8 @@ abstract class StreamConnection implements DatabaseListener { // Handle the flags in approximate order of urgency if((flags & Flags.CONTACTS_UPDATED) != 0) { if(!db.getContacts().contains(contactId)) { - close = true; - break; + connection.dispose(true); + return; } } if((flags & Flags.TRANSPORTS_UPDATED) != 0) { @@ -274,8 +278,8 @@ abstract class StreamConnection implements DatabaseListener { // Handle the flags in approximate order of urgency if((flags & Flags.CONTACTS_UPDATED) != 0) { if(!db.getContacts().contains(contactId)) { - close = true; - break; + connection.dispose(true); + return; } } if((flags & Flags.TRANSPORTS_UPDATED) != 0) { @@ -291,12 +295,13 @@ abstract class StreamConnection implements DatabaseListener { sendRequest(requestWriter); } if((flags & Flags.REQUEST_RECEIVED) != 0) { + // Should only be received in state AWAIT_REQUEST throw new IOException("Unexpected request packet"); } if((flags & Flags.MESSAGES_ADDED) != 0) { // Ignored in this state } - // Send a batch if possible, otherwise an offer + // Try to send a batch if(!sendBatch(batchWriter)) state = State.SEND_OFFER; break; } @@ -317,35 +322,47 @@ abstract class StreamConnection implements DatabaseListener { } private boolean sendBatch(BatchWriter b) throws DbException, IOException { - Collection<MessageId> ids; + Collection<MessageId> req; + // Retrieve the requested message IDs synchronized(this) { - assert outgoingOffer == null; + assert offered == null; assert requested != null; - ids = requested; + req = requested; + } + // Try to generate a batch, updating the collection of message IDs + boolean anyAdded = db.generateBatch(contactId, b, req); + // If no more batches can be generated, discard the remaining IDs + if(!anyAdded) { + synchronized(this) { + assert offered == null; + assert requested == req; + requested = null; + } } - boolean anyAdded = db.generateBatch(contactId, b, ids); - if(!anyAdded) ids.clear(); return anyAdded; } private boolean sendOffer(OfferWriter o) throws DbException, IOException { - Collection<MessageId> ids = db.generateOffer(contactId, o); + // Generate an offer + Collection<MessageId> off = db.generateOffer(contactId, o); + // Store the offered message IDs synchronized(this) { - assert outgoingOffer == null; - assert requested == null || requested.isEmpty(); - outgoingOffer = ids; + assert offered == null; + assert requested == null; + offered = off; } - boolean anyOffered = !ids.isEmpty(); - return anyOffered; + return !off.isEmpty(); } private void sendRequest(RequestWriter r) throws DbException, IOException { Offer o; + // Retrieve the incoming offer synchronized(this) { assert incomingOffer != null; o = incomingOffer; incomingOffer = null; } + // Process the offer and generate a request db.receiveOffer(contactId, o, r); } -- GitLab