diff --git a/src/SiriClient.java b/src/SiriClient.java new file mode 100644 index 0000000..366cf1d --- /dev/null +++ b/src/SiriClient.java @@ -0,0 +1,365 @@ +import java.io.*; +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.net.Socket; +import java.net.SocketException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.Collections; + +public class SiriClient { + + private String serverMessage; + + /* Client parameters */ + public final String SERVERIP; //your computer IP address + public final int SERVERPORT; + public final int RUN_TIME; // In seconds + public final int QUERY_SIZE; + public final int RESPONSE_SIZE; + public final int DELAY_QUERY_RESPONSE; + public final int MIN_PAYLOAD_SIZE; + public final int MAX_PAYLOAD_SIZE; + public final int INTERVAL_TIME_MS; + public final int BUFFER_SIZE; + + private boolean mRun = false; + private int messageId = 0; + private static final int MAX_ID = 100; + + private Random random; + private long[] sentTime; + private List delayTime; + private String mac; + private int counter; + private int missed; + PrintWriter out; + BufferedReader in; + OutputStream outputStream; + OutputStreamWriter osw; + Socket socket; + + /** + * Constructor of the class. OnMessagedReceived listens for the messages received from server + */ + public SiriClient(String serverIp, int serverPort, int runTime, int querySize, int responseSize, + int delayQueryResponse, int minPayloadSize, int maxPayloadSize, int intervalTimeMs, + int bufferSize) { + random = new Random(); + sentTime = new long[MAX_ID]; + delayTime = new ArrayList<>(); + counter = 0; + missed = 0; + /* Client parameters */ + SERVERIP = serverIp; + SERVERPORT = serverPort; + RUN_TIME = runTime; + QUERY_SIZE = querySize; + RESPONSE_SIZE = responseSize; + DELAY_QUERY_RESPONSE = delayQueryResponse; + MIN_PAYLOAD_SIZE = minPayloadSize; + MAX_PAYLOAD_SIZE = maxPayloadSize; + INTERVAL_TIME_MS = intervalTimeMs; + BUFFER_SIZE = bufferSize; + } + + public SiriClient(String serverIp, int serverPort, int runTime) { + this(serverIp, serverPort, runTime, 2500, 750, 0, 85, 500, 333, 9); + } + + protected String getStringWithLengthAndFilledWithCharacter(int length, char charToFill) { + if (length > 0) { + char[] array = new char[length]; + Arrays.fill(array, charToFill); + return new String(array); + } + return ""; + } + + /** + * Return a random number in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE] + * It manages 3 cases: + * 1) remainToSend in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE]: return remainToSend + * 2) remainToSend - @MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE: return random value in + * [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE - @MIN_PAYLOAD_SIZE] + * 3) else, return random value in [@MIN_PAYLOAD_SIZE, @MAX_PAYLOAD_SIZE] + * @param remainToSend number of remaining bytes to send >= MIN_PAYLOAD_SIZE + */ + private int sizeOfNextPacket(int remainToSend) { + if (remainToSend < MIN_PAYLOAD_SIZE) throw new AssertionError(); + + // Case 1) + if (remainToSend <= MAX_PAYLOAD_SIZE && remainToSend >= MIN_PAYLOAD_SIZE) { + return remainToSend; + } + + int randomPart; + + // Case 2) + if (remainToSend - MAX_PAYLOAD_SIZE < MIN_PAYLOAD_SIZE) { + randomPart = random.nextInt(MAX_PAYLOAD_SIZE - 2 * MIN_PAYLOAD_SIZE + 1); + } + // Case 3) + else { + randomPart = random.nextInt(MAX_PAYLOAD_SIZE - MIN_PAYLOAD_SIZE + 1); + } + + return MIN_PAYLOAD_SIZE + randomPart; + } + + /** + * Get a random value following a Poisson distribution of mean lambda + * @param lambda mean of the Poisson distribution + * @return random value following a Poisson distribution of mean lambda + */ + public static int getPoisson(double lambda) { + double L = Math.exp(-lambda); + double p = 1.0; + int k = 0; + + do { + k++; + p *= Math.random(); + } while (p > L); + + return k - 1; + } + + /** + * Sends the message entered by client to the server + */ + public void sendMessage() { + // if (out != null && !out.checkError() && osw != null) { + if (socket != null && !socket.isClosed()) { + int remainToBeSent = QUERY_SIZE; + // If the server has a DB, use it, otherwise set to 0 + //int delaysToSend = delayTime.size(); + int delaysToSend = 0; + StringBuffer sb = new StringBuffer(); +// for (int i = 0; i < delaysToSend; i++) { +// sb.append(delayTime.get(0) + "&"); +// delayTime.remove(delayTime.get(0)); +// } + sentTime[messageId] = System.currentTimeMillis(); + String startString = messageId + "&" + QUERY_SIZE + "&" + RESPONSE_SIZE + "&" + + DELAY_QUERY_RESPONSE + "&" + delaysToSend + "&" + sentTime[messageId] + + "&" + mac + "&" + sb.toString(); + messageId = (messageId + 1) % MAX_ID; + int bytesToSend = Math.max(startString.length(), sizeOfNextPacket(remainToBeSent)); + // System.err.println("BytesToSend: " + bytesToSend); + byte[] toSend; + try { + //osw.write(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0')); + //osw.flush(); + toSend = (startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length(), '0')).getBytes(StandardCharsets.US_ASCII); + outputStream.write(toSend); + outputStream.flush(); + } catch (IOException e) { + System.err.println("ERROR: OUTPUT STREAM ERROR"); + } + // out.println(startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0')); + // System.err.println("Sent " + startString + getStringWithLengthAndFilledWithCharacter(bytesToSend - startString.length() - 1, '0')); + // out.flush(); + remainToBeSent -= bytesToSend; + synchronized (this) { + counter += bytesToSend; + } + + try { + while(remainToBeSent > 0) { + bytesToSend = sizeOfNextPacket(remainToBeSent); + //out.println(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0')); + //out.flush(); + + if (remainToBeSent == bytesToSend) { + //osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n"); + toSend = (getStringWithLengthAndFilledWithCharacter(bytesToSend - 1, '0') + "\n").getBytes(StandardCharsets.US_ASCII); + } else { + //osw.write(getStringWithLengthAndFilledWithCharacter(bytesToSend, '0')); + toSend = getStringWithLengthAndFilledWithCharacter(bytesToSend, '0').getBytes(StandardCharsets.US_ASCII); + } + //osw.flush(); + outputStream.write(toSend); + outputStream.flush(); + + remainToBeSent -= bytesToSend; + synchronized (this) { + counter += bytesToSend; + } + } + } catch (IOException e) { + System.err.println("ERROR: OUTPUT STREAM ERROR"); + e.printStackTrace(); + } + } + } + + public void stopClient(){ + mRun = false; + if (socket != null) { + try { + socket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + public static String getIPAddress(boolean useIPv4) { + try { + List interfaces = Collections.list(NetworkInterface.getNetworkInterfaces()); + for (NetworkInterface intf : interfaces) { + List addrs = Collections.list(intf.getInetAddresses()); + for (InetAddress addr : addrs) { + if (!addr.isLoopbackAddress()) { + String sAddr = addr.getHostAddress(); + //boolean isIPv4 = InetAddressUtils.isIPv4Address(sAddr); + boolean isIPv4 = sAddr.indexOf(':')<0; + + if (useIPv4) { + if (isIPv4) + return sAddr; + } else { + if (!isIPv4) { + int delim = sAddr.indexOf('%'); // drop ip6 zone suffix + return delim<0 ? sAddr.toUpperCase() : sAddr.substring(0, delim).toUpperCase(); + } + } + } + } + } + } catch (Exception ex) { } // for now eat exceptions + return ""; + } + + public void run(String macWifi) { + + mRun = true; + mac = macWifi; + + try { + //here you must put your computer's IP address. + InetAddress serverAddr = InetAddress.getByName(SERVERIP); + System.err.println("Me: " + getIPAddress(true)); + + System.err.println("TCP Client: Connecting..."); + + //create a socket to make the connection with the server + socket = new Socket(serverAddr, SERVERPORT); + // Needed to emulate any traffic + socket.setTcpNoDelay(true); + + new Thread(new Runnable() { + public void run() { + final long startTime = System.currentTimeMillis(); + while (socket != null && !socket.isClosed()) { + try { + Thread.sleep(INTERVAL_TIME_MS); //* getPoisson(3)); + if ((System.currentTimeMillis() - startTime) >= RUN_TIME * 1000) { + stopClient(); + } else if (!socket.isClosed() && counter <= QUERY_SIZE * BUFFER_SIZE) { + sendMessage(); + } else if (!socket.isClosed()) { + missed++; + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + + } + }).start(); + + + try { + + //send the message to the server + out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true); + outputStream = socket.getOutputStream(); + osw = new OutputStreamWriter(socket.getOutputStream()); + + System.err.println("TCP Client: Done."); + + //receive the message which the server sends back + in = new BufferedReader(new InputStreamReader(socket.getInputStream())); + + //in this while the client listens for the messages sent by the server + while (mRun) { + serverMessage = in.readLine(); + long receivedTime = System.currentTimeMillis(); + // System.err.println("SERVER MESSAGE: " + ((serverMessage == null) ? "" : serverMessage)); + if (serverMessage != null) { + int id = Integer.parseInt(serverMessage.split("&")[0]); + // System.err.println("ELAPSED TIME: " + (receivedTime - sentTime[id])); + delayTime.add(receivedTime - sentTime[id]); + synchronized (this) { + counter -= QUERY_SIZE; + } + + } + serverMessage = null; + + } + + //System.err.println("RESPONSE FROM SERVER: Received Message: '" + serverMessage + "'"); + + } catch (SocketException e) { + System.err.println("TCP: Socket closed"); + } catch (Exception e) { + System.err.println("TCP: Error " + e); + + } finally { + //the socket must be closed. It is not possible to reconnect to this socket + // after it is closed, which means a new socket instance has to be created. + socket.close(); + } + + } catch (Exception e) { + + System.err.println("TCP C: Error" + e); + + } + + } + + public static void usage() { + System.out.println("Usage: siriClient serverIP serverPort runTime [querySize responseSize delayQueryResponse " + + "minPayloadSize maxPayloadSize intervalTimeMs bufferSize]"); + } + + public static void main(String[] args) { + if (args.length != 3 && args.length != 10) { + usage(); + System.exit(1); + } + String serverIp = args[0]; + int serverPort = Integer.parseInt(args[1]); + int runTime = Integer.parseInt(args[2]); + SiriClient siriClient; + + if (args.length == 10) { + int querySize = Integer.parseInt(args[3]); + int responseSize = Integer.parseInt(args[4]); + int delayQueryResponse = Integer.parseInt(args[5]); + int minPayloadSize = Integer.parseInt(args[6]); + int maxPayloadSize = Integer.parseInt(args[7]); + int intervalTimeMs = Integer.parseInt(args[8]); + int bufferSize = Integer.parseInt(args[9]); + siriClient = new SiriClient(serverIp, serverPort, runTime, querySize, responseSize, delayQueryResponse, + minPayloadSize, maxPayloadSize, intervalTimeMs, bufferSize); + } else { + siriClient = new SiriClient(serverIp, serverPort, runTime); + } + + String mac = "00:00:00:00:00:00"; + siriClient.run(mac); + System.out.println("missed: " + siriClient.missed); + for (long delay: siriClient.delayTime) { + System.out.println(delay); + } + } +} \ No newline at end of file diff --git a/src/mpExperienceSiri.py b/src/mpExperienceSiri.py new file mode 100644 index 0000000..e12acbb --- /dev/null +++ b/src/mpExperienceSiri.py @@ -0,0 +1,81 @@ +from mpExperience import MpExperience +from mpParamXp import MpParamXp +from mpPvAt import MpPvAt +import os + +class MpExperienceSiri(MpExperience): + SERVER_LOG = "siri_server.log" + CLIENT_LOG = "siri_client.log" + JAVA_BIN = "java" + PING_OUTPUT = "ping.log" + + def __init__(self, xpParamFile, mpTopo, mpConfig): + MpExperience.__init__(self, xpParamFile, mpTopo, mpConfig) + self.loadParam() + self.ping() + MpExperience.classicRun(self) + + def ping(self): + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiri.PING_OUTPUT ) + count = self.xpParam.getParam(MpParamXp.PINGCOUNT) + for i in range(0, self.mpConfig.getClientInterfaceCount()): + cmd = self.pingCommand(self.mpConfig.getClientIP(i), + self.mpConfig.getServerIP(), n = count) + self.mpTopo.commandTo(self.mpConfig.client, cmd) + + def pingCommand(self, fromIP, toIP, n=5): + s = "ping -c " + str(n) + " -I " + fromIP + " " + toIP + \ + " >> " + MpExperienceSiri.PING_OUTPUT + print(s) + return s + + def loadParam(self): + """ + todo : param LD_PRELOAD ?? + """ + self.run_time = self.xpParam.getParam(MpParamXp.SIRIRUNTIME) + self.query_size = self.xpParam.getParam(MpParamXp.SIRIQUERYSIZE) + self.response_size = self.xpParam.getParam(MpParamXp.SIRIRESPONSESIZE) + self.delay_query_response = self.xpParam.getParam(MpParamXp.SIRIDELAYQUERYRESPONSE) + self.min_payload_size = self.xpParam.getParam(MpParamXp.SIRIMINPAYLOADSIZE) + self.max_payload_size = self.xpParam.getParam(MpParamXp.SIRIMAXPAYLOADSIZE) + self.interval_time_ms = self.xpParam.getParam(MpParamXp.SIRIINTERVALTIMEMS) + self.buffer_size = self.xpParam.getParam(MpParamXp.SIRIBUFFERSIZE) + # Little trick here + self.xpParam.defaultValue[MpParamXp.AUTOCORK] = "0" + + def prepare(self): + MpExperience.prepare(self) + self.mpTopo.commandTo(self.mpConfig.client, "rm " + \ + MpExperienceSiri.CLIENT_LOG) + self.mpTopo.commandTo(self.mpConfig.server, "rm " + \ + MpExperienceSiri.SERVER_LOG) + + def getSiriServerCmd(self): + s = "python " + os.path.dirname(os.path.abspath(__file__)) + \ + "/siri_server.py &>" + MpExperienceSiri.SERVER_LOG + "&" + print(s) + return s + + def getSiriClientCmd(self): + s = MpExperienceSiri.JAVA_BIN + " -jar siriClient.jar" + self.mpConfig.getServerIP() + \ + " 8080 " + self.run_time + " " + self.query_size + " " + self.response_size + \ + " " + self.delay_query_response + " " + self.min_payload_size + " " + \ + self.max_payload_size + " " + self.interval_time_ms + " " + self.buffer_size + \ + " &>" + MpExperienceSiri.CLIENT_LOG + print(s) + return s + + def clean(self): + MpExperience.clean(self) + + + def run(self): + cmd = self.getSiriServerCmd() + self.mpTopo.commandTo(self.mpConfig.server, cmd) + + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") + cmd = self.getSiriClientCmd() + self.mpTopo.commandTo(self.mpConfig.client, cmd) + self.mpTopo.commandTo(self.mpConfig.client, "sleep 2") diff --git a/src/mpParamXp.py b/src/mpParamXp.py index a20c47f..6bf2804 100644 --- a/src/mpParamXp.py +++ b/src/mpParamXp.py @@ -6,6 +6,7 @@ class MpParamXp(MpParam): WMEM = "wmem" SCHED = "sched" CC = "congctrl" + AUTOCORK = "autocork" KERNELPM = "kpm" KERNELPMC = "kpmc" #kernel path manager client / server KERNELPMS = "kpms" @@ -38,6 +39,14 @@ class MpParamXp(MpParam): NETPERFREQRESSIZE = "netperfReqresSize" ABCONCURRENTREQUESTS = "abConccurentRequests" ABTIMELIMIT = "abTimelimit" + SIRIRUNTIME = "siriRunTime" + SIRIQUERYSIZE = "siriQuerySize" + SIRIRESPONSESIZE = "siriResponseSize" + SIRIDELAYQUERYRESPONSE = "siriDelayQueryResponse" + SIRIMINPAYLOADSIZE = "siriMinPayloadSize" + SIRIMAXPAYLOADSIZE = "siriMaxPayloadSize" + SIRIINTERVALTIMEMS = "siriIntervalTimeMs" + SIRIBUFFERSIZE = "siriBufferSize" # global sysctl @@ -47,6 +56,7 @@ class MpParamXp(MpParam): sysctlKey[KERNELPM] = "net.mptcp.mptcp_path_manager" sysctlKey[SCHED] = "net.mptcp.mptcp_scheduler" sysctlKey[CC] = "net.ipv4.tcp_congestion_control" + sysctlKey[AUTOCORK] = "net.ipv4.tcp_autocorking" sysctlKeyClient = {} sysctlKeyClient[KERNELPMC] = "net.mptcp.mptcp_path_manager" @@ -66,6 +76,7 @@ class MpParamXp(MpParam): defaultValue[USERPMSARGS] = "" defaultValue[CC] = "olia" defaultValue[SCHED] = "default" + defaultValue[AUTOCORK] = "1" defaultValue[CLIENTPCAP] = "no" defaultValue[SERVERPCAP] = "no" @@ -91,6 +102,13 @@ class MpParamXp(MpParam): defaultValue[NETPERFREQRESSIZE] = "2K,256" defaultValue[ABCONCURRENTREQUESTS] = "50" defaultValue[ABTIMELIMIT] = "20" + defaultValue[SIRIQUERYSIZE] = "2500" + defaultValue[SIRIRESPONSESIZE] = "750" + defaultValue[SIRIDELAYQUERYRESPONSE] = "0" + defaultValue[SIRIMINPAYLOADSIZE] = "85" + defaultValue[SIRIMAXPAYLOADSIZE] = "500" + defaultValue[SIRIINTERVALTIMEMS] = "333" + defaultValue[SIRIBUFFERSIZE] = "9" def __init__(self, paramFile): MpParam.__init__(self, paramFile) diff --git a/src/siriClient.jar b/src/siriClient.jar new file mode 100644 index 0000000..6c8009b Binary files /dev/null and b/src/siriClient.jar differ diff --git a/src/siri_server.py b/src/siri_server.py new file mode 100644 index 0000000..b17556a --- /dev/null +++ b/src/siri_server.py @@ -0,0 +1,153 @@ +import socket +import sys +import threading +import time + +BUFFER_SIZE = 512 +ENCODING = 'ascii' + +MAX_SECURITY_HEADER_SIZE = 128 +MIN_FIELDS_NB = 8 + +threads = {} +to_join = [] +delay_results = {} +time_sent = {} +mac = {} + + +class HandleClientConnectionThread(threading.Thread): + """ Handle requests given by the client """ + + def __init__(self, connection, client_address, id): + threading.Thread.__init__(self) + self.connection = connection + self.client_address = client_address + self.id = id + + def run(self): + try: + print(self.id, ": Connection from", self.client_address) + + # Some useful variables + first_data = True + expected_req_size = 0 + res_size = 0 + waiting_time = 0 + expected_delay_results = 0 + buffer_data = "" + next_buffer_data = "" + message_sizes = [] + message_id = 0 + + # Initialize logging variables + delay_results[self.id] = [] + time_sent[self.id] = [] + + # Receive the data and retransmit it + while True: + data = self.connection.recv(BUFFER_SIZE).decode(ENCODING) + + if len(data) == 0: + # Connection closed at remote; close the connection + break + + # Manage the case where the client is very very quick + if not data.endswith("\n") and "\n" in data: + carriage_index = data.index("\n") + buffer_data += data[:carriage_index + 1] + next_buffer_data += data[carriage_index + 1:] + used_data = data[:carriage_index + 1] + else: + buffer_data += data + used_data = data + + message_sizes.append(len(used_data)) + + if first_data and len(buffer_data) <= MAX_SECURITY_HEADER_SIZE and len(buffer_data.split("&")) < MIN_FIELDS_NB: + continue + + if first_data: + split_data = buffer_data.split("&") + expected_delay_results = int(split_data[4]) + if len(split_data) - MIN_FIELDS_NB == expected_delay_results: + message_id = int(split_data[0]) + expected_req_size = int(split_data[1]) + res_size = int(split_data[2]) + waiting_time = int(split_data[3]) + time_sent[self.id].append(int(split_data[5])) + + # Little check, we never know + if self.id not in mac: + mac[self.id] = split_data[6] + elif not mac[self.id] == split_data[6]: + print(self.id, ": MAC changed from", mac[self.id], "to", split_data[6], ": close the connection", file=sys.stderr) + break + + for i in range(expected_delay_results): + delay_results[self.id].append(int(split_data[7 + i])) + first_data = False + if len(delay_results[self.id]) % 10 == 0: + print(self.id, ":", time_sent[self.id]) + print(self.id, ":", delay_results[self.id]) + else: + # Avoid further processing, wait for additional packets + continue + + if len(buffer_data) == expected_req_size: + print(self.id, ": Received request of size", expected_req_size, "; send response of", res_size, "after", waiting_time, "s") + time.sleep(waiting_time) + self.connection.sendall((str(message_id) + "&" + "0" * (res_size - len(str(message_id)) - 2) + "\n").encode(ENCODING)) + first_data = True + buffer_data = "" + message_sizes = [] + if len(next_buffer_data) > 0: + buffer_data = next_buffer_data + next_buffer_data = "" + message_sizes.append(len(buffer_data)) + + elif len(buffer_data) > expected_req_size: + print(len(buffer_data), len(data), len(used_data), file=sys.stderr) + print(self.id, ": Expected request of size", expected_req_size, "but received request of size", len(buffer_data), "; close the connection", file=sys.stderr) + print(buffer_data, file=sys.stderr) + break + + finally: + # Clean up the connection + print(self.id, ": Closing connection") + self.connection.close() + to_join.append(self.id) + + +def join_finished_threads(): + """ Join threads whose connection is closed """ + while len(to_join) > 0: + thread_to_join_id = to_join.pop() + threads[thread_to_join_id].join() + del threads[thread_to_join_id] + +# Create a TCP/IP socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# Handle reusing the same 5-tuple if the previous one is still in TIME_WAIT +sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + +# Bind the socket to the port +server_address = ('0.0.0.0', 8080) +print("Stating the server on %s port %s" % server_address) +sock.bind(server_address) + +# Listen for incoming connections +sock.listen(4) + +# Connection identifier +conn_id = 0 + +while True: + # Wait for a connection + connection, client_address = sock.accept() + thread = HandleClientConnectionThread(connection, client_address, conn_id) + threads[conn_id] = thread + conn_id += 1 + thread.start() + join_finished_threads()