Skip to content

Commit

Permalink
Add (Json) logging.
Browse files Browse the repository at this point in the history
Notice that logging happes at two points in time when the system is in a
consistent state; a) when a message is received^1 and b) when a message
is sent.

1 A message is conceptually received, not when the receiver thread accepts
  the message from the ServerSocket, but when the message is taken out
  of the inbox.  While it might be tempting to log in the receiver thread,
  it would break the happen-before relationship of some log statements.
  For example, it could appear as if a node first received a payload
  message that activated the node, and it then passes the token (which
  a node is only allowed to do when it is inactive).
  • Loading branch information
lemmy committed Apr 10, 2023
1 parent 2e24bf9 commit 69f2708
Showing 1 changed file with 60 additions and 16 deletions.
76 changes: 60 additions & 16 deletions specifications/ewd998/impl/src/EWD998.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@
import com.google.gson.JsonPrimitive;

public class EWD998 {
private static final String PKT = "pkt";
private static final String SND = "snd";
private static final String RCV = "rcv";

private static final String MSG = "msg";

private static final String TYPE = "type";
private static final JsonPrimitive TRM = new JsonPrimitive("trm");
private static final JsonPrimitive TOK = new JsonPrimitive("tok");
private static final JsonPrimitive PL = new JsonPrimitive("pl");

private static final String EVENT = "event";
private static final JsonPrimitive IN = new JsonPrimitive("<");
private static final JsonPrimitive OUT = new JsonPrimitive(">");

private enum Color {
black,
Expand All @@ -31,6 +45,11 @@ private enum Color {
public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isInitiator) throws Exception {
this.nodes = nodes;

// The inbox contains a (json) packet (pkt) with three fields:
// "snd": the sender's id of the packet.
// "rcv": the receivers's id of the packet.
// "msg": the message, which is either a token ("type" = "tok"), payload ("pl"), or termination ("trm")
// message (see getTok, getPayload, getTrm methods below).
final BlockingQueue<JsonObject> inbox = new LinkedBlockingQueue<>();

/*
Expand All @@ -47,8 +66,18 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn
Color color = Color.white;
int counter = 0;
if (isInitiator) {

// The initiator prints the global number of nodes N.
JsonObject logline = new JsonObject();
logline.add("N", new JsonPrimitive(this.nodes.size()));
System.out.println(logline);

// /\ token = [pos |-> 0, q |-> 0, color |-> "black"]
inbox.put(getTok(0, Color.black));
final JsonObject pkt = new JsonObject();
pkt.add(SND, new JsonPrimitive(myId));
pkt.add(RCV, new JsonPrimitive(myId));
pkt.add(MSG, getTok(0, Color.black));
inbox.put(pkt);
}

boolean terminationDetected = false;
Expand All @@ -66,10 +95,10 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn
final DataInputStream dataInputStream = new DataInputStream(inputStream);
final String in = dataInputStream.readUTF();

final JsonObject msg = JsonParser.parseString(in).getAsJsonObject();
final JsonObject pkt = JsonParser.parseString(in).getAsJsonObject();

inbox.add(msg);
if (msg.get("type").getAsString().equals("trm")) {
inbox.add(pkt);
if (pkt.get(MSG).getAsJsonObject().get(TYPE).equals(TRM)) {
// See note at marker "aklseflha" below.
dataInputStream.close();
inputStream.close();
Expand All @@ -91,16 +120,24 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn

// --------------------------------------------------------------------------------- //
while (true) {
final JsonObject msg = inbox.take();
System.out.println(msg);
final JsonObject pkt = inbox.take();

// A log line is a json object with an "event" and a "pkt" field. The
// event shows is this is an incoming ("<") or outgoing (">") packet.
final JsonObject logline = new JsonObject();
logline.add(EVENT, IN);
logline.add(PKT, pkt);
System.out.println(logline);

final JsonObject msg = pkt.get(MSG).getAsJsonObject();

int tokenQ = 0;
Color tokenColor = null;

// --------------------------------------------------------------------------------- //

// InitiateToken and PassToken
if (msg.get("type").getAsString().equals("tok")) {
if (msg.get(TYPE).equals(TOK)) {
tokenQ = msg.get("q").getAsInt();
tokenColor = Color.valueOf(msg.get("color").getAsString());

Expand All @@ -118,7 +155,7 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn
terminationDetected = tokenQ + counter == 0 && color == Color.white && tokenColor == Color.white
&& !active;
}
} else if (msg.get("type").getAsString().equals("pl")) {
} else if (msg.get(TYPE).equals(PL)) {
/*
RecvMsg(i) ==
/\ pending[i] > 0
Expand All @@ -131,7 +168,7 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn
active = true;
counter--;
color = Color.black;
} else if (msg.get("type").getAsString().equals("trm")) {
} else if (msg.get(TYPE).equals(TRM)) {
// (aklseflha) The termination message "[trm]" is *not* part of EWD998. Here,
// the initiator sends a trm message to all nodes including itself after
// detecting termination. A recipient of a trm message closes the receiver's server
Expand Down Expand Up @@ -237,35 +274,41 @@ public EWD998(final Map<Integer, Pair> nodes, final int myId, final boolean isIn
/\ ...
/\ UNCHANGED <<token>>
*/
inbox.add(msg);
inbox.add(pkt);
}
}
}
}

private JsonObject getPayload() throws Exception {
final JsonObject result = new JsonObject();
result.add("type", new JsonPrimitive("pl"));
result.add(TYPE, PL);
return result;
}

private JsonObject getTok(final int q, final Color color) throws Exception {
final JsonObject result = new JsonObject();
result.add("type", new JsonPrimitive("tok"));
result.add(TYPE, TOK);
result.add("q", new JsonPrimitive(q));
result.add("color", new JsonPrimitive(color.toString()));
return result;
}

private JsonObject getTrm() throws Exception {
final JsonObject result = new JsonObject();
result.add("type", new JsonPrimitive("trm"));
result.add(TYPE, TRM);
return result;
}

// Boilerplate: Sending messages.
private void sendMsg(final int sender, final int receiver, final JsonObject msg) throws Exception {
System.out.println(msg);
final JsonObject pkt = new JsonObject();
pkt.add(SND, new JsonPrimitive(sender));
pkt.add(RCV, new JsonPrimitive(receiver));
pkt.add(MSG, msg);
final JsonObject logline = new JsonObject();
logline.add(EVENT, OUT);
logline.add(PKT, pkt);

final Pair p = nodes.get(receiver);
int retry = 1;
Expand All @@ -275,12 +318,13 @@ private void sendMsg(final int sender, final int receiver, final JsonObject msg)
socket.connect(new InetSocketAddress(p.host, p.port), 1000 * retry++);
final OutputStream outputStream = socket.getOutputStream();
final DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
dataOutputStream.writeUTF(msg.toString());

dataOutputStream.writeUTF(pkt.toString());
dataOutputStream.flush();
dataOutputStream.close();

socket.close();
System.out.println(logline);
return;
} catch (SocketTimeoutException | ConnectException thisIsFineWillRetry) {
if (retry > 3) {
Expand Down

0 comments on commit 69f2708

Please sign in to comment.