From b675c3895365b5fe85cdd5538f9a5db7885ac839 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Fri, 23 Sep 2011 12:55:23 +0100 Subject: [PATCH] Don't do IO while holding database locks. --- .../api/protocol/writers/BatchWriter.java | 7 +- components/net/sf/briar/db/Database.java | 2 +- .../sf/briar/db/DatabaseComponentImpl.java | 97 ++++++++++--------- .../protocol/writers/BatchWriterImpl.java | 15 ++- .../sf/briar/db/DatabaseComponentTest.java | 31 +++--- 5 files changed, 85 insertions(+), 67 deletions(-) diff --git a/api/net/sf/briar/api/protocol/writers/BatchWriter.java b/api/net/sf/briar/api/protocol/writers/BatchWriter.java index e14aa127e7..7d0036b67d 100644 --- a/api/net/sf/briar/api/protocol/writers/BatchWriter.java +++ b/api/net/sf/briar/api/protocol/writers/BatchWriter.java @@ -7,9 +7,12 @@ import net.sf.briar.api.protocol.BatchId; /** An interface for creating a batch packet. */ public interface BatchWriter { + /** Returns the capacity of the batch. */ + int getCapacity(); + /** - * Sets the maximum length of the serialised batch. If this method is not - * called, the default is ProtocolConstants.MAX_PACKET_LENGTH; + * Sets the maximum length of the serialised batch; the default is + * ProtocolConstants.MAX_PACKET_LENGTH; */ void setMaxPacketLength(int length); diff --git a/components/net/sf/briar/db/Database.java b/components/net/sf/briar/db/Database.java index f6a459aba1..7065e02332 100644 --- a/components/net/sf/briar/db/Database.java +++ b/components/net/sf/briar/db/Database.java @@ -287,7 +287,7 @@ interface Database<T> { * Locking: contacts read, messages read, messageStatuses read, * subscriptions read. */ - Collection<MessageId> getSendableMessages(T txn, ContactId c, int size) + Collection<MessageId> getSendableMessages(T txn, ContactId c, int capacity) throws DbException; /** diff --git a/components/net/sf/briar/db/DatabaseComponentImpl.java b/components/net/sf/briar/db/DatabaseComponentImpl.java index 82bd27f78b..1a004e350b 100644 --- a/components/net/sf/briar/db/DatabaseComponentImpl.java +++ b/components/net/sf/briar/db/DatabaseComponentImpl.java @@ -12,16 +12,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.logging.Level; import java.util.logging.Logger; -import com.google.inject.Inject; - +import net.sf.briar.api.Bytes; import net.sf.briar.api.ContactId; import net.sf.briar.api.Rating; import net.sf.briar.api.db.DatabaseComponent; import net.sf.briar.api.db.DatabaseListener; +import net.sf.briar.api.db.DatabaseListener.Event; import net.sf.briar.api.db.DbException; import net.sf.briar.api.db.NoSuchContactException; import net.sf.briar.api.db.Status; -import net.sf.briar.api.db.DatabaseListener.Event; import net.sf.briar.api.protocol.Ack; import net.sf.briar.api.protocol.AuthorId; import net.sf.briar.api.protocol.Batch; @@ -31,7 +30,6 @@ import net.sf.briar.api.protocol.GroupId; import net.sf.briar.api.protocol.Message; import net.sf.briar.api.protocol.MessageId; import net.sf.briar.api.protocol.Offer; -import net.sf.briar.api.protocol.ProtocolConstants; import net.sf.briar.api.protocol.SubscriptionUpdate; import net.sf.briar.api.protocol.TransportUpdate; import net.sf.briar.api.protocol.writers.AckWriter; @@ -42,6 +40,8 @@ import net.sf.briar.api.protocol.writers.SubscriptionWriter; import net.sf.briar.api.protocol.writers.TransportWriter; import net.sf.briar.api.transport.ConnectionWindow; +import com.google.inject.Inject; + /** * An implementation of DatabaseComponent using reentrant read-write locks. * Depending on the JVM's lock implementation, this implementation may allow @@ -426,33 +426,30 @@ DatabaseCleaner.Callback { public boolean generateBatch(ContactId c, BatchWriter b) throws DbException, IOException { + Collection<MessageId> ids = new ArrayList<MessageId>(); + Collection<Bytes> messages = new ArrayList<Bytes>(); + // Get some sendable messages from the database contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { - Collection<MessageId> sent = new ArrayList<MessageId>(); messageStatusLock.readLock().lock(); try { subscriptionLock.readLock().lock(); try { T txn = db.startTransaction(); try { - int capacity = ProtocolConstants.MAX_PACKET_LENGTH; - Collection<MessageId> sendable = - db.getSendableMessages(txn, c, capacity); - for(MessageId m : sendable) { + int capacity = b.getCapacity(); + ids = db.getSendableMessages(txn, c, capacity); + for(MessageId m : ids) { byte[] raw = db.getMessage(txn, m); - if(!b.writeMessage(raw)) break; - sent.add(m); + messages.add(new Bytes(raw)); } db.commitTransaction(txn); } catch(DbException e) { db.abortTransaction(txn); throw e; - } catch(IOException e) { - db.abortTransaction(txn); - throw e; } } finally { subscriptionLock.readLock().unlock(); @@ -460,16 +457,41 @@ DatabaseCleaner.Callback { } finally { messageStatusLock.readLock().unlock(); } - // Record the contents of the batch, unless it's empty - if(sent.isEmpty()) return false; - BatchId id = b.finish(); + } finally { + messageLock.readLock().unlock(); + } + } finally { + contactLock.readLock().unlock(); + } + if(ids.isEmpty()) return false; + writeAndRecordBatch(c, b, ids, messages); + return true; + } + + private void writeAndRecordBatch(ContactId c, BatchWriter b, + Collection<MessageId> ids, Collection<Bytes> messages) + throws DbException, IOException { + assert !ids.isEmpty(); + assert !messages.isEmpty(); + assert ids.size() == messages.size(); + // Add the messages to the batch + for(Bytes raw : messages) { + boolean written = b.writeMessage(raw.getBytes()); + assert written; + } + BatchId id = b.finish(); + // Record the contents of the batch + contactLock.readLock().lock(); + try { + if(!containsContact(c)) throw new NoSuchContactException(); + messageLock.readLock().lock(); + try { messageStatusLock.writeLock().lock(); try { T txn = db.startTransaction(); try { - db.addOutstandingBatch(txn, c, id, sent); + db.addOutstandingBatch(txn, c, id, ids); db.commitTransaction(txn); - return true; } catch(DbException e) { db.abortTransaction(txn); throw e; @@ -487,29 +509,29 @@ DatabaseCleaner.Callback { public boolean generateBatch(ContactId c, BatchWriter b, Collection<MessageId> requested) throws DbException, IOException { + Collection<MessageId> ids = new ArrayList<MessageId>(); + Collection<Bytes> messages = new ArrayList<Bytes>(); + // Get some sendable messages from the database contactLock.readLock().lock(); try { if(!containsContact(c)) throw new NoSuchContactException(); messageLock.readLock().lock(); try { - Collection<MessageId> sent = new ArrayList<MessageId>(); messageStatusLock.readLock().lock(); try{ subscriptionLock.readLock().lock(); try { T txn = db.startTransaction(); try { + int capacity = b.getCapacity(); Iterator<MessageId> it = requested.iterator(); while(it.hasNext()) { MessageId m = it.next(); - // If the message is still sendable, try to add - // it to the batch byte[] raw = db.getMessageIfSendable(txn, c, m); if(raw != null) { - // If the batch is full, don't treat the - // message as considered - if(!b.writeMessage(raw)) break; - sent.add(m); + if(raw.length > capacity) break; + ids.add(m); + messages.add(new Bytes(raw)); } it.remove(); } @@ -517,9 +539,6 @@ DatabaseCleaner.Callback { } catch(DbException e) { db.abortTransaction(txn); throw e; - } catch(IOException e) { - db.abortTransaction(txn); - throw e; } } finally { subscriptionLock.readLock().unlock(); @@ -527,29 +546,15 @@ DatabaseCleaner.Callback { } finally { messageStatusLock.readLock().unlock(); } - // Record the contents of the batch, unless it's empty - if(sent.isEmpty()) return false; - BatchId id = b.finish(); - messageStatusLock.writeLock().lock(); - try { - T txn = db.startTransaction(); - try { - db.addOutstandingBatch(txn, c, id, sent); - db.commitTransaction(txn); - return true; - } catch(DbException e) { - db.abortTransaction(txn); - throw e; - } - } finally { - messageStatusLock.writeLock().unlock(); - } } finally { messageLock.readLock().unlock(); } } finally { contactLock.readLock().unlock(); } + if(ids.isEmpty()) return false; + writeAndRecordBatch(c, b, ids, messages); + return true; } public Collection<MessageId> generateOffer(ContactId c, OfferWriter o) diff --git a/components/net/sf/briar/protocol/writers/BatchWriterImpl.java b/components/net/sf/briar/protocol/writers/BatchWriterImpl.java index e6c9c8a5df..cbae50f1d5 100644 --- a/components/net/sf/briar/protocol/writers/BatchWriterImpl.java +++ b/components/net/sf/briar/protocol/writers/BatchWriterImpl.java @@ -22,6 +22,7 @@ class BatchWriterImpl implements BatchWriter { private boolean started = false; private int capacity = ProtocolConstants.MAX_PACKET_LENGTH; + private int remaining = capacity; BatchWriterImpl(OutputStream out, SerialComponent serial, WriterFactory writerFactory, MessageDigest messageDigest) { @@ -33,20 +34,24 @@ class BatchWriterImpl implements BatchWriter { this.messageDigest = messageDigest; } + public int getCapacity() { + return capacity - headerLength - footerLength; + } + public void setMaxPacketLength(int length) { if(started) throw new IllegalStateException(); if(length < 0 || length > ProtocolConstants.MAX_PACKET_LENGTH) throw new IllegalArgumentException(); - capacity = length; + remaining = capacity = length; } public boolean writeMessage(byte[] message) throws IOException { int overhead = started ? footerLength : headerLength + footerLength; - if(capacity < message.length + overhead) return false; + if(remaining < message.length + overhead) return false; if(!started) start(); // Bypass the writer and write the raw message directly out.write(message); - capacity -= message.length; + remaining -= message.length; return true; } @@ -54,7 +59,7 @@ class BatchWriterImpl implements BatchWriter { if(!started) start(); w.writeListEnd(); out.flush(); - capacity = ProtocolConstants.MAX_PACKET_LENGTH; + remaining = capacity = ProtocolConstants.MAX_PACKET_LENGTH; started = false; return new BatchId(messageDigest.digest()); } @@ -63,7 +68,7 @@ class BatchWriterImpl implements BatchWriter { messageDigest.reset(); w.writeUserDefinedId(Types.BATCH); w.writeListStart(); - capacity -= headerLength; + remaining -= headerLength; started = true; } } diff --git a/test/net/sf/briar/db/DatabaseComponentTest.java b/test/net/sf/briar/db/DatabaseComponentTest.java index 91784f74a6..0a0bdd5a06 100644 --- a/test/net/sf/briar/db/DatabaseComponentTest.java +++ b/test/net/sf/briar/db/DatabaseComponentTest.java @@ -738,24 +738,27 @@ public abstract class DatabaseComponentTest extends TestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); + // Find out how much space we've got + oneOf(batchWriter).getCapacity(); + will(returnValue(ProtocolConstants.MAX_PACKET_LENGTH)); // Get the sendable messages oneOf(database).getSendableMessages(txn, contactId, ProtocolConstants.MAX_PACKET_LENGTH); will(returnValue(sendable)); - // Try to add both messages to the writer - only manage to add one oneOf(database).getMessage(txn, messageId); will(returnValue(raw)); - oneOf(batchWriter).writeMessage(raw); - will(returnValue(true)); oneOf(database).getMessage(txn, messageId1); will(returnValue(raw1)); + // Add the sendable messages to the batch + oneOf(batchWriter).writeMessage(raw); + will(returnValue(true)); oneOf(batchWriter).writeMessage(raw1); - will(returnValue(false)); + will(returnValue(true)); oneOf(batchWriter).finish(); will(returnValue(batchId)); // Record the message that was sent oneOf(database).addOutstandingBatch(txn, contactId, batchId, - Collections.singletonList(messageId)); + sendable); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner); @@ -784,22 +787,24 @@ public abstract class DatabaseComponentTest extends TestCase { allowing(database).commitTransaction(txn); allowing(database).containsContact(txn, contactId); will(returnValue(true)); - // Try to get the requested messages and add them to the writer + // Find out how much space we've got + oneOf(batchWriter).getCapacity(); + will(returnValue(ProtocolConstants.MAX_PACKET_LENGTH)); + // Try to get the requested messages oneOf(database).getMessageIfSendable(txn, contactId, messageId); - will(returnValue(raw)); // Message is sendable - oneOf(batchWriter).writeMessage(raw); - will(returnValue(true)); // Message added to batch - oneOf(database).getMessageIfSendable(txn, contactId, messageId1); will(returnValue(null)); // Message is not sendable - oneOf(database).getMessageIfSendable(txn, contactId, messageId2); + oneOf(database).getMessageIfSendable(txn, contactId, messageId1); will(returnValue(raw1)); // Message is sendable + oneOf(database).getMessageIfSendable(txn, contactId, messageId2); + will(returnValue(null)); // Message is not sendable + // Add the sendable message to the batch oneOf(batchWriter).writeMessage(raw1); - will(returnValue(false)); // Message not added to batch + will(returnValue(true)); oneOf(batchWriter).finish(); will(returnValue(batchId)); // Record the message that was sent oneOf(database).addOutstandingBatch(txn, contactId, batchId, - Collections.singletonList(messageId)); + Collections.singletonList(messageId1)); }}); DatabaseComponent db = createDatabaseComponent(database, cleaner); -- GitLab