From 2020f60ebfda47f9924cc68af52c17ce75385119 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Wed, 7 Dec 2011 20:52:04 +0000 Subject: [PATCH] Rewrote StreamConnection to decouple the database from IO. Runnables encapsulating database or IO tasks are passed to the relevant threads. The IO thread's task queue is unbounded to avoid deadlock, but its growth is indirectly limited by the progress of database tasks. --- .../batch/IncomingBatchConnection.java | 65 +- .../batch/OutgoingBatchConnection.java | 18 +- .../net/sf/briar/transport/stream/Flags.java | 14 - .../stream/IncomingStreamConnection.java | 15 +- .../stream/OutgoingStreamConnection.java | 8 +- .../transport/stream/StreamConnection.java | 666 ++++++++++-------- .../stream/StreamConnectionFactoryImpl.java | 24 +- 7 files changed, 425 insertions(+), 385 deletions(-) delete mode 100644 components/net/sf/briar/transport/stream/Flags.java diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java index 4baecbc4e0..2d4c0d810a 100644 --- a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java @@ -1,6 +1,7 @@ package net.sf.briar.transport.batch; import java.io.IOException; +import java.io.InputStream; import java.security.GeneralSecurityException; import java.util.concurrent.Executor; import java.util.logging.Level; @@ -32,62 +33,60 @@ class IncomingBatchConnection { private final DatabaseComponent db; private final ProtocolReaderFactory protoFactory; private final ConnectionContext ctx; - private final BatchTransportReader reader; + private final BatchTransportReader transport; private final byte[] tag; + private final ContactId contactId; IncomingBatchConnection(@DatabaseExecutor Executor dbExecutor, DatabaseComponent db, ConnectionReaderFactory connFactory, ProtocolReaderFactory protoFactory, ConnectionContext ctx, - BatchTransportReader reader, byte[] tag) { + BatchTransportReader transport, byte[] tag) { this.dbExecutor = dbExecutor; this.connFactory = connFactory; this.db = db; this.protoFactory = protoFactory; this.ctx = ctx; - this.reader = reader; + this.transport = transport; this.tag = tag; + contactId = ctx.getContactId(); } void read() { try { ConnectionReader conn = connFactory.createConnectionReader( - reader.getInputStream(), ctx.getSecret(), tag); - ProtocolReader proto = protoFactory.createProtocolReader( - conn.getInputStream()); - final ContactId c = ctx.getContactId(); + transport.getInputStream(), ctx.getSecret(), tag); + InputStream in = conn.getInputStream(); + ProtocolReader reader = protoFactory.createProtocolReader(in); // Read packets until EOF - while(!proto.eof()) { - if(proto.hasAck()) { - Ack a = proto.readAck(); - dbExecutor.execute(new ReceiveAck(c, a)); - } else if(proto.hasBatch()) { - UnverifiedBatch b = proto.readBatch(); - dbExecutor.execute(new ReceiveBatch(c, b)); - } else if(proto.hasSubscriptionUpdate()) { - SubscriptionUpdate s = proto.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate(c, s)); - } else if(proto.hasTransportUpdate()) { - TransportUpdate t = proto.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate(c, t)); + while(!reader.eof()) { + if(reader.hasAck()) { + Ack a = reader.readAck(); + dbExecutor.execute(new ReceiveAck(a)); + } else if(reader.hasBatch()) { + UnverifiedBatch b = reader.readBatch(); + dbExecutor.execute(new ReceiveBatch(b)); + } else if(reader.hasSubscriptionUpdate()) { + SubscriptionUpdate s = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + } else if(reader.hasTransportUpdate()) { + TransportUpdate t = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(t)); } else { throw new FormatException(); } } + transport.dispose(true); } catch(IOException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); - reader.dispose(false); + transport.dispose(false); } - // Success - reader.dispose(true); } private class ReceiveAck implements Runnable { - private final ContactId contactId; private final Ack ack; - private ReceiveAck(ContactId contactId, Ack ack) { - this.contactId = contactId; + private ReceiveAck(Ack ack) { this.ack = ack; } @@ -102,11 +101,9 @@ class IncomingBatchConnection { private class ReceiveBatch implements Runnable { - private final ContactId contactId; private final UnverifiedBatch batch; - private ReceiveBatch(ContactId contactId, UnverifiedBatch batch) { - this.contactId = contactId; + private ReceiveBatch(UnverifiedBatch batch) { this.batch = batch; } @@ -124,12 +121,9 @@ class IncomingBatchConnection { private class ReceiveSubscriptionUpdate implements Runnable { - private final ContactId contactId; private final SubscriptionUpdate update; - private ReceiveSubscriptionUpdate(ContactId contactId, - SubscriptionUpdate update) { - this.contactId = contactId; + private ReceiveSubscriptionUpdate(SubscriptionUpdate update) { this.update = update; } @@ -144,12 +138,9 @@ class IncomingBatchConnection { private class ReceiveTransportUpdate implements Runnable { - private final ContactId contactId; private final TransportUpdate update; - private ReceiveTransportUpdate(ContactId contactId, - TransportUpdate update) { - this.contactId = contactId; + private ReceiveTransportUpdate(TransportUpdate update) { this.update = update; } diff --git a/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java index 30a178c695..562c5b3fe7 100644 --- a/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java +++ b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java @@ -54,37 +54,37 @@ class OutgoingBatchConnection { transport.getOutputStream(), transport.getCapacity(), ctx.getSecret()); OutputStream out = conn.getOutputStream(); - ProtocolWriter proto = protoFactory.createProtocolWriter(out); + ProtocolWriter writer = protoFactory.createProtocolWriter(out); // There should be enough space for a packet long capacity = conn.getRemainingCapacity(); if(capacity < MAX_PACKET_LENGTH) throw new IOException(); // Write a transport update TransportUpdate t = db.generateTransportUpdate(contactId); - if(t != null) proto.writeTransportUpdate(t); + if(t != null) writer.writeTransportUpdate(t); // If there's space, write a subscription update capacity = conn.getRemainingCapacity(); if(capacity >= MAX_PACKET_LENGTH) { SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) proto.writeSubscriptionUpdate(s); + if(s != null) writer.writeSubscriptionUpdate(s); } // Write acks until you can't write acks no more capacity = conn.getRemainingCapacity(); - int maxBatches = proto.getMaxBatchesForAck(capacity); + int maxBatches = writer.getMaxBatchesForAck(capacity); Ack a = db.generateAck(contactId, maxBatches); while(a != null) { - proto.writeAck(a); + writer.writeAck(a); capacity = conn.getRemainingCapacity(); - maxBatches = proto.getMaxBatchesForAck(capacity); + maxBatches = writer.getMaxBatchesForAck(capacity); a = db.generateAck(contactId, maxBatches); } // Write batches until you can't write batches no more capacity = conn.getRemainingCapacity(); - capacity = proto.getMessageCapacityForBatch(capacity); + capacity = writer.getMessageCapacityForBatch(capacity); RawBatch b = db.generateBatch(contactId, (int) capacity); while(b != null) { - proto.writeBatch(b); + writer.writeBatch(b); capacity = conn.getRemainingCapacity(); - capacity = proto.getMessageCapacityForBatch(capacity); + capacity = writer.getMessageCapacityForBatch(capacity); b = db.generateBatch(contactId, (int) capacity); } // Flush the output stream diff --git a/components/net/sf/briar/transport/stream/Flags.java b/components/net/sf/briar/transport/stream/Flags.java deleted file mode 100644 index 422c41e8ff..0000000000 --- a/components/net/sf/briar/transport/stream/Flags.java +++ /dev/null @@ -1,14 +0,0 @@ -package net.sf.briar.transport.stream; - -interface Flags { - - // Flags raised by the database listener - static final int BATCH_RECEIVED = 1; - static final int CONTACT_REMOVED = 2; - static final int MESSAGES_ADDED = 4; - static final int SUBSCRIPTIONS_UPDATED = 8; - static final int TRANSPORTS_UPDATED = 16; - // Flags raised by the reading side of the connection - static final int OFFER_RECEIVED = 32; - static final int REQUEST_RECEIVED = 64; -} diff --git a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java index 559d9fe263..36c975eac8 100644 --- a/components/net/sf/briar/transport/stream/IncomingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/IncomingStreamConnection.java @@ -4,10 +4,9 @@ import java.io.IOException; import java.util.concurrent.Executor; import net.sf.briar.api.db.DatabaseComponent; -import net.sf.briar.api.db.DbException; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -20,14 +19,14 @@ class IncomingStreamConnection extends StreamConnection { private final ConnectionContext ctx; private final byte[] tag; - IncomingStreamConnection(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + IncomingStreamConnection(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ConnectionContext ctx, StreamTransportConnection connection, byte[] tag) { - super(executor, db, serial, connReaderFactory, connWriterFactory, + super(dbExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, ctx.getContactId(), connection); this.ctx = ctx; @@ -35,15 +34,13 @@ class IncomingStreamConnection extends StreamConnection { } @Override - protected ConnectionReader createConnectionReader() throws DbException, - IOException { + protected ConnectionReader createConnectionReader() throws IOException { return connReaderFactory.createConnectionReader( connection.getInputStream(), ctx.getSecret(), tag); } @Override - protected ConnectionWriter createConnectionWriter() throws DbException, - IOException { + protected ConnectionWriter createConnectionWriter() throws IOException { return connWriterFactory.createConnectionWriter( connection.getOutputStream(), Long.MAX_VALUE, ctx.getSecret(), tag); diff --git a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java index 0594f32ddc..89252da5c0 100644 --- a/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java +++ b/components/net/sf/briar/transport/stream/OutgoingStreamConnection.java @@ -5,11 +5,11 @@ import java.util.concurrent.Executor; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.db.DbException; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; @@ -23,14 +23,14 @@ class OutgoingStreamConnection extends StreamConnection { private ConnectionContext ctx = null; // Locking: this - OutgoingStreamConnection(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + OutgoingStreamConnection(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, TransportIndex transportIndex, StreamTransportConnection connection) { - super(executor, db, serial, connReaderFactory, connWriterFactory, + super(dbExecutor, db, connReaderFactory, connWriterFactory, protoReaderFactory, protoWriterFactory, contactId, connection); this.transportIndex = transportIndex; } diff --git a/components/net/sf/briar/transport/stream/StreamConnection.java b/components/net/sf/briar/transport/stream/StreamConnection.java index d959600c59..c638afe3fb 100644 --- a/components/net/sf/briar/transport/stream/StreamConnection.java +++ b/components/net/sf/briar/transport/stream/StreamConnection.java @@ -11,6 +11,7 @@ import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,7 +39,6 @@ import net.sf.briar.api.protocol.Request; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.UnverifiedBatch; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionReader; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriter; @@ -47,14 +47,16 @@ import net.sf.briar.api.transport.StreamTransportConnection; abstract class StreamConnection implements DatabaseListener { - private static enum State { SEND_OFFER, IDLE, AWAIT_REQUEST, SEND_BATCHES }; - private static final Logger LOG = Logger.getLogger(StreamConnection.class.getName()); + // A canary indicating that the connection should be closed + private static final Runnable CLOSE_CONNECTION = new Runnable() { + public void run() {} + }; + protected final Executor dbExecutor; protected final DatabaseComponent db; - protected final SerialComponent serial; protected final ConnectionReaderFactory connReaderFactory; protected final ConnectionWriterFactory connWriterFactory; protected final ProtocolReaderFactory protoReaderFactory; @@ -62,124 +64,105 @@ abstract class StreamConnection implements DatabaseListener { protected final ContactId contactId; protected final StreamTransportConnection connection; - private int writerFlags = 0; // Locking: this + private final AtomicBoolean canSendOffer = new AtomicBoolean(false); + private final LinkedList<Runnable> writerTasks; // Locking: this + private Collection<MessageId> offered = null; // Locking: this - private LinkedList<MessageId> requested = null; // Locking: this - private Offer incomingOffer = null; // Locking: this + + private volatile ProtocolWriter writer = null; StreamConnection(@DatabaseExecutor Executor dbExecutor, - DatabaseComponent db, SerialComponent serial, - ConnectionReaderFactory connReaderFactory, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory, ContactId contactId, StreamTransportConnection connection) { this.dbExecutor = dbExecutor; this.db = db; - this.serial = serial; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; this.protoReaderFactory = protoReaderFactory; this.protoWriterFactory = protoWriterFactory; this.contactId = contactId; this.connection = connection; + writerTasks = new LinkedList<Runnable>(); } protected abstract ConnectionReader createConnectionReader() throws DbException, IOException; protected abstract ConnectionWriter createConnectionWriter() - throws DbException, IOException ; + throws DbException, IOException; public void eventOccurred(DatabaseEvent e) { - synchronized(this) { - if(e instanceof BatchReceivedEvent) { - writerFlags |= Flags.BATCH_RECEIVED; - notifyAll(); - } else if(e instanceof ContactRemovedEvent) { - ContactId c = ((ContactRemovedEvent) e).getContactId(); - if(contactId.equals(c)) { - writerFlags |= Flags.CONTACT_REMOVED; - notifyAll(); - } - } else if(e instanceof MessagesAddedEvent) { - writerFlags |= Flags.MESSAGES_ADDED; - notifyAll(); - } else if(e instanceof SubscriptionsUpdatedEvent) { - Collection<ContactId> affected = - ((SubscriptionsUpdatedEvent) e).getAffectedContacts(); - if(affected.contains(contactId)) { - writerFlags |= Flags.SUBSCRIPTIONS_UPDATED; + if(e instanceof BatchReceivedEvent) { + dbExecutor.execute(new GenerateAcks()); + } else if(e instanceof ContactRemovedEvent) { + ContactId c = ((ContactRemovedEvent) e).getContactId(); + if(contactId.equals(c)) { + synchronized(this) { + writerTasks.add(CLOSE_CONNECTION); notifyAll(); } - } else if(e instanceof LocalTransportsUpdatedEvent) { - writerFlags |= Flags.TRANSPORTS_UPDATED; - notifyAll(); } + } else if(e instanceof MessagesAddedEvent) { + if(canSendOffer.getAndSet(false)) + dbExecutor.execute(new GenerateOffer()); + } else if(e instanceof SubscriptionsUpdatedEvent) { + Collection<ContactId> affected = + ((SubscriptionsUpdatedEvent) e).getAffectedContacts(); + if(affected.contains(contactId)) { + dbExecutor.execute(new GenerateSubscriptionUpdate()); + } + } else if(e instanceof LocalTransportsUpdatedEvent) { + dbExecutor.execute(new GenerateTransportUpdate()); } } void read() { try { InputStream in = createConnectionReader().getInputStream(); - ProtocolReader proto = protoReaderFactory.createProtocolReader(in); - while(!proto.eof()) { - if(proto.hasAck()) { - Ack a = proto.readAck(); - dbExecutor.execute(new ReceiveAck(contactId, a)); - } else if(proto.hasBatch()) { - UnverifiedBatch b = proto.readBatch(); - dbExecutor.execute(new ReceiveBatch(contactId, b)); - } else if(proto.hasOffer()) { - Offer o = proto.readOffer(); - // Store the incoming offer and notify the writer - synchronized(this) { - writerFlags |= Flags.OFFER_RECEIVED; - incomingOffer = o; - notifyAll(); - } - } else if(proto.hasRequest()) { - Request r = proto.readRequest(); + ProtocolReader reader = protoReaderFactory.createProtocolReader(in); + while(!reader.eof()) { + if(reader.hasAck()) { + Ack a = reader.readAck(); + dbExecutor.execute(new ReceiveAck(a)); + } else if(reader.hasBatch()) { + UnverifiedBatch b = reader.readBatch(); + dbExecutor.execute(new ReceiveBatch(b)); + } else if(reader.hasOffer()) { + Offer o = reader.readOffer(); + dbExecutor.execute(new ReceiveOffer(o)); + } else if(reader.hasRequest()) { + Request r = reader.readRequest(); // Retrieve the offered message IDs - Collection<MessageId> off; - synchronized(this) { - if(offered == null) - throw new IOException("Unexpected request packet"); - off = offered; - offered = null; - } + Collection<MessageId> offered = getOfferedMessageIds(); // Work out which messages were requested BitSet b = r.getBitmap(); - LinkedList<MessageId> req = new LinkedList<MessageId>(); + List<MessageId> requested = new LinkedList<MessageId>(); List<MessageId> seen = new ArrayList<MessageId>(); int i = 0; - for(MessageId m : off) { - if(b.get(i++)) req.add(m); + for(MessageId m : offered) { + if(b.get(i++)) requested.add(m); else seen.add(m); } + requested = Collections.synchronizedList(requested); seen = Collections.unmodifiableList(seen); // Mark the unrequested messages as seen - dbExecutor.execute(new SetSeen(contactId, seen)); - // Store the requested message IDs and notify the writer - synchronized(this) { - if(requested != null) - throw new IOException("Unexpected request packet"); - requested = req; - writerFlags |= Flags.REQUEST_RECEIVED; - notifyAll(); - } - } else if(proto.hasSubscriptionUpdate()) { - SubscriptionUpdate s = proto.readSubscriptionUpdate(); - dbExecutor.execute(new ReceiveSubscriptionUpdate( - contactId, s)); - } else if(proto.hasTransportUpdate()) { - TransportUpdate t = proto.readTransportUpdate(); - dbExecutor.execute(new ReceiveTransportUpdate( - contactId, t)); + dbExecutor.execute(new SetSeen(seen)); + // Start sending the requested messages + dbExecutor.execute(new GenerateBatch(requested)); + } else if(reader.hasSubscriptionUpdate()) { + SubscriptionUpdate s = reader.readSubscriptionUpdate(); + dbExecutor.execute(new ReceiveSubscriptionUpdate(s)); + } else if(reader.hasTransportUpdate()) { + TransportUpdate t = reader.readTransportUpdate(); + dbExecutor.execute(new ReceiveTransportUpdate(t)); } else { throw new FormatException(); } } + connection.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); @@ -187,142 +170,50 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); } - // Success - connection.dispose(true); + } + + private synchronized Collection<MessageId> getOfferedMessageIds() + throws FormatException { + if(offered == null) throw new FormatException(); // Unexpected request + Collection<MessageId> ids = offered; + offered = null; + return ids; + } + + // Locking: this + private void setOfferedMessageIds(Collection<MessageId> ids) { + assert offered == null; + offered = ids; } void write() { try { OutputStream out = createConnectionWriter().getOutputStream(); - ProtocolWriter proto = protoWriterFactory.createProtocolWriter(out); - // Send the initial packets: transports, subs, any waiting acks - sendTransportUpdate(proto); - sendSubscriptionUpdate(proto); - sendAcks(proto); - State state = State.SEND_OFFER; + writer = protoWriterFactory.createProtocolWriter(out); + // Start receiving database events + db.addListener(this); + // Send the initial packets: transports, subs, acks, offer + dbExecutor.execute(new GenerateTransportUpdate()); + dbExecutor.execute(new GenerateSubscriptionUpdate()); + dbExecutor.execute(new GenerateAcks()); + dbExecutor.execute(new GenerateOffer()); // Main loop while(true) { - int flags = 0; - switch(state) { - - case SEND_OFFER: - // Try to send an offer - if(sendOffer(proto)) state = State.AWAIT_REQUEST; - else state = State.IDLE; - break; - - case IDLE: - // Wait for one or more flags to be raised - synchronized(this) { - while(writerFlags == 0) { - try { - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } + Runnable task = null; + synchronized(this) { + while(writerTasks.isEmpty()) { + try { + wait(); + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); } - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); - } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - // Should only be received in state AWAIT_REQUEST - throw new IOException("Unexpected request packet"); - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - state = State.SEND_OFFER; - } - break; - - case AWAIT_REQUEST: - // Wait for one or more flags to be raised - synchronized(this) { - while(writerFlags == 0) { - try { - wait(); - } catch(InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - state = State.SEND_BATCHES; - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - // Ignored in this state - } - break; - - case SEND_BATCHES: - // Check whether any flags have been raised - synchronized(this) { - flags = writerFlags; - writerFlags = 0; - } - // Handle the flags in approximate order of urgency - if((flags & Flags.CONTACT_REMOVED) != 0) { - connection.dispose(true); - return; - } - if((flags & Flags.TRANSPORTS_UPDATED) != 0) { - sendTransportUpdate(proto); - } - if((flags & Flags.SUBSCRIPTIONS_UPDATED) != 0) { - sendSubscriptionUpdate(proto); - } - if((flags & Flags.BATCH_RECEIVED) != 0) { - sendAcks(proto); - } - if((flags & Flags.OFFER_RECEIVED) != 0) { - sendRequest(proto); - } - if((flags & Flags.REQUEST_RECEIVED) != 0) { - // Should only be received in state AWAIT_REQUEST - throw new IOException("Unexpected request packet"); - } - if((flags & Flags.MESSAGES_ADDED) != 0) { - // Ignored in this state - } - // Try to send a batch - if(!sendBatch(proto)) state = State.SEND_OFFER; - break; + task = writerTasks.poll(); } + if(task == CLOSE_CONNECTION) break; + task.run(); } + connection.dispose(true); } catch(DbException e) { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); @@ -330,95 +221,14 @@ abstract class StreamConnection implements DatabaseListener { if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); connection.dispose(false); } - // Success - connection.dispose(true); - } - - private void sendAcks(ProtocolWriter proto) - throws DbException, IOException { - int maxBatches = proto.getMaxBatchesForAck(Long.MAX_VALUE); - Ack a = db.generateAck(contactId, maxBatches); - while(a != null) { - proto.writeAck(a); - a = db.generateAck(contactId, maxBatches); - } - } - - private boolean sendBatch(ProtocolWriter proto) - throws DbException, IOException { - Collection<MessageId> req; - // Retrieve the requested message IDs - synchronized(this) { - assert offered == null; - assert requested != null; - req = requested; - } - // Try to generate a batch, updating the collection of message IDs - int capacity = proto.getMessageCapacityForBatch(Long.MAX_VALUE); - RawBatch b = db.generateBatch(contactId, capacity, req); - if(b == null) { - // No more batches can be generated - discard the remaining IDs - synchronized(this) { - assert offered == null; - assert requested == req; - requested = null; - } - return false; - } else { - proto.writeBatch(b); - return true; - } - } - - private boolean sendOffer(ProtocolWriter proto) - throws DbException, IOException { - // Generate an offer - int maxMessages = proto.getMaxMessagesForOffer(Long.MAX_VALUE); - Offer o = db.generateOffer(contactId, maxMessages); - if(o == null) return false; - proto.writeOffer(o); - // Store the offered message IDs - synchronized(this) { - assert offered == null; - assert requested == null; - offered = o.getMessageIds(); - } - return true; - } - - private void sendRequest(ProtocolWriter proto) - throws DbException, IOException { - Offer o; - // Retrieve the incoming offer - synchronized(this) { - assert incomingOffer != null; - o = incomingOffer; - incomingOffer = null; - } - // Process the offer and generate a request - Request r = db.receiveOffer(contactId, o); - proto.writeRequest(r); - } - - private void sendTransportUpdate(ProtocolWriter proto) - throws DbException, IOException { - TransportUpdate t = db.generateTransportUpdate(contactId); - if(t != null) proto.writeTransportUpdate(t); - } - - private void sendSubscriptionUpdate(ProtocolWriter proto) - throws DbException, IOException { - SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); - if(s != null) proto.writeSubscriptionUpdate(s); } + // This task runs on a database thread private class ReceiveAck implements Runnable { - private final ContactId contactId; private final Ack ack; - private ReceiveAck(ContactId contactId, Ack ack) { - this.contactId = contactId; + private ReceiveAck(Ack ack) { this.ack = ack; } @@ -431,13 +241,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveBatch implements Runnable { - private final ContactId contactId; private final UnverifiedBatch batch; - private ReceiveBatch(ContactId contactId, UnverifiedBatch batch) { - this.contactId = contactId; + private ReceiveBatch(UnverifiedBatch batch) { this.batch = batch; } @@ -453,13 +262,54 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread + private class ReceiveOffer implements Runnable { + + private final Offer offer; + + private ReceiveOffer(Offer offer) { + this.offer = offer; + } + + public void run() { + try { + Request r = db.receiveOffer(contactId, offer); + synchronized(StreamConnection.this) { + writerTasks.add(new WriteRequest(r)); + StreamConnection.this.notifyAll(); + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteRequest implements Runnable { + + private final Request request; + + private WriteRequest(Request request) { + this.request = request; + } + + public void run() { + assert writer != null; + try { + writer.writeRequest(request); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread private class SetSeen implements Runnable { - private final ContactId contactId; private final Collection<MessageId> seen; - private SetSeen(ContactId contactId, Collection<MessageId> seen) { - this.contactId = contactId; + private SetSeen(Collection<MessageId> seen) { this.seen = seen; } @@ -472,14 +322,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveSubscriptionUpdate implements Runnable { - private final ContactId contactId; private final SubscriptionUpdate update; - private ReceiveSubscriptionUpdate(ContactId contactId, - SubscriptionUpdate update) { - this.contactId = contactId; + private ReceiveSubscriptionUpdate(SubscriptionUpdate update) { this.update = update; } @@ -492,14 +340,12 @@ abstract class StreamConnection implements DatabaseListener { } } + // This task runs on a database thread private class ReceiveTransportUpdate implements Runnable { - private final ContactId contactId; private final TransportUpdate update; - private ReceiveTransportUpdate(ContactId contactId, - TransportUpdate update) { - this.contactId = contactId; + private ReceiveTransportUpdate(TransportUpdate update) { this.update = update; } @@ -511,4 +357,226 @@ abstract class StreamConnection implements DatabaseListener { } } } + + // This task runs on a database thread + private class GenerateAcks implements Runnable { + + public void run() { + assert writer != null; + int maxBatches = writer.getMaxBatchesForAck(Long.MAX_VALUE); + try { + Ack a = db.generateAck(contactId, maxBatches); + while(a != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteAck(a)); + StreamConnection.this.notifyAll(); + } + a = db.generateAck(contactId, maxBatches); + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteAck implements Runnable { + + private final Ack ack; + + private WriteAck(Ack ack) { + this.ack = ack; + } + + public void run() { + assert writer != null; + try { + writer.writeAck(ack); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thred + private class GenerateBatch implements Runnable { + + private final Collection<MessageId> requested; + + private GenerateBatch(Collection<MessageId> requested) { + this.requested = requested; + } + + public void run() { + assert writer != null; + int capacity = writer.getMessageCapacityForBatch(Long.MAX_VALUE); + try { + RawBatch b = db.generateBatch(contactId, capacity, requested); + if(b == null) { + // No batch to write - send another offer + new GenerateOffer().run(); + } else { + // Write the batch + synchronized(StreamConnection.this) { + writerTasks.add(new WriteBatch(b, requested)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteBatch implements Runnable { + + private final RawBatch batch; + private final Collection<MessageId> requested; + + private WriteBatch(RawBatch batch, Collection<MessageId> requested) { + this.batch = batch; + this.requested = requested; + } + + public void run() { + assert writer != null; + try { + writer.writeBatch(batch); + if(requested.isEmpty()) { + // No more batches to send - send another offer + dbExecutor.execute(new GenerateOffer()); + } else { + // Send another batch + dbExecutor.execute(new GenerateBatch(requested)); + } + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateOffer implements Runnable { + + public void run() { + assert writer != null; + int maxMessages = writer.getMaxMessagesForOffer(Long.MAX_VALUE); + try { + Offer o = db.generateOffer(contactId, maxMessages); + if(o == null) { + // No messages to offer - wait for some to be added + canSendOffer.set(true); + } else { + synchronized(StreamConnection.this) { + // Store the offered message IDs + setOfferedMessageIds(o.getMessageIds()); + // Write the offer on the writer thread + writerTasks.add(new WriteOffer(o)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteOffer implements Runnable { + + private final Offer offer; + + private WriteOffer(Offer offer) { + this.offer = offer; + } + + public void run() { + assert writer != null; + try { + writer.writeOffer(offer); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateSubscriptionUpdate implements Runnable { + + public void run() { + try { + SubscriptionUpdate s = db.generateSubscriptionUpdate(contactId); + if(s != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteSubscriptionUpdate(s)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteSubscriptionUpdate implements Runnable { + + private final SubscriptionUpdate update; + + private WriteSubscriptionUpdate(SubscriptionUpdate update) { + this.update = update; + } + + public void run() { + assert writer != null; + try { + writer.writeSubscriptionUpdate(update); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } + + // This task runs on a database thread + private class GenerateTransportUpdate implements Runnable { + + public void run() { + try { + TransportUpdate t = db.generateTransportUpdate(contactId); + if(t != null) { + synchronized(StreamConnection.this) { + writerTasks.add(new WriteTransportUpdate(t)); + StreamConnection.this.notifyAll(); + } + } + } catch(DbException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + } + } + } + + // This task runs on the writer thread + private class WriteTransportUpdate implements Runnable { + + private final TransportUpdate update; + + private WriteTransportUpdate(TransportUpdate update) { + this.update = update; + } + + public void run() { + assert writer != null; + try { + writer.writeTransportUpdate(update); + } catch(IOException e) { + if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); + connection.dispose(false); + } + } + } } diff --git a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java index 7217b862dd..67787c8207 100644 --- a/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java +++ b/components/net/sf/briar/transport/stream/StreamConnectionFactoryImpl.java @@ -4,10 +4,10 @@ import java.util.concurrent.Executor; import net.sf.briar.api.ContactId; import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DatabaseExecutor; import net.sf.briar.api.protocol.ProtocolReaderFactory; import net.sf.briar.api.protocol.ProtocolWriterFactory; import net.sf.briar.api.protocol.TransportIndex; -import net.sf.briar.api.serial.SerialComponent; import net.sf.briar.api.transport.ConnectionContext; import net.sf.briar.api.transport.ConnectionReaderFactory; import net.sf.briar.api.transport.ConnectionWriterFactory; @@ -18,23 +18,21 @@ import com.google.inject.Inject; class StreamConnectionFactoryImpl implements StreamConnectionFactory { - private final Executor executor; + private final Executor dbExecutor; private final DatabaseComponent db; - private final SerialComponent serial; private final ConnectionReaderFactory connReaderFactory; private final ConnectionWriterFactory connWriterFactory; private final ProtocolReaderFactory protoReaderFactory; private final ProtocolWriterFactory protoWriterFactory; @Inject - StreamConnectionFactoryImpl(Executor executor, DatabaseComponent db, - SerialComponent serial, ConnectionReaderFactory connReaderFactory, + StreamConnectionFactoryImpl(@DatabaseExecutor Executor dbExecutor, + DatabaseComponent db, ConnectionReaderFactory connReaderFactory, ConnectionWriterFactory connWriterFactory, ProtocolReaderFactory protoReaderFactory, ProtocolWriterFactory protoWriterFactory) { - this.executor = executor; + this.dbExecutor = dbExecutor; this.db = db; - this.serial = serial; this.connReaderFactory = connReaderFactory; this.connWriterFactory = connWriterFactory; this.protoReaderFactory = protoReaderFactory; @@ -43,9 +41,9 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createIncomingConnection(ConnectionContext ctx, StreamTransportConnection s, byte[] tag) { - final StreamConnection conn = new IncomingStreamConnection(executor, db, - serial, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, ctx, s, tag); + final StreamConnection conn = new IncomingStreamConnection(dbExecutor, + db, connReaderFactory, connWriterFactory, protoReaderFactory, + protoWriterFactory, ctx, s, tag); Runnable write = new Runnable() { public void run() { conn.write(); @@ -62,9 +60,9 @@ class StreamConnectionFactoryImpl implements StreamConnectionFactory { public void createOutgoingConnection(ContactId c, TransportIndex i, StreamTransportConnection s) { - final StreamConnection conn = new OutgoingStreamConnection(executor, db, - serial, connReaderFactory, connWriterFactory, - protoReaderFactory, protoWriterFactory, c, i, s); + final StreamConnection conn = new OutgoingStreamConnection(dbExecutor, + db, connReaderFactory, connWriterFactory, protoReaderFactory, + protoWriterFactory, c, i, s); Runnable write = new Runnable() { public void run() { conn.write(); -- GitLab