Skip to content
Snippets Groups Projects
Commit 79688652 authored by bontric's avatar bontric
Browse files

add some logging

parent d2b1a107
No related branches found
No related tags found
No related merge requests found
...@@ -4,8 +4,11 @@ import java.io.ByteArrayOutputStream; ...@@ -4,8 +4,11 @@ import java.io.ByteArrayOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.logging.Logger;
public class FeedableSyncInputStream extends InputStream { public class FeedableSyncInputStream extends InputStream {
private static final Logger LOG =
Logger.getLogger(FeedableSyncInputStream.class.getName());
private ByteArrayOutputStream os = new ByteArrayOutputStream(); private ByteArrayOutputStream os = new ByteArrayOutputStream();
private boolean isEOF = false; private boolean isEOF = false;
private byte[] activeBuffer = new byte[] {}; private byte[] activeBuffer = new byte[] {};
...@@ -28,6 +31,7 @@ public class FeedableSyncInputStream extends InputStream { ...@@ -28,6 +31,7 @@ public class FeedableSyncInputStream extends InputStream {
} }
private boolean hasBytes() { private boolean hasBytes() {
// Check if active buffer has bytes left // Check if active buffer has bytes left
if (activeBufferPointer < activeBuffer.length) if (activeBufferPointer < activeBuffer.length)
return true; return true;
...@@ -38,6 +42,7 @@ public class FeedableSyncInputStream extends InputStream { ...@@ -38,6 +42,7 @@ public class FeedableSyncInputStream extends InputStream {
// update active buffer if bytes from OS are available // update active buffer if bytes from OS are available
activeBuffer = os.toByteArray(); activeBuffer = os.toByteArray();
LOG.info("consuming Buffer: " + activeBuffer.length);
os.reset(); os.reset();
activeBufferPointer = 0; activeBufferPointer = 0;
return true; return true;
...@@ -47,6 +52,8 @@ public class FeedableSyncInputStream extends InputStream { ...@@ -47,6 +52,8 @@ public class FeedableSyncInputStream extends InputStream {
if (isEOF) if (isEOF)
throw new EOFException(); throw new EOFException();
LOG.info("Feeding Buffer: " + buffer.length);
os.write(buffer); os.write(buffer);
this.notifyAll(); this.notifyAll();
} }
......
...@@ -8,6 +8,7 @@ import java.io.ByteArrayOutputStream; ...@@ -8,6 +8,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
...@@ -15,6 +16,9 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { ...@@ -15,6 +16,9 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
private final ByteArrayOutputStream bufferOS = new ByteArrayOutputStream(); private final ByteArrayOutputStream bufferOS = new ByteArrayOutputStream();
private boolean endOfStream = false; private boolean endOfStream = false;
private static final Logger LOG =
Logger.getLogger(MailboxSyncRequestWriter.class.getName());
public MailboxSyncRequestWriter(MailboxProtocol mailboxProtocol) { public MailboxSyncRequestWriter(MailboxProtocol mailboxProtocol) {
this.mailboxProtocol = mailboxProtocol; this.mailboxProtocol = mailboxProtocol;
} }
...@@ -25,7 +29,7 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { ...@@ -25,7 +29,7 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
} }
@Override @Override
public synchronized void sendEndOfStream() throws IOException { public void sendEndOfStream() throws IOException {
if (endOfStream) if (endOfStream)
throw new IOException("End of stream was already written"); throw new IOException("End of stream was already written");
...@@ -38,12 +42,12 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { ...@@ -38,12 +42,12 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
} }
@Override @Override
public synchronized void write(int i) { public void write(int i) {
bufferOS.write(i); bufferOS.write(i);
} }
@Override @Override
public synchronized void flush() throws IOException { public void flush() throws IOException {
if (endOfStream) if (endOfStream)
throw new IOException("End of stream already written"); throw new IOException("End of stream already written");
...@@ -56,7 +60,7 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter { ...@@ -56,7 +60,7 @@ class MailboxSyncRequestWriter extends OutputStream implements StreamWriter {
byte[] syncStream = bufferOS.toByteArray(); byte[] syncStream = bufferOS.toByteArray();
bufferOS.reset(); bufferOS.reset();
LOG.info("Flushing Buffer: " + syncStream.length);
MailboxRequestSync req = new MailboxRequestSync(syncStream); MailboxRequestSync req = new MailboxRequestSync(syncStream);
try { try {
mailboxProtocol.writeRequest(req); mailboxProtocol.writeRequest(req);
......
...@@ -71,23 +71,27 @@ class IncomingSession implements SyncSession, EventListener { ...@@ -71,23 +71,27 @@ class IncomingSession implements SyncSession, EventListener {
return; return;
} }
if (recordReader.hasAck()) { if (recordReader.hasAck()) {
LOG.info("Received ACK");
Ack a = recordReader.readAck(); Ack a = recordReader.readAck();
LOG.info("Owner ack " + a.getMessageIds().size() + LOG.info("Owner ack " + a.getMessageIds().size() +
" offer of:"); " offer of:");
for (MessageId id : a.getMessageIds()) LOG.info("\n" + id); for (MessageId id : a.getMessageIds()) LOG.info("\n" + id);
dbExecutor.execute(new ReceiveAck(a)); dbExecutor.execute(new ReceiveAck(a));
} else if (recordReader.hasMessage()) { } else if (recordReader.hasMessage()) {
LOG.info("Received Message");
Message m = recordReader.readMessage(); Message m = recordReader.readMessage();
LOG.info("Owner message received:" + m.getId() + " : " + LOG.info("Owner message received:" + m.getId() + " : " +
m.getGroupId()); m.getGroupId());
dbExecutor.execute(new ReceiveMessage(m)); dbExecutor.execute(new ReceiveMessage(m));
} else if (recordReader.hasOffer()) { } else if (recordReader.hasOffer()) {
LOG.info("Received Offer");
Offer o = recordReader.readOffer(); Offer o = recordReader.readOffer();
LOG.info("Owner received " + o.getMessageIds().size() + LOG.info("Owner received " + o.getMessageIds().size() +
" offer of:"); " offer of:");
for (MessageId id : o.getMessageIds()) LOG.info("\n" + id); for (MessageId id : o.getMessageIds()) LOG.info("\n" + id);
dbExecutor.execute(new ReceiveOffer(o)); dbExecutor.execute(new ReceiveOffer(o));
} else if (recordReader.hasRequest()) { } else if (recordReader.hasRequest()) {
LOG.info("Received Request");
Request r = recordReader.readRequest(); Request r = recordReader.readRequest();
LOG.info("Owner received " + r.getMessageIds().size() + LOG.info("Owner received " + r.getMessageIds().size() +
" request:"); " request:");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment