diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java index 4baecbc4e0a5066619c9c4bc77c2cf6baad69869..2d4c0d810acab8d5016d5232155ecfa0d6a289c5 100644 --- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java @@ -1,6 +1,7 @@ package net.sf.briar.transport.batch; import java.io.IOException; +import java.io.InputStream; import java.security.GeneralSecurityException; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -32,62 +33,60 @@ class IncomingBatchConnection { private final DatabaseComponent db; private final ProtocolReaderFactory protoFactory; private final ConnectionContext ctx; - private final BatchTransportReader reader; + private final BatchTransportReader transport; private final byte[] tag; + private final ContactId contactId; IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor, DatabaseComponent db, ConnectionReaderFactory connFactory, ProtocolReaderFactory protoFactory, ConnectionContext ctx, - BatchTransportReader reader, byte[] tag) { + BatchTransportReader transport, byte[] tag) { this.dbExecutor = dbExecutor; this.connFactory = connFactory; this.db = db; this.protoFactory = protoFactory; this.ctx = ctx; - this.reader = reader; + this.transport = transport; this.tag = tag; + contactId = ctx.getContactId(); } void read() { try { ConnectionReader conn = connFactory.createConnectionReader( - reader.getInputStream(), ctx.getSecret(), tag); - ProtocolReader proto = protoFactory.createProtocolReader( - conn.getInputStream()); - final ContactId c = ctx.getContactId(); + transport.getInputStream(), ctx.getSecret(), tag); + InputStream in = conn.getInputStream(); + ProtocolReader reader = protoFactory.createProtocolReader(in); // Read packets until EOF - while(!proto.eof()) { - if(proto.hasAck()) { - Ack a = proto.readAck(); - dbExecutor.execute(new ReceiveAck(c, a)); - } else if(proto.hasBatch()) { - UnverifiedBatch b = proto.readBatch(); - dbExecutor.execute(new ReceiveBatch(c, b)); - } else if(proto.hasSubscriptionUpdate()) { - SubscriptionUpdate s = proto.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate(c, s)); - } else if(proto.hasTransportUpdate()) { - TransportUpdate t = proto.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate(c, t)); + while(!reader.eof()) { + if(reader.hasAck()) { + Ack a = reader.readAck(); + dbExecutor.execute(new ReceiveAck(a)); + } else if(reader.hasBatch()) { + UnverifiedBatch b = reader.readBatch(); + dbExecutor.execute(new ReceiveBatch(b)); + } else if(reader.hasSubscriptionUpdate()) { + SubscriptionUpdate s = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + } else if(reader.hasTransportUpdate()) { + TransportUpdate t = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(t)); } else { throw new FormatException(); } } + transport.dispose(true); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - reader.dispose(false); + transport.dispose(false); } - // Success - reader.dispose(true); } private class ReceiveAck implements Runnable { - private final ContactId contactId; private final Ack ack; - private ReceiveAck(ContactId contactId, Ack ack) { - this.contactId = contactId; + private ReceiveAck(Ack ack) { this.ack = ack; } @@ -102,11 +101,9 @@ class IncomingBatchConnection { private class ReceiveBatch implements Runnable { - private final ContactId contactId; private final UnverifiedBatch batch; - private ReceiveBatch(ContactId contactId, UnverifiedBatch batch) { - this.contactId = contactId; + private ReceiveBatch(UnverifiedBatch batch) { this.batch = batch; } @@ -124,12 +121,9 @@ class IncomingBatchConnection { private class ReceiveSubscriptionUpdate implements Runnable { - private final ContactId contactId; private final SubscriptionUpdate update; - private ReceiveSubscriptionUpdate(ContactId contactId, - SubscriptionUpdate update) { - this.contactId = contactId; + private ReceiveSubscriptionUpdate(SubscriptionUpdate update) { this.update = update; } @@ -144,12 +138,9 @@ class IncomingBatchConnection { private class ReceiveTransportUpdate implements Runnable { - private final ContactId contactId; private final TransportUpdate update; - private ReceiveTransportUpdate(ContactId contactId, - TransportUpdate update) { - this.contactId = contactId; + private ReceiveTransportUpdate(TransportUpdate update) { this.update = update; } diff --git a/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java index 30a178c695e8649624a7dced05b95c5c2a80d2c1..562c5b3fe793815dfd96117e88487d6be19e830b 100644 --- a/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java @@ -54,37 +54,37 @@ class OutgoingBatchConnection { transport.getOutputStream(), transport.getCapacity(), ctx.getSecret()); OutputStream out = conn.getOutputStream(); - ProtocolWriter proto = protoFactory.createProtocolWriter(out); + ProtocolWriter writer = protoFactory.createProtocolWriter(out); // There should be enough space for a packet long capacity = conn.getRemainingCapacity(); if(capacity < MAX_PACKET_LENGTH) throw new IOException(); // Write a transport update TransportUpdate t = db.generateTransportUpdate(contactId); - if(t != null) proto.writeTransportUpdate(t); + if(t != null) writer.writeTransportUpdate(t); // If there's space, write a subscription update capacity = conn.getRemainingCapacity(); if(capacity >= MAX_PACKET_LENGTH) { SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) proto.writeSubscriptionUpdate(s); + if(s != null) writer.writeSubscriptionUpdate(s); } // Write acks until you can't write acks no more capacity = conn.getRemainingCapacity(); - int maxBatches = proto.getMaxBatchesForAck(capacity); + int maxBatches = writer.getMaxBatchesForAck(capacity); Ack a = db.generateAck(contactId, maxBatches); while(a != null) { - proto.writeAck(a); + writer.writeAck(a); capacity = conn.getRemainingCapacity(); - maxBatches = proto.getMaxBatchesForAck(capacity); + maxBatches = writer.getMaxBatchesForAck(capacity); a = db.generateAck(contactId, maxBatches); } // Write batches until you can't write batches no more capacity = conn.getRemainingCapacity(); - capacity = proto.getMessageCapacityForBatch(capacity); + capacity = writer.getMessageCapacityForBatch(capacity); RawBatch b = db.generateBatch(contactId, (int) capacity); while(b != null) { - proto.writeBatch(b); + writer.writeBatch(b); capacity = conn.getRemainingCapacity(); - capacity = proto.getMessageCapacityForBatch(capacity); + capacity = writer.getMessageCapacityForBatch(capacity); b = db.generateBatch(contactId, (int) capacity); } // Flush the output stream diff --git a/components/net/sf/briar/transport/stream/Flags.java b/components/net/sf/briar/transport/stream/Flags.java deleted file mode 100644 index 422c41e8ff6a7ce1b6262e780973dee7fc351d6e..0000000000000000000000000000000000000000 --- a/components/net/sf/briar/transport/stream/Flags.java +++ /dev/null @@ -1,14 +0,0 @@ -package net.sf.briar.transport.stream; - -interface Flags { - - // Flags raised by the database listener - static final int BATCH_RECEIVED = 1; - static final int CONTACT_REMOVED = 2; - static final int MESSAGES_ADDED = 4; - static final int SUBSCRIPTIONS_UPDATED = 8; - static final int TRANSPORTS_UPDATED = 16; - // Flags raised by the reading side of the connection - static final int OFFER_RECEIVED = 32; - static final int REQUEST_RECEIVED = 64; -} diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java index 559d9fe2631e36f66b4c82cf9cfcb947f9460793..36c975eac891d079d7e2e5ce93133135b14f3446 100644 --- a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java @@ -4,10 +4,9 @@ import java.io.IOException; import java.util.concurrent.Executor; import net.sf.briar.api.db.DatabaseComponent; -import net.sf.briar.api.db.DbException; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -20,14 +19,14 @@ class IncomingStreamConnection extends StreamConnection { private final ConnectionContext ctx; private final byte[] tag; - IncomingStreamConnection(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ConnectionContext ctx, StreamTransportConnection connection, byte[] tag) { - super(executor, db, serial, connReaderFactory, connWriterFactory, + super(dbExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, ctx.getContactId(), connection); this.ctx = ctx; @@ -35,15 +34,13 @@ class IncomingStreamConnection extends StreamConnection { } @Override - protected ConnectionReader createConnectionReader() throws DbException, - IOException { + protected ConnectionReader createConnectionReader() throws IOException { return connReaderFactory.createConnectionReader( connection.getInputStream(), ctx.getSecret(), tag); } @Override - protected ConnectionWriter createConnectionWriter() throws DbException, - IOException { + protected ConnectionWriter createConnectionWriter() throws IOException { return connWriterFactory.createConnectionWriter( connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(), tag); diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java index 0594f32ddc97de8f425666a7d4472527a5813fe2..89252da5c0cc00c18f204c3f726e714eacb6f130 100644 --- a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java @@ -5,11 +5,11 @@ import java.util.concurrent.Executor; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.db.DbException; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -23,14 +23,14 @@ class OutgoingStreamConnection extends StreamConnection { private ConnectionContext ctx = null; // Locking: this - OutgoingStreamConnection(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, TransportIndex transportIndex, StreamTransportConnection connection) { - super(executor, db, serial, connReaderFactory, connWriterFactory, + super(dbExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, contactId, connection); this.transportIndex = transportIndex; } diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index d959600c591f61dcb739f02640df25e60d15508f..c638afe3fb70718f42d8a97907a27a28d10154da 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -11,6 +11,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,7 +39,6 @@ import net.sf.briar.api.protocol.Request; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriter; @@ -47,14 +47,16 @@ import net.sf.briar.api.transport.StreamTransportConnection; abstract class StreamConnection implements DatabaseListener { - private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES }; - private static final Logger LOG = Logger.getLogger(StreamConnection.class.getName()); + // A canary indicating that the connection should be closed + private static final Runnable CLOSE_CONNECTION = new Runnable() { + public void run() {} + }; + protected final Executor dbExecutor; protected final DatabaseComponent db; - protected final SerialComponent serial; protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionWriterFactory connWriterFactory; protected final ProtocolReaderFactory protoReaderFactory; @@ -62,124 +64,105 @@ abstract class StreamConnection implements DatabaseListener { protected final ContactId contactId; protected final StreamTransportConnection connection; - private int writerFlags = 0; // Locking: this + private final AtomicBoolean canSendOffer = new AtomicBoolean(false); + private final LinkedList<Runnable> writerTasks; // Locking: this + private Collection<MessageId> offered = null; // Locking: this - private LinkedList<MessageId> requested = null; // Locking: this - private Offer incomingOffer = null; // Locking: this + + private volatile ProtocolWriter writer = null; StreamConnection(@DatabaseExecutor Executor dbExecutor, - DatabaseComponent db, SerialComponent serial, - ConnectionReaderFactory connReaderFactory, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, StreamTransportConnection connection) { this.dbExecutor = dbExecutor; this.db = db; - this.serial = serial; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; this.protoReaderFactory = protoReaderFactory; this.protoWriterFactory = protoWriterFactory; this.contactId = contactId; this.connection = connection; + writerTasks = new LinkedList<Runnable>(); } protected abstract ConnectionReader createConnectionReader() throws DbException, IOException; protected abstract ConnectionWriter createConnectionWriter() - throws DbException, IOException ; + throws DbException, IOException; public void eventOccurred(DatabaseEvent e) { - synchronized(this) { - if(e instanceof BatchReceivedEvent) { - writerFlags |= Flags.BATCH_RECEIVED; - notifyAll(); - } else if(e instanceof ContactRemovedEvent) { - ContactId c = ((ContactRemovedEvent) e).getContactId(); - if(contactId.equals(c)) { - writerFlags |= Flags.CONTACT_REMOVED; - notifyAll(); - } - } else if(e instanceof MessagesAddedEvent) { - writerFlags |= Flags.MESSAGES_ADDED; - notifyAll(); - } else if(e instanceof SubscriptionsUpdatedEvent) { - Collection<ContactId> affected = - ((SubscriptionsUpdatedEvent) e).getAffectedContacts(); - if(affected.contains(contactId)) { - writerFlags |= Flags.SUBSCRIPTIONS_UPDATED; + if(e instanceof BatchReceivedEvent) { + dbExecutor.execute(new GenerateAcks()); + } else if(e instanceof ContactRemovedEvent) { + ContactId c = ((ContactRemovedEvent) e).getContactId(); + if(contactId.equals(c)) { + synchronized(this) { + writerTasks.add(CLOSE_CONNECTION); notifyAll(); } - } else if(e instanceof LocalTransportsUpdatedEvent) { - writerFlags |= Flags.TRANSPORTS_UPDATED; - notifyAll(); } + } else if(e instanceof MessagesAddedEvent) { + if(canSendOffer.getAndSet(false)) + dbExecutor.execute(new GenerateOffer()); + } else if(e instanceof SubscriptionsUpdatedEvent) { + Collection<ContactId> affected = + ((SubscriptionsUpdatedEvent) e).getAffectedContacts(); + if(affected.contains(contactId)) { + dbExecutor.execute(new GenerateSubscriptionUpdate()); + } + } else if(e instanceof LocalTransportsUpdatedEvent) { + dbExecutor.execute(new GenerateTransportUpdate()); } } void read() { try { InputStream in = createConnectionReader().getInputStream(); - ProtocolReader proto = protoReaderFactory.createProtocolReader(in); - while(!proto.eof()) { - if(proto.hasAck()) { - Ack a = proto.readAck(); - dbExecutor.execute(new ReceiveAck(contactId, a)); - } else if(proto.hasBatch()) { - UnverifiedBatch b = proto.readBatch(); - dbExecutor.execute(new ReceiveBatch(contactId, b)); - } else if(proto.hasOffer()) { - Offer o = proto.readOffer(); - // Store the incoming offer and notify the writer - synchronized(this) { - writerFlags |= Flags.OFFER_RECEIVED; - incomingOffer = o; - notifyAll(); - } - } else if(proto.hasRequest()) { - Request r = proto.readRequest(); + ProtocolReader reader = protoReaderFactory.createProtocolReader(in); + while(!reader.eof()) { + if(reader.hasAck()) { + Ack a = reader.readAck(); + dbExecutor.execute(new ReceiveAck(a)); + } else if(reader.hasBatch()) { + UnverifiedBatch b = reader.readBatch(); + dbExecutor.execute(new ReceiveBatch(b)); + } else if(reader.hasOffer()) { + Offer o = reader.readOffer(); + dbExecutor.execute(new ReceiveOffer(o)); + } else if(reader.hasRequest()) { + Request r = reader.readRequest(); // Retrieve the offered message IDs - Collection<MessageId> off; - synchronized(this) { - if(offered == null) - throw new IOException("Unexpected request packet"); - off = offered; - offered = null; - } + Collection<MessageId> offered = getOfferedMessageIds(); // Work out which messages were requested BitSet b = r.getBitmap(); - LinkedList<MessageId> req = new LinkedList<MessageId>(); + List<MessageId> requested = new LinkedList<MessageId>(); List<MessageId> seen = new ArrayList<MessageId>(); int i = 0; - for(MessageId m : off) { - if(b.get(i++)) req.add(m); + for(MessageId m : offered) { + if(b.get(i++)) requested.add(m); else seen.add(m); } + requested = Collections.synchronizedList(requested); seen = Collections.unmodifiableList(seen); // Mark the unrequested messages as seen - dbExecutor.execute(new SetSeen(contactId, seen)); - // Store the requested message IDs and notify the writer - synchronized(this) { - if(requested != null) - throw new IOException("Unexpected request packet"); - requested = req; - writerFlags |= Flags.REQUEST_RECEIVED; - notifyAll(); - } - } else if(proto.hasSubscriptionUpdate()) { - SubscriptionUpdate s = proto.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate( - contactId, s)); - } else if(proto.hasTransportUpdate()) { - TransportUpdate t = proto.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate( - contactId, t)); + dbExecutor.execute(new SetSeen(seen)); + // Start sending the requested messages + dbExecutor.execute(new GenerateBatch(requested)); + } else if(reader.hasSubscriptionUpdate()) { + SubscriptionUpdate s = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + } else if(reader.hasTransportUpdate()) { + TransportUpdate t = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(t)); } else { throw new FormatException(); } } + connection.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); @@ -187,142 +170,50 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); } - // Success - connection.dispose(true); + } + + private synchronized Collection<MessageId> getOfferedMessageIds() + throws FormatException { + if(offered == null) throw new FormatException(); // Unexpected request + Collection<MessageId> ids = offered; + offered = null; + return ids; + } + + // Locking: this + private void setOfferedMessageIds(Collection<MessageId> ids) { + assert offered == null; + offered = ids; } void write() { try { OutputStream out = createConnectionWriter().getOutputStream(); - ProtocolWriter proto = protoWriterFactory.createProtocolWriter(out); - // Send the initial packets: transports, subs, any waiting acks - sendTransportUpdate(proto); - sendSubscriptionUpdate(proto); - sendAcks(proto); - State state = State.SEND_OFFER; + writer = protoWriterFactory.createProtocolWriter(out); + // Start receiving database events + db.addListener(this); + // Send the initial packets: transports, subs, acks, offer + dbExecutor.execute(new GenerateTransportUpdate()); + dbExecutor.execute(new GenerateSubscriptionUpdate()); + dbExecutor.execute(new GenerateAcks()); + dbExecutor.execute(new GenerateOffer()); // Main loop while(true) { - int flags = 0; - switch(state) { - - case SEND_OFFER: - // Try to send an offer - if(sendOffer(proto)) state = State.AWAIT_REQUEST; - else state = State.IDLE; - break; - - case IDLE: - // Wait for one or more flags to be raised - synchronized(this) { - while(writerFlags == 0) { - try { - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } + Runnable task = null; + synchronized(this) { + while(writerTasks.isEmpty()) { + try { + wait(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); } - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); - } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - // Should only be received in state AWAIT_REQUEST - throw new IOException("Unexpected request packet"); - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - state = State.SEND_OFFER; - } - break; - - case AWAIT_REQUEST: - // Wait for one or more flags to be raised - synchronized(this) { - while(writerFlags == 0) { - try { - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - state = State.SEND_BATCHES; - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - // Ignored in this state - } - break; - - case SEND_BATCHES: - // Check whether any flags have been raised - synchronized(this) { - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); - } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - // Should only be received in state AWAIT_REQUEST - throw new IOException("Unexpected request packet"); - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - // Ignored in this state - } - // Try to send a batch - if(!sendBatch(proto)) state = State.SEND_OFFER; - break; + task = writerTasks.poll(); } + if(task == CLOSE_CONNECTION) break; + task.run(); } + connection.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); @@ -330,95 +221,14 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); } - // Success - connection.dispose(true); - } - - private void sendAcks(ProtocolWriter proto) - throws DbException, IOException { - int maxBatches = proto.getMaxBatchesForAck(Long.MAX_VALUE); - Ack a = db.generateAck(contactId, maxBatches); - while(a != null) { - proto.writeAck(a); - a = db.generateAck(contactId, maxBatches); - } - } - - private boolean sendBatch(ProtocolWriter proto) - throws DbException, IOException { - Collection<MessageId> req; - // Retrieve the requested message IDs - synchronized(this) { - assert offered == null; - assert requested != null; - req = requested; - } - // Try to generate a batch, updating the collection of message IDs - int capacity = proto.getMessageCapacityForBatch(Long.MAX_VALUE); - RawBatch b = db.generateBatch(contactId, capacity, req); - if(b == null) { - // No more batches can be generated - discard the remaining IDs - synchronized(this) { - assert offered == null; - assert requested == req; - requested = null; - } - return false; - } else { - proto.writeBatch(b); - return true; - } - } - - private boolean sendOffer(ProtocolWriter proto) - throws DbException, IOException { - // Generate an offer - int maxMessages = proto.getMaxMessagesForOffer(Long.MAX_VALUE); - Offer o = db.generateOffer(contactId, maxMessages); - if(o == null) return false; - proto.writeOffer(o); - // Store the offered message IDs - synchronized(this) { - assert offered == null; - assert requested == null; - offered = o.getMessageIds(); - } - return true; - } - - private void sendRequest(ProtocolWriter proto) - throws DbException, IOException { - Offer o; - // Retrieve the incoming offer - synchronized(this) { - assert incomingOffer != null; - o = incomingOffer; - incomingOffer = null; - } - // Process the offer and generate a request - Request r = db.receiveOffer(contactId, o); - proto.writeRequest(r); - } - - private void sendTransportUpdate(ProtocolWriter proto) - throws DbException, IOException { - TransportUpdate t = db.generateTransportUpdate(contactId); - if(t != null) proto.writeTransportUpdate(t); - } - - private void sendSubscriptionUpdate(ProtocolWriter proto) - throws DbException, IOException { - SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) proto.writeSubscriptionUpdate(s); } + // This task runs on a database thread private class ReceiveAck implements Runnable { - private final ContactId contactId; private final Ack ack; - private ReceiveAck(ContactId contactId, Ack ack) { - this.contactId = contactId; + private ReceiveAck(Ack ack) { this.ack = ack; } @@ -431,13 +241,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveBatch implements Runnable { - private final ContactId contactId; private final UnverifiedBatch batch; - private ReceiveBatch(ContactId contactId, UnverifiedBatch batch) { - this.contactId = contactId; + private ReceiveBatch(UnverifiedBatch batch) { this.batch = batch; } @@ -453,13 +262,54 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread + private class ReceiveOffer implements Runnable { + + private final Offer offer; + + private ReceiveOffer(Offer offer) { + this.offer = offer; + } + + public void run() { + try { + Request r = db.receiveOffer(contactId, offer); + synchronized(StreamConnection.this) { + writerTasks.add(new WriteRequest(r)); + StreamConnection.this.notifyAll(); + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteRequest implements Runnable { + + private final Request request; + + private WriteRequest(Request request) { + this.request = request; + } + + public void run() { + assert writer != null; + try { + writer.writeRequest(request); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread private class SetSeen implements Runnable { - private final ContactId contactId; private final Collection<MessageId> seen; - private SetSeen(ContactId contactId, Collection<MessageId> seen) { - this.contactId = contactId; + private SetSeen(Collection<MessageId> seen) { this.seen = seen; } @@ -472,14 +322,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveSubscriptionUpdate implements Runnable { - private final ContactId contactId; private final SubscriptionUpdate update; - private ReceiveSubscriptionUpdate(ContactId contactId, - SubscriptionUpdate update) { - this.contactId = contactId; + private ReceiveSubscriptionUpdate(SubscriptionUpdate update) { this.update = update; } @@ -492,14 +340,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveTransportUpdate implements Runnable { - private final ContactId contactId; private final TransportUpdate update; - private ReceiveTransportUpdate(ContactId contactId, - TransportUpdate update) { - this.contactId = contactId; + private ReceiveTransportUpdate(TransportUpdate update) { this.update = update; } @@ -511,4 +357,226 @@ abstract class StreamConnection implements DatabaseListener { } } } + + // This task runs on a database thread + private class GenerateAcks implements Runnable { + + public void run() { + assert writer != null; + int maxBatches = writer.getMaxBatchesForAck(Long.MAX_VALUE); + try { + Ack a = db.generateAck(contactId, maxBatches); + while(a != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteAck(a)); + StreamConnection.this.notifyAll(); + } + a = db.generateAck(contactId, maxBatches); + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteAck implements Runnable { + + private final Ack ack; + + private WriteAck(Ack ack) { + this.ack = ack; + } + + public void run() { + assert writer != null; + try { + writer.writeAck(ack); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thred + private class GenerateBatch implements Runnable { + + private final Collection<MessageId> requested; + + private GenerateBatch(Collection<MessageId> requested) { + this.requested = requested; + } + + public void run() { + assert writer != null; + int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE); + try { + RawBatch b = db.generateBatch(contactId, capacity, requested); + if(b == null) { + // No batch to write - send another offer + new GenerateOffer().run(); + } else { + // Write the batch + synchronized(StreamConnection.this) { + writerTasks.add(new WriteBatch(b, requested)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteBatch implements Runnable { + + private final RawBatch batch; + private final Collection<MessageId> requested; + + private WriteBatch(RawBatch batch, Collection<MessageId> requested) { + this.batch = batch; + this.requested = requested; + } + + public void run() { + assert writer != null; + try { + writer.writeBatch(batch); + if(requested.isEmpty()) { + // No more batches to send - send another offer + dbExecutor.execute(new GenerateOffer()); + } else { + // Send another batch + dbExecutor.execute(new GenerateBatch(requested)); + } + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateOffer implements Runnable { + + public void run() { + assert writer != null; + int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE); + try { + Offer o = db.generateOffer(contactId, maxMessages); + if(o == null) { + // No messages to offer - wait for some to be added + canSendOffer.set(true); + } else { + synchronized(StreamConnection.this) { + // Store the offered message IDs + setOfferedMessageIds(o.getMessageIds()); + // Write the offer on the writer thread + writerTasks.add(new WriteOffer(o)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteOffer implements Runnable { + + private final Offer offer; + + private WriteOffer(Offer offer) { + this.offer = offer; + } + + public void run() { + assert writer != null; + try { + writer.writeOffer(offer); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateSubscriptionUpdate implements Runnable { + + public void run() { + try { + SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); + if(s != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteSubscriptionUpdate(s)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteSubscriptionUpdate implements Runnable { + + private final SubscriptionUpdate update; + + private WriteSubscriptionUpdate(SubscriptionUpdate update) { + this.update = update; + } + + public void run() { + assert writer != null; + try { + writer.writeSubscriptionUpdate(update); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateTransportUpdate implements Runnable { + + public void run() { + try { + TransportUpdate t = db.generateTransportUpdate(contactId); + if(t != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteTransportUpdate(t)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteTransportUpdate implements Runnable { + + private final TransportUpdate update; + + private WriteTransportUpdate(TransportUpdate update) { + this.update = update; + } + + public void run() { + assert writer != null; + try { + writer.writeTransportUpdate(update); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } } diff --git a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java index 7217b862dd87ab3683234ca408edf3834738fea8..67787c8207a373406948e5db0db90868375c0469 100644 --- a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java +++ b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java @@ -4,10 +4,10 @@ import java.util.concurrent.Executor; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriterFactory; @@ -18,23 +18,21 @@ import com.google.inject.Inject; class StreamConnectionFactoryImpl implements StreamConnectionFactory { - private final Executor executor; + private final Executor dbExecutor; private final DatabaseComponent db; - private final SerialComponent serial; private final ConnectionReaderFactory connReaderFactory; private final ConnectionWriterFactory connWriterFactory; private final ProtocolReaderFactory protoReaderFactory; private final ProtocolWriterFactory protoWriterFactory; @Inject - StreamConnectionFactoryImpl(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory) { - this.executor = executor; + this.dbExecutor = dbExecutor; this.db = db; - this.serial = serial; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; this.protoReaderFactory = protoReaderFactory; @@ -43,9 +41,9 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createIncomingConnection(ConnectionContext ctx, StreamTransportConnection s, byte[] tag) { - final StreamConnection conn = new IncomingStreamConnection(executor, db, - serial, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, ctx, s, tag); + final StreamConnection conn = new IncomingStreamConnection(dbExecutor, + db, connReaderFactory, connWriterFactory, protoReaderFactory, + protoWriterFactory, ctx, s, tag); Runnable write = new Runnable() { public void run() { conn.write(); @@ -62,9 +60,9 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createOutgoingConnection(ContactId c, TransportIndex i, StreamTransportConnection s) { - final StreamConnection conn = new OutgoingStreamConnection(executor, db, - serial, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, c, i, s); + final StreamConnection conn = new OutgoingStreamConnection(dbExecutor, + db, connReaderFactory, connWriterFactory, protoReaderFactory, + protoWriterFactory, c, i, s); Runnable write = new Runnable() { public void run() { conn.write();