Skip to content
Snippets Groups Projects
Commit d91f96b5 authored by akwizgran's avatar akwizgran
Browse files

Close the writer when the reader closes; only dispose of the

connection once.
parent b95753bb
No related branches found
No related tags found
No related merge requests found
...@@ -89,6 +89,7 @@ class OutgoingBatchConnection { ...@@ -89,6 +89,7 @@ class OutgoingBatchConnection {
} }
// Flush the output stream // Flush the output stream
out.flush(); out.flush();
transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); transport.dispose(false);
...@@ -96,7 +97,5 @@ class OutgoingBatchConnection { ...@@ -96,7 +97,5 @@ class OutgoingBatchConnection {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); transport.dispose(false);
} }
// Success
transport.dispose(true);
} }
} }
...@@ -67,7 +67,7 @@ abstract class StreamConnection implements DatabaseListener { ...@@ -67,7 +67,7 @@ abstract class StreamConnection implements DatabaseListener {
protected final StreamTransportConnection transport; protected final StreamTransportConnection transport;
private final Executor dbExecutor, verificationExecutor; private final Executor dbExecutor, verificationExecutor;
private final AtomicBoolean canSendOffer; private final AtomicBoolean canSendOffer, disposed;
private final BlockingQueue<Runnable> writerTasks; private final BlockingQueue<Runnable> writerTasks;
private Collection<MessageId> offered = null; // Locking: this private Collection<MessageId> offered = null; // Locking: this
...@@ -91,6 +91,7 @@ abstract class StreamConnection implements DatabaseListener { ...@@ -91,6 +91,7 @@ abstract class StreamConnection implements DatabaseListener {
this.contactId = contactId; this.contactId = contactId;
this.transport = transport; this.transport = transport;
canSendOffer = new AtomicBoolean(false); canSendOffer = new AtomicBoolean(false);
disposed = new AtomicBoolean(false);
writerTasks = new LinkedBlockingQueue<Runnable>(); writerTasks = new LinkedBlockingQueue<Runnable>();
} }
...@@ -164,13 +165,14 @@ abstract class StreamConnection implements DatabaseListener { ...@@ -164,13 +165,14 @@ abstract class StreamConnection implements DatabaseListener {
throw new FormatException(); throw new FormatException();
} }
} }
transport.dispose(true); writerTasks.add(CLOSE);
if(!disposed.getAndSet(true)) transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); if(!disposed.getAndSet(true)) transport.dispose(false);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); if(!disposed.getAndSet(true)) transport.dispose(false);
} }
} }
...@@ -205,13 +207,13 @@ abstract class StreamConnection implements DatabaseListener { ...@@ -205,13 +207,13 @@ abstract class StreamConnection implements DatabaseListener {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
transport.dispose(true); if(!disposed.getAndSet(true)) transport.dispose(true);
} catch(DbException e) { } catch(DbException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); if(!disposed.getAndSet(true)) transport.dispose(false);
} catch(IOException e) { } catch(IOException e) {
if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage()); if(LOG.isLoggable(Level.WARNING)) LOG.warning(e.getMessage());
transport.dispose(false); if(!disposed.getAndSet(true)) transport.dispose(false);
} finally { } finally {
db.removeListener(this); db.removeListener(this);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment