From 09971c84609dd3838bb785fc66b3bdc0eec21982 Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Thu, 22 Sep 2011 16:26:06 +0100 Subject: [PATCH] Implemented incoming and outgoing batch connections (untested). --- .../sf/briar/api/protocol/ProtocolReader.java | 2 + .../transport/batch/BatchTransportReader.java | 2 +- .../transport/batch/BatchTransportWriter.java | 2 +- .../sf/briar/protocol/ProtocolReaderImpl.java | 4 + .../batch/IncomingBatchConnection.java | 70 +++++++++++++++++ .../batch/OutgoingBatchConnection.java | 77 +++++++++++++++++++ .../batch/ByteArrayBatchTransportReader.java | 39 ++++++++++ .../batch/ByteArrayBatchTransportWriter.java | 50 ++++++++++++ 8 files changed, 244 insertions(+), 2 deletions(-) create mode 100644 components/net/sf/briar/transport/batch/IncomingBatchConnection.java create mode 100644 components/net/sf/briar/transport/batch/OutgoingBatchConnection.java create mode 100644 test/net/sf/briar/transport/batch/ByteArrayBatchTransportReader.java create mode 100644 test/net/sf/briar/transport/batch/ByteArrayBatchTransportWriter.java diff --git a/api/net/sf/briar/api/protocol/ProtocolReader.java b/api/net/sf/briar/api/protocol/ProtocolReader.java index 1ed7866cd5..781eccfba0 100644 --- a/api/net/sf/briar/api/protocol/ProtocolReader.java +++ b/api/net/sf/briar/api/protocol/ProtocolReader.java @@ -4,6 +4,8 @@ import java.io.IOException; public interface ProtocolReader { + boolean eof() throws IOException; + boolean hasAck() throws IOException; Ack readAck() throws IOException; diff --git a/api/net/sf/briar/api/transport/batch/BatchTransportReader.java b/api/net/sf/briar/api/transport/batch/BatchTransportReader.java index 7d678b3984..758a34c02c 100644 --- a/api/net/sf/briar/api/transport/batch/BatchTransportReader.java +++ b/api/net/sf/briar/api/transport/batch/BatchTransportReader.java @@ -17,5 +17,5 @@ public interface BatchTransportReader { * be called even if the reader is not used, or if an exception is thrown * while using the reader. */ - void close() throws IOException; + void dispose() throws IOException; } diff --git a/api/net/sf/briar/api/transport/batch/BatchTransportWriter.java b/api/net/sf/briar/api/transport/batch/BatchTransportWriter.java index 2c77c3c90f..7745803dff 100644 --- a/api/net/sf/briar/api/transport/batch/BatchTransportWriter.java +++ b/api/net/sf/briar/api/transport/batch/BatchTransportWriter.java @@ -20,5 +20,5 @@ public interface BatchTransportWriter { * be called even if the writer is not used, or if an exception is thrown * while using the writer. */ - void close() throws IOException; + void dispose() throws IOException; } diff --git a/components/net/sf/briar/protocol/ProtocolReaderImpl.java b/components/net/sf/briar/protocol/ProtocolReaderImpl.java index e2c39da0c4..df08deef7a 100644 --- a/components/net/sf/briar/protocol/ProtocolReaderImpl.java +++ b/components/net/sf/briar/protocol/ProtocolReaderImpl.java @@ -34,6 +34,10 @@ class ProtocolReaderImpl implements ProtocolReader { reader.addObjectReader(Types.TRANSPORT_UPDATE, transportReader); } + public boolean eof() throws IOException { + return reader.eof(); + } + public boolean hasAck() throws IOException { return reader.hasUserDefined(Types.ACK); } diff --git a/components/net/sf/briar/transport/batch/IncomingBatchConnection.java b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java new file mode 100644 index 0000000000..cf16f44b54 --- /dev/null +++ b/components/net/sf/briar/transport/batch/IncomingBatchConnection.java @@ -0,0 +1,70 @@ +package net.sf.briar.transport.batch; + +import java.io.IOException; +import java.io.InputStream; + +import net.sf.briar.api.ContactId; +import net.sf.briar.api.FormatException; +import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DbException; +import net.sf.briar.api.protocol.Ack; +import net.sf.briar.api.protocol.Batch; +import net.sf.briar.api.protocol.ProtocolReader; +import net.sf.briar.api.protocol.ProtocolReaderFactory; +import net.sf.briar.api.protocol.SubscriptionUpdate; +import net.sf.briar.api.protocol.TransportUpdate; +import net.sf.briar.api.transport.ConnectionReader; +import net.sf.briar.api.transport.ConnectionReaderFactory; +import net.sf.briar.api.transport.batch.BatchTransportReader; + +class IncomingBatchConnection { + + private final BatchTransportReader trans; + private final ConnectionReaderFactory connFactory; + private final DatabaseComponent db; + private final ProtocolReaderFactory protoFactory; + private final int transportId; + private final long connection; + private final ContactId contactId; + + IncomingBatchConnection(BatchTransportReader trans, + ConnectionReaderFactory connFactory, DatabaseComponent db, + ProtocolReaderFactory protoFactory, int transportId, + long connection, ContactId contactId) { + this.trans = trans; + this.connFactory = connFactory; + this.db = db; + this.protoFactory = protoFactory; + this.transportId = transportId; + this.connection = connection; + this.contactId = contactId; + } + + void read() throws DbException, IOException { + byte[] secret = db.getSharedSecret(contactId); + ConnectionReader conn = connFactory.createConnectionReader( + trans.getInputStream(), false, transportId, connection, secret); + InputStream in = conn.getInputStream(); + ProtocolReader proto = protoFactory.createProtocolReader(in); + // Read packets until EOF + while(!proto.eof()) { + if(proto.hasAck()) { + Ack a = proto.readAck(); + db.receiveAck(contactId, a); + } else if(proto.hasBatch()) { + Batch b = proto.readBatch(); + db.receiveBatch(contactId, b); + } else if(proto.hasSubscriptionUpdate()) { + SubscriptionUpdate s = proto.readSubscriptionUpdate(); + db.receiveSubscriptionUpdate(contactId, s); + } else if(proto.hasTransportUpdate()) { + TransportUpdate t = proto.readTransportUpdate(); + db.receiveTransportUpdate(contactId, t); + } else { + throw new FormatException(); + } + } + // Close the input stream + in.close(); + } +} diff --git a/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java new file mode 100644 index 0000000000..6d4d17d072 --- /dev/null +++ b/components/net/sf/briar/transport/batch/OutgoingBatchConnection.java @@ -0,0 +1,77 @@ +package net.sf.briar.transport.batch; + +import static net.sf.briar.api.protocol.ProtocolConstants.MAX_PACKET_LENGTH; + +import java.io.IOException; +import java.io.OutputStream; + +import net.sf.briar.api.ContactId; +import net.sf.briar.api.db.DatabaseComponent; +import net.sf.briar.api.db.DbException; +import net.sf.briar.api.protocol.writers.AckWriter; +import net.sf.briar.api.protocol.writers.BatchWriter; +import net.sf.briar.api.protocol.writers.ProtocolWriterFactory; +import net.sf.briar.api.protocol.writers.SubscriptionWriter; +import net.sf.briar.api.protocol.writers.TransportWriter; +import net.sf.briar.api.transport.ConnectionWriter; +import net.sf.briar.api.transport.ConnectionWriterFactory; +import net.sf.briar.api.transport.batch.BatchTransportWriter; + +class OutgoingBatchConnection { + + private final BatchTransportWriter trans; + private final ConnectionWriterFactory connFactory; + private final DatabaseComponent db; + private final ProtocolWriterFactory protoFactory; + private final int transportId; + private final long connection; + private final ContactId contactId; + + OutgoingBatchConnection(BatchTransportWriter trans, + ConnectionWriterFactory connFactory, DatabaseComponent db, + ProtocolWriterFactory protoFactory, int transportId, + long connection, ContactId contactId) { + this.trans = trans; + this.connFactory = connFactory; + this.db = db; + this.protoFactory = protoFactory; + this.transportId = transportId; + this.connection = connection; + this.contactId = contactId; + } + + void write() throws DbException, IOException { + byte[] secret = db.getSharedSecret(contactId); + ConnectionWriter conn = connFactory.createConnectionWriter( + trans.getOutputStream(), true, transportId, connection, secret); + OutputStream out = conn.getOutputStream(); + // There should be enough space for a packet + long capacity = conn.getCapacity(trans.getCapacity()); + if(capacity < MAX_PACKET_LENGTH) throw new IOException(); + // Write a transport update + TransportWriter t = protoFactory.createTransportWriter(out); + db.generateTransportUpdate(contactId, t); + // If there's space, write a subscription update + capacity = conn.getCapacity(trans.getCapacity()); + if(capacity >= MAX_PACKET_LENGTH) { + SubscriptionWriter s = protoFactory.createSubscriptionWriter(out); + db.generateSubscriptionUpdate(contactId, s); + } + // Write acks until you can't write acks no more + AckWriter a = protoFactory.createAckWriter(out); + do { + capacity = conn.getCapacity(trans.getCapacity()); + int max = (int) Math.min(MAX_PACKET_LENGTH, capacity); + a.setMaxPacketLength(max); + } while(db.generateAck(contactId, a)); + // Write batches until you can't write batches no more + BatchWriter b = protoFactory.createBatchWriter(out); + do { + capacity = conn.getCapacity(trans.getCapacity()); + int max = (int) Math.min(MAX_PACKET_LENGTH, capacity); + b.setMaxPacketLength(max); + } while(db.generateBatch(contactId, b)); + // Close the output stream + out.close(); + } +} diff --git a/test/net/sf/briar/transport/batch/ByteArrayBatchTransportReader.java b/test/net/sf/briar/transport/batch/ByteArrayBatchTransportReader.java new file mode 100644 index 0000000000..2088330d02 --- /dev/null +++ b/test/net/sf/briar/transport/batch/ByteArrayBatchTransportReader.java @@ -0,0 +1,39 @@ +package net.sf.briar.transport.batch; + +import java.io.ByteArrayInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +import net.sf.briar.api.transport.batch.BatchTransportReader; + +class ByteArrayBatchTransportReader extends FilterInputStream +implements BatchTransportReader { + + ByteArrayBatchTransportReader(ByteArrayInputStream in) { + super(in); + } + + public InputStream getInputStream() throws IOException { + return this; + } + + public void dispose() throws IOException { + // Nothing to do + } + + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } +} diff --git a/test/net/sf/briar/transport/batch/ByteArrayBatchTransportWriter.java b/test/net/sf/briar/transport/batch/ByteArrayBatchTransportWriter.java new file mode 100644 index 0000000000..538fcb3ae3 --- /dev/null +++ b/test/net/sf/briar/transport/batch/ByteArrayBatchTransportWriter.java @@ -0,0 +1,50 @@ +package net.sf.briar.transport.batch; + +import java.io.ByteArrayOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import net.sf.briar.api.transport.batch.BatchTransportWriter; + +class ByteArrayBatchTransportWriter extends FilterOutputStream +implements BatchTransportWriter { + + private int capacity; + + ByteArrayBatchTransportWriter(ByteArrayOutputStream out, int capacity) { + super(out); + this.capacity = capacity; + } + + public long getCapacity() throws IOException { + return capacity; + } + + public OutputStream getOutputStream() throws IOException { + return this; + } + + public void dispose() throws IOException { + // Nothing to do + } + + @Override + public void write(int b) throws IOException { + if(capacity < 1) throw new IllegalArgumentException(); + out.write(b); + capacity--; + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if(len > capacity) throw new IllegalArgumentException(); + out.write(b, off, len); + capacity -= len; + } +} -- GitLab