From 1d20761123ac76956e55ab5685922c7b1b64091b Mon Sep 17 00:00:00 2001 From: akwizgran <akwizgran@users.sourceforge.net> Date: Thu, 6 Nov 2014 13:13:23 +0000 Subject: [PATCH] Messaging sessions aren't responsible for closing their streams. The TransportReader/Writer's dispose() method should handle that, and ConnectionManager is responsible for calling it. --- .../messaging/DuplexOutgoingSession.java | 12 ++++++++- .../messaging/IncomingSession.java | 3 --- .../messaging/SimplexOutgoingSession.java | 9 ++++++- .../SimplexMessagingIntegrationTest.java | 27 ++++++++----------- 4 files changed, 30 insertions(+), 21 deletions(-) diff --git a/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java b/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java index d2eeab7a9a..e5db2519ad 100644 --- a/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java +++ b/briar-core/src/org/briarproject/messaging/DuplexOutgoingSession.java @@ -109,10 +109,10 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { ThrowingRunnable<IOException> task = writerTasks.take(); if(task == CLOSE) break; task.run(); + // Flush the stream if it's going to be idle if(writerTasks.isEmpty()) out.flush(); } out.flush(); - out.close(); } catch(InterruptedException e) { LOG.info("Interrupted while waiting for a packet to write"); Thread.currentThread().interrupt(); @@ -206,6 +206,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeAck(ack); LOG.info("Sent ack"); dbExecutor.execute(new GenerateAck()); @@ -240,6 +241,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(byte[] raw : batch) packetWriter.writeMessage(raw); LOG.info("Sent batch"); dbExecutor.execute(new GenerateBatch()); @@ -275,6 +277,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeOffer(offer); LOG.info("Sent offer"); dbExecutor.execute(new GenerateOffer()); @@ -310,6 +313,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeRequest(request); LOG.info("Sent request"); dbExecutor.execute(new GenerateRequest()); @@ -344,6 +348,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { public void run() throws IOException { + if(interrupted) return; packetWriter.writeRetentionAck(ack); LOG.info("Sent retention ack"); dbExecutor.execute(new GenerateRetentionAck()); @@ -379,6 +384,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeRetentionUpdate(update); LOG.info("Sent retention update"); dbExecutor.execute(new GenerateRetentionUpdate()); @@ -413,6 +419,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeSubscriptionAck(ack); LOG.info("Sent subscription ack"); dbExecutor.execute(new GenerateSubscriptionAck()); @@ -448,6 +455,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeSubscriptionUpdate(update); LOG.info("Sent subscription update"); dbExecutor.execute(new GenerateSubscriptionUpdate()); @@ -482,6 +490,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(TransportAck a : acks) packetWriter.writeTransportAck(a); LOG.info("Sent transport acks"); dbExecutor.execute(new GenerateTransportAcks()); @@ -517,6 +526,7 @@ class DuplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(TransportUpdate u : updates) packetWriter.writeTransportUpdate(u); LOG.info("Sent transport updates"); diff --git a/briar-core/src/org/briarproject/messaging/IncomingSession.java b/briar-core/src/org/briarproject/messaging/IncomingSession.java index bdb3a96c30..f7244c4fba 100644 --- a/briar-core/src/org/briarproject/messaging/IncomingSession.java +++ b/briar-core/src/org/briarproject/messaging/IncomingSession.java @@ -48,7 +48,6 @@ class IncomingSession implements MessagingSession, EventListener { private final MessageVerifier messageVerifier; private final ContactId contactId; private final TransportId transportId; - private final InputStream in; private final PacketReader packetReader; private volatile boolean interrupted = false; @@ -65,7 +64,6 @@ class IncomingSession implements MessagingSession, EventListener { this.messageVerifier = messageVerifier; this.contactId = contactId; this.transportId = transportId; - this.in = in; packetReader = packetReaderFactory.createPacketReader(in); } @@ -102,7 +100,6 @@ class IncomingSession implements MessagingSession, EventListener { throw new FormatException(); } } - in.close(); } finally { eventBus.removeListener(this); } diff --git a/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java b/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java index 32a197f5a6..51ec4d81a4 100644 --- a/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java +++ b/briar-core/src/org/briarproject/messaging/SimplexOutgoingSession.java @@ -99,7 +99,6 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { task.run(); } out.flush(); - out.close(); } catch(InterruptedException e) { LOG.info("Interrupted while waiting for a packet to write"); Thread.currentThread().interrupt(); @@ -159,6 +158,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeAck(ack); LOG.info("Sent ack"); dbExecutor.execute(new GenerateAck()); @@ -194,6 +194,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(byte[] raw : batch) packetWriter.writeMessage(raw); LOG.info("Sent batch"); dbExecutor.execute(new GenerateBatch()); @@ -229,6 +230,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { public void run() throws IOException { + if(interrupted) return; packetWriter.writeRetentionAck(ack); LOG.info("Sent retention ack"); dbExecutor.execute(new GenerateRetentionAck()); @@ -265,6 +267,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeRetentionUpdate(update); LOG.info("Sent retention update"); dbExecutor.execute(new GenerateRetentionUpdate()); @@ -300,6 +303,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeSubscriptionAck(ack); LOG.info("Sent subscription ack"); dbExecutor.execute(new GenerateSubscriptionAck()); @@ -336,6 +340,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; packetWriter.writeSubscriptionUpdate(update); LOG.info("Sent subscription update"); dbExecutor.execute(new GenerateSubscriptionUpdate()); @@ -371,6 +376,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(TransportAck a : acks) packetWriter.writeTransportAck(a); LOG.info("Sent transport acks"); dbExecutor.execute(new GenerateTransportAcks()); @@ -407,6 +413,7 @@ class SimplexOutgoingSession implements MessagingSession, EventListener { } public void run() throws IOException { + if(interrupted) return; for(TransportUpdate u : updates) packetWriter.writeTransportUpdate(u); LOG.info("Sent transport updates"); diff --git a/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java b/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java index 957a46b6ff..4d5b25767e 100644 --- a/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java +++ b/briar-tests/src/org/briarproject/messaging/SimplexMessagingIntegrationTest.java @@ -93,13 +93,6 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { new TransportModule()); } - @Test - public void testInjection() { - DatabaseComponent aliceDb = alice.getInstance(DatabaseComponent.class); - DatabaseComponent bobDb = bob.getInstance(DatabaseComponent.class); - assertFalse(aliceDb == bobDb); - } - @Test public void testWriteAndRead() throws Exception { read(write()); @@ -110,8 +103,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { DatabaseComponent db = alice.getInstance(DatabaseComponent.class); assertFalse(db.open()); // Start Alice's key manager - KeyManager km = alice.getInstance(KeyManager.class); - km.start(); + KeyManager keyManager = alice.getInstance(KeyManager.class); + keyManager.start(); // Add a local pseudonym for Alice AuthorId aliceId = new AuthorId(TestUtils.getRandomId()); LocalAuthor aliceAuthor = new LocalAuthor(aliceId, "Alice", @@ -131,7 +124,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { db.addTransport(transportId, LATENCY); Endpoint ep = new Endpoint(contactId, transportId, epoch, true); db.addEndpoint(ep); - km.endpointAdded(ep, LATENCY, initialSecret.clone()); + keyManager.endpointAdded(ep, LATENCY, initialSecret.clone()); // Send Bob a message String contentType = "text/plain"; long timestamp = System.currentTimeMillis(); @@ -144,7 +137,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { ByteArrayOutputStream out = new ByteArrayOutputStream(); StreamWriterFactory streamWriterFactory = alice.getInstance(StreamWriterFactory.class); - StreamContext ctx = km.getStreamContext(contactId, transportId); + StreamContext ctx = keyManager.getStreamContext(contactId, transportId); assertNotNull(ctx); StreamWriter streamWriter = streamWriterFactory.createStreamWriter(out, MAX_FRAME_LENGTH, ctx); @@ -158,8 +151,9 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { streamWriter.getOutputStream()); // Write whatever needs to be written session.run(); + streamWriter.getOutputStream().close(); // Clean up - km.stop(); + keyManager.stop(); db.close(); // Return the contents of the stream return out.toByteArray(); @@ -170,8 +164,8 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { DatabaseComponent db = bob.getInstance(DatabaseComponent.class); assertFalse(db.open()); // Start Bob's key manager - KeyManager km = bob.getInstance(KeyManager.class); - km.start(); + KeyManager keyManager = bob.getInstance(KeyManager.class); + keyManager.start(); // Add a local pseudonym for Bob AuthorId bobId = new AuthorId(TestUtils.getRandomId()); LocalAuthor bobAuthor = new LocalAuthor(bobId, "Bob", @@ -191,7 +185,7 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { db.addTransport(transportId, LATENCY); Endpoint ep = new Endpoint(contactId, transportId, epoch, false); db.addEndpoint(ep); - km.endpointAdded(ep, LATENCY, initialSecret.clone()); + keyManager.endpointAdded(ep, LATENCY, initialSecret.clone()); // Set up an event listener MessageListener listener = new MessageListener(); bob.getInstance(EventBus.class).addListener(listener); @@ -222,10 +216,11 @@ public class SimplexMessagingIntegrationTest extends BriarTestCase { assertFalse(listener.messageAdded); // Read whatever needs to be read session.run(); + streamReader.getInputStream().close(); // The private message from Alice should have been added assertTrue(listener.messageAdded); // Clean up - km.stop(); + keyManager.stop(); db.close(); } -- GitLab