Skip to content
Snippets Groups Projects
Verified Commit 8711e478 authored by akwizgran's avatar akwizgran
Browse files

Write any records in the queue before ending session.

parent cc943be5
No related branches found
No related tags found
No related merge requests found
Pipeline #4390 passed
...@@ -166,6 +166,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener { ...@@ -166,6 +166,9 @@ class DuplexOutgoingSession implements SyncSession, EventListener {
dataToFlush = true; dataToFlush = true;
} }
} }
// Write any records that were already in the queue
ThrowingRunnable<IOException> task;
while ((task = writerTasks.poll()) != null) task.run();
streamWriter.sendEndOfStream(); streamWriter.sendEndOfStream();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a record to write"); LOG.info("Interrupted while waiting for a record to write");
......
...@@ -95,6 +95,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener { ...@@ -95,6 +95,9 @@ class SimplexOutgoingSession implements SyncSession, EventListener {
if (task == CLOSE) break; if (task == CLOSE) break;
task.run(); task.run();
} }
// Write any records that were already in the queue
ThrowingRunnable<IOException> task;
while ((task = writerTasks.poll()) != null) task.run();
streamWriter.sendEndOfStream(); streamWriter.sendEndOfStream();
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Interrupted while waiting for a record to write"); LOG.info("Interrupted while waiting for a record to write");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment